/src/crow/include/crow/http_server.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #ifdef CROW_USE_BOOST |
4 | | #include <boost/asio.hpp> |
5 | | #ifdef CROW_ENABLE_SSL |
6 | | #include <boost/asio/ssl.hpp> |
7 | | #endif |
8 | | #else |
9 | | #ifndef ASIO_STANDALONE |
10 | | #define ASIO_STANDALONE |
11 | | #endif |
12 | | #include <asio.hpp> |
13 | | #ifdef CROW_ENABLE_SSL |
14 | | #include <asio/ssl.hpp> |
15 | | #endif |
16 | | #endif |
17 | | |
18 | | #include <atomic> |
19 | | #include <chrono> |
20 | | #include <cstdint> |
21 | | #include <future> |
22 | | #include <memory> |
23 | | #include <vector> |
24 | | |
25 | | #include "crow/version.h" |
26 | | #include "crow/http_connection.h" |
27 | | #include "crow/logging.h" |
28 | | #include "crow/task_timer.h" |
29 | | |
30 | | |
31 | | namespace crow // NOTE: Already documented in "crow/app.h" |
32 | | { |
33 | | #ifdef CROW_USE_BOOST |
34 | | namespace asio = boost::asio; |
35 | | using error_code = boost::system::error_code; |
36 | | #else |
37 | | using error_code = asio::error_code; |
38 | | #endif |
39 | | using tcp = asio::ip::tcp; |
40 | | |
    template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares>
    class Server
    {
    public:
        /// \brief Set up (but do not start) the server; call run() to begin serving.
        ///
        /// \param handler      Application object that routes requests (not owned).
        /// \param endpoint     TCP endpoint the acceptor binds to immediately (may throw
        ///                     if the port is unavailable).
        /// \param server_name  Name reported for this server (default "Crow/<version>").
        /// \param middlewares  Middleware tuple forwarded to each connection (may be null).
        /// \param concurrency  Total thread count; run() spawns concurrency - 1 workers.
        ///                     NOTE(review): a value of 0 would underflow
        ///                     `concurrency_ - 1` below — presumably callers always pass
        ///                     >= 1; confirm upstream validation.
        /// \param timeout      Default connection timeout in seconds.
        /// \param adaptor_ctx  Socket/SSL adaptor context (may be null).
        Server(Handler* handler,
               const tcp::endpoint& endpoint,
               std::string server_name = std::string("Crow/") + VERSION,
               std::tuple<Middlewares...>* middlewares = nullptr,
               uint16_t concurrency = 1,
               uint8_t timeout = 5,
               typename Adaptor::context* adaptor_ctx = nullptr):
          // io_context_ is declared before acceptor_ in the member list, so it is
          // fully constructed before being used here.
          acceptor_(io_context_,endpoint),
          signals_(io_context_),
          tick_timer_(io_context_),
          handler_(handler),
          concurrency_(concurrency),
          timeout_(timeout),
          server_name_(server_name),
          // Members are initialized in declaration order, and concurrency_ is
          // declared before task_queue_length_pool_, so concurrency_ already holds
          // its value when this vector is sized (despite the order of this list).
          task_queue_length_pool_(concurrency_ - 1),
          middlewares_(middlewares),
          adaptor_ctx_(adaptor_ctx)
        {}
63 | | |
64 | | void set_tick_function(std::chrono::milliseconds d, std::function<void()> f) |
65 | 0 | { |
66 | 0 | tick_interval_ = d; |
67 | 0 | tick_function_ = f; |
68 | 0 | } |
69 | | |
70 | | void on_tick() |
71 | 0 | { |
72 | 0 | tick_function_(); |
73 | 0 | tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count())); |
74 | 0 | tick_timer_.async_wait([this](const error_code& ec) { |
75 | 0 | if (ec) |
76 | 0 | return; |
77 | 0 | on_tick(); |
78 | 0 | }); |
79 | 0 | } |
80 | | |
81 | | void run() |
82 | 0 | { |
83 | 0 | uint16_t worker_thread_count = concurrency_ - 1; |
84 | 0 | for (int i = 0; i < worker_thread_count; i++) |
85 | 0 | io_context_pool_.emplace_back(new asio::io_context()); |
86 | 0 | get_cached_date_str_pool_.resize(worker_thread_count); |
87 | 0 | task_timer_pool_.resize(worker_thread_count); |
88 | |
|
89 | 0 | std::vector<std::future<void>> v; |
90 | 0 | std::atomic<int> init_count(0); |
91 | 0 | for (uint16_t i = 0; i < worker_thread_count; i++) |
92 | 0 | v.push_back( |
93 | 0 | std::async( |
94 | 0 | std::launch::async, [this, i, &init_count] { |
95 | | // thread local date string get function |
96 | 0 | auto last = std::chrono::steady_clock::now(); |
97 | |
|
98 | 0 | std::string date_str; |
99 | 0 | auto update_date_str = [&] { |
100 | 0 | auto last_time_t = time(0); |
101 | 0 | tm my_tm; |
102 | |
|
103 | | #if defined(_MSC_VER) || defined(__MINGW32__) |
104 | | gmtime_s(&my_tm, &last_time_t); |
105 | | #else |
106 | 0 | gmtime_r(&last_time_t, &my_tm); |
107 | 0 | #endif |
108 | 0 | date_str.resize(100); |
109 | 0 | size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm); |
110 | 0 | date_str.resize(date_str_sz); |
111 | 0 | }; |
112 | 0 | update_date_str(); |
113 | 0 | get_cached_date_str_pool_[i] = [&]() -> std::string { |
114 | 0 | if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1)) |
115 | 0 | { |
116 | 0 | last = std::chrono::steady_clock::now(); |
117 | 0 | update_date_str(); |
118 | 0 | } |
119 | 0 | return date_str; |
120 | 0 | }; |
121 | | |
122 | | // initializing task timers |
123 | 0 | detail::task_timer task_timer(*io_context_pool_[i]); |
124 | 0 | task_timer.set_default_timeout(timeout_); |
125 | 0 | task_timer_pool_[i] = &task_timer; |
126 | 0 | task_queue_length_pool_[i] = 0; |
127 | |
|
128 | 0 | init_count++; |
129 | 0 | while (1) |
130 | 0 | { |
131 | 0 | try |
132 | 0 | { |
133 | 0 | if (io_context_pool_[i]->run() == 0) |
134 | 0 | { |
135 | | // when io_service.run returns 0, there are no more works to do. |
136 | 0 | break; |
137 | 0 | } |
138 | 0 | } |
139 | 0 | catch (std::exception& e) |
140 | 0 | { |
141 | 0 | CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what(); |
142 | 0 | } |
143 | 0 | } |
144 | 0 | })); |
145 | |
|
146 | 0 | if (tick_function_ && tick_interval_.count() > 0) |
147 | 0 | { |
148 | 0 | tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count())); |
149 | 0 | tick_timer_.async_wait( |
150 | 0 | [this](const error_code& ec) { |
151 | 0 | if (ec) |
152 | 0 | return; |
153 | 0 | on_tick(); |
154 | 0 | }); |
155 | 0 | } |
156 | |
|
157 | 0 | handler_->port(acceptor_.local_endpoint().port()); |
158 | | |
159 | |
|
160 | 0 | CROW_LOG_INFO << server_name_ |
161 | 0 | << " server is running at " << (handler_->ssl_used() ? "https://" : "http://") |
162 | 0 | << acceptor_.local_endpoint().address() << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads"; |
163 | 0 | CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs."; |
164 | |
|
165 | 0 | signals_.async_wait( |
166 | 0 | [&](const error_code& /*error*/, int /*signal_number*/) { |
167 | 0 | stop(); |
168 | 0 | }); |
169 | |
|
170 | 0 | while (worker_thread_count != init_count) |
171 | 0 | std::this_thread::yield(); |
172 | |
|
173 | 0 | do_accept(); |
174 | |
|
175 | 0 | std::thread( |
176 | 0 | [this] { |
177 | 0 | notify_start(); |
178 | 0 | io_context_.run(); |
179 | 0 | CROW_LOG_INFO << "Exiting."; |
180 | 0 | }) |
181 | 0 | .join(); |
182 | 0 | } |
183 | | |
184 | | void stop() |
185 | 0 | { |
186 | 0 | shutting_down_ = true; // Prevent the acceptor from taking new connections |
187 | 0 | for (auto& io_context : io_context_pool_) |
188 | 0 | { |
189 | 0 | if (io_context != nullptr) |
190 | 0 | { |
191 | 0 | CROW_LOG_INFO << "Closing IO service " << &io_context; |
192 | 0 | io_context->stop(); // Close all io_services (and HTTP connections) |
193 | 0 | } |
194 | 0 | } |
195 | |
|
196 | 0 | CROW_LOG_INFO << "Closing main IO service (" << &io_context_ << ')'; |
197 | 0 | io_context_.stop(); // Close main io_service |
198 | 0 | } |
199 | | |
200 | | uint16_t port() const { |
201 | | return acceptor_.local_endpoint().port(); |
202 | | } |
203 | | |
204 | | /// Wait until the server has properly started or until timeout |
205 | | std::cv_status wait_for_start(std::chrono::steady_clock::time_point wait_until) |
206 | | { |
207 | | std::unique_lock<std::mutex> lock(start_mutex_); |
208 | | |
209 | | std::cv_status status = std::cv_status::no_timeout; |
210 | | while (!server_started_ && ( status==std::cv_status::no_timeout )) |
211 | | status = cv_started_.wait_until(lock,wait_until); |
212 | | return status; |
213 | | } |
214 | | |
        /// Remove every signal previously registered via signal_add().
        void signal_clear()
        {
            signals_.clear();
        }
219 | | |
        /// Register a signal (e.g. SIGINT) whose delivery triggers stop()
        /// once run() has installed its signal handler.
        void signal_add(int signal_number)
        {
            signals_.add(signal_number);
        }
224 | | |
225 | | private: |
226 | | uint16_t pick_io_context_idx() |
227 | 0 | { |
228 | 0 | uint16_t min_queue_idx = 0; |
229 | | |
230 | | // TODO improve load balancing |
231 | | // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/ |
232 | | // even though the max value of this can be only uint16_t as concurrency is uint16_t. |
233 | 0 | for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++) |
234 | | // No need to check other io_services if the current one has no tasks |
235 | 0 | { |
236 | 0 | if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx]) |
237 | 0 | min_queue_idx = i; |
238 | 0 | } |
239 | 0 | return min_queue_idx; |
240 | 0 | } |
241 | | |
        /// Accept one connection asynchronously, then re-arm itself, forming a
        /// continuous accept loop that ends only when shutting_down_ is set.
        /// Each accepted connection is handed to the least-loaded worker
        /// io_context picked by pick_io_context_idx().
        void do_accept()
        {
            if (!shutting_down_)
            {
                uint16_t context_idx = pick_io_context_idx();
                asio::io_context& ic = *io_context_pool_[context_idx];
                // Count this pending connection against the chosen worker *before*
                // the accept completes, so concurrent picks see the load.
                task_queue_length_pool_[context_idx]++;
                CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];

                // The connection is created up front so the acceptor has a socket
                // to accept into; shared_ptr keeps it alive through the async chain.
                auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>(
                  ic, handler_, server_name_, middlewares_,
                  get_cached_date_str_pool_[context_idx], *task_timer_pool_[context_idx], adaptor_ctx_, task_queue_length_pool_[context_idx]);

                acceptor_.async_accept(
                  p->socket(),
                  [this, p, &ic, context_idx](error_code ec) {
                      if (!ec)
                      {
                          // Run the connection on its worker io_context, not on the
                          // main (acceptor) context this handler executes on.
                          asio::post(ic,
                                     [p] {
                                         p->start();
                                     });
                      }
                      else
                      {
                          // Accept failed: undo the optimistic queue-length increment.
                          task_queue_length_pool_[context_idx]--;
                          CROW_LOG_DEBUG << &ic << " {" << context_idx << "} queue length: " << task_queue_length_pool_[context_idx];
                      }
                      // Re-arm the accept loop regardless of outcome.
                      do_accept();
                  });
            }
        }
274 | | |
275 | | /// Notify anything using `wait_for_start()` to proceed |
276 | | void notify_start() |
277 | 0 | { |
278 | 0 | std::unique_lock<std::mutex> lock(start_mutex_); |
279 | 0 | server_started_ = true; |
280 | 0 | cv_started_.notify_all(); |
281 | 0 | } |
282 | | |
    private:
        // One io_context per worker thread; connections are distributed across them.
        std::vector<std::unique_ptr<asio::io_context>> io_context_pool_;
        // Main io_context: runs the acceptor, signal handling and the tick timer.
        asio::io_context io_context_;
        // Raw pointers to each worker's stack-local task_timer (set up in run()).
        std::vector<detail::task_timer*> task_timer_pool_;
        // Per-worker cached "Date" header producers (set up in run()).
        std::vector<std::function<std::string()>> get_cached_date_str_pool_;
        tcp::acceptor acceptor_;
        bool shutting_down_ = false; // set by stop(); checked by do_accept()
        bool server_started_{false}; // guarded by start_mutex_; see notify_start()
        std::condition_variable cv_started_;
        std::mutex start_mutex_;
        asio::signal_set signals_;

        asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_;

        Handler* handler_; // not owned
        uint16_t concurrency_{2};
        std::uint8_t timeout_; // default connection timeout, seconds
        std::string server_name_;
        // Pending-connection count per worker io_context; drives pick_io_context_idx().
        std::vector<std::atomic<unsigned int>> task_queue_length_pool_;

        std::chrono::milliseconds tick_interval_;
        std::function<void()> tick_function_;

        std::tuple<Middlewares...>* middlewares_; // not owned; may be null

        typename Adaptor::context* adaptor_ctx_; // not owned; may be null
    };
} // namespace crow