/src/crow/include/crow/http_server.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #ifdef CROW_USE_BOOST |
4 | | #include <boost/asio.hpp> |
5 | | #ifdef CROW_ENABLE_SSL |
6 | | #include <boost/asio/ssl.hpp> |
7 | | #endif |
8 | | #else |
9 | | #ifndef ASIO_STANDALONE |
10 | | #define ASIO_STANDALONE |
11 | | #endif |
12 | | #include <asio.hpp> |
13 | | #ifdef CROW_ENABLE_SSL |
14 | | #include <asio/ssl.hpp> |
15 | | #endif |
16 | | #endif |
17 | | |
18 | | #include <atomic> |
19 | | #include <chrono> |
20 | | #include <cstdint> |
21 | | #include <future> |
22 | | #include <memory> |
23 | | #include <vector> |
24 | | |
25 | | #include "crow/version.h" |
26 | | #include "crow/http_connection.h" |
27 | | #include "crow/logging.h" |
28 | | #include "crow/task_timer.h" |
29 | | |
30 | | |
31 | | namespace crow // NOTE: Already documented in "crow/app.h" |
32 | | { |
33 | | #ifdef CROW_USE_BOOST |
34 | | namespace asio = boost::asio; |
35 | | using error_code = boost::system::error_code; |
36 | | #else |
37 | | using error_code = asio::error_code; |
38 | | #endif |
39 | | using tcp = asio::ip::tcp; |
40 | | |
41 | | template<typename Handler, typename Adaptor = SocketAdaptor, typename... Middlewares> |
42 | | class Server |
43 | | { |
44 | | public: |
45 | | Server(Handler* handler, std::string bindaddr, uint16_t port, std::string server_name = std::string("Crow/") + VERSION, std::tuple<Middlewares...>* middlewares = nullptr, uint16_t concurrency = 1, uint8_t timeout = 5, typename Adaptor::context* adaptor_ctx = nullptr): |
46 | | acceptor_(io_service_, tcp::endpoint(asio::ip::address::from_string(bindaddr), port)), |
47 | | signals_(io_service_), |
48 | | tick_timer_(io_service_), |
49 | | handler_(handler), |
50 | | concurrency_(concurrency), |
51 | | timeout_(timeout), |
52 | | server_name_(server_name), |
53 | | port_(port), |
54 | | bindaddr_(bindaddr), |
55 | | task_queue_length_pool_(concurrency_ - 1), |
56 | | middlewares_(middlewares), |
57 | | adaptor_ctx_(adaptor_ctx) |
58 | 0 | {} |
59 | | |
60 | | void set_tick_function(std::chrono::milliseconds d, std::function<void()> f) |
61 | 0 | { |
62 | 0 | tick_interval_ = d; |
63 | 0 | tick_function_ = f; |
64 | 0 | } |
65 | | |
66 | | void on_tick() |
67 | 0 | { |
68 | 0 | tick_function_(); |
69 | 0 | tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count())); |
70 | 0 | tick_timer_.async_wait([this](const error_code& ec) { |
71 | 0 | if (ec) |
72 | 0 | return; |
73 | 0 | on_tick(); |
74 | 0 | }); |
75 | 0 | } |
76 | | |
77 | | void run() |
78 | 0 | { |
79 | 0 | uint16_t worker_thread_count = concurrency_ - 1; |
80 | 0 | for (int i = 0; i < worker_thread_count; i++) |
81 | 0 | io_service_pool_.emplace_back(new asio::io_service()); |
82 | 0 | get_cached_date_str_pool_.resize(worker_thread_count); |
83 | 0 | task_timer_pool_.resize(worker_thread_count); |
84 | |
|
85 | 0 | std::vector<std::future<void>> v; |
86 | 0 | std::atomic<int> init_count(0); |
87 | 0 | for (uint16_t i = 0; i < worker_thread_count; i++) |
88 | 0 | v.push_back( |
89 | 0 | std::async( |
90 | 0 | std::launch::async, [this, i, &init_count] { |
91 | | // thread local date string get function |
92 | 0 | auto last = std::chrono::steady_clock::now(); |
93 | |
|
94 | 0 | std::string date_str; |
95 | 0 | auto update_date_str = [&] { |
96 | 0 | auto last_time_t = time(0); |
97 | 0 | tm my_tm; |
98 | |
|
99 | | #if defined(_MSC_VER) || defined(__MINGW32__) |
100 | | gmtime_s(&my_tm, &last_time_t); |
101 | | #else |
102 | 0 | gmtime_r(&last_time_t, &my_tm); |
103 | 0 | #endif |
104 | 0 | date_str.resize(100); |
105 | 0 | size_t date_str_sz = strftime(&date_str[0], 99, "%a, %d %b %Y %H:%M:%S GMT", &my_tm); |
106 | 0 | date_str.resize(date_str_sz); |
107 | 0 | }; |
108 | 0 | update_date_str(); |
109 | 0 | get_cached_date_str_pool_[i] = [&]() -> std::string { |
110 | 0 | if (std::chrono::steady_clock::now() - last >= std::chrono::seconds(1)) |
111 | 0 | { |
112 | 0 | last = std::chrono::steady_clock::now(); |
113 | 0 | update_date_str(); |
114 | 0 | } |
115 | 0 | return date_str; |
116 | 0 | }; |
117 | | |
118 | | // initializing task timers |
119 | 0 | detail::task_timer task_timer(*io_service_pool_[i]); |
120 | 0 | task_timer.set_default_timeout(timeout_); |
121 | 0 | task_timer_pool_[i] = &task_timer; |
122 | 0 | task_queue_length_pool_[i] = 0; |
123 | |
|
124 | 0 | init_count++; |
125 | 0 | while (1) |
126 | 0 | { |
127 | 0 | try |
128 | 0 | { |
129 | 0 | if (io_service_pool_[i]->run() == 0) |
130 | 0 | { |
131 | | // when io_service.run returns 0, there are no more works to do. |
132 | 0 | break; |
133 | 0 | } |
134 | 0 | } |
135 | 0 | catch (std::exception& e) |
136 | 0 | { |
137 | 0 | CROW_LOG_ERROR << "Worker Crash: An uncaught exception occurred: " << e.what(); |
138 | 0 | } |
139 | 0 | } |
140 | 0 | })); |
141 | |
|
142 | 0 | if (tick_function_ && tick_interval_.count() > 0) |
143 | 0 | { |
144 | 0 | tick_timer_.expires_after(std::chrono::milliseconds(tick_interval_.count())); |
145 | 0 | tick_timer_.async_wait( |
146 | 0 | [this](const error_code& ec) { |
147 | 0 | if (ec) |
148 | 0 | return; |
149 | 0 | on_tick(); |
150 | 0 | }); |
151 | 0 | } |
152 | |
|
153 | 0 | port_ = acceptor_.local_endpoint().port(); |
154 | 0 | handler_->port(port_); |
155 | | |
156 | |
|
157 | 0 | CROW_LOG_INFO << server_name_ << " server is running at " << (handler_->ssl_used() ? "https://" : "http://") << bindaddr_ << ":" << acceptor_.local_endpoint().port() << " using " << concurrency_ << " threads"; |
158 | 0 | CROW_LOG_INFO << "Call `app.loglevel(crow::LogLevel::Warning)` to hide Info level logs."; |
159 | |
|
160 | 0 | signals_.async_wait( |
161 | 0 | [&](const error_code& /*error*/, int /*signal_number*/) { |
162 | 0 | stop(); |
163 | 0 | }); |
164 | |
|
165 | 0 | while (worker_thread_count != init_count) |
166 | 0 | std::this_thread::yield(); |
167 | |
|
168 | 0 | do_accept(); |
169 | |
|
170 | 0 | std::thread( |
171 | 0 | [this] { |
172 | 0 | notify_start(); |
173 | 0 | io_service_.run(); |
174 | 0 | CROW_LOG_INFO << "Exiting."; |
175 | 0 | }) |
176 | 0 | .join(); |
177 | 0 | } |
178 | | |
179 | | void stop() |
180 | 0 | { |
181 | 0 | shutting_down_ = true; // Prevent the acceptor from taking new connections |
182 | 0 | for (auto& io_service : io_service_pool_) |
183 | 0 | { |
184 | 0 | if (io_service != nullptr) |
185 | 0 | { |
186 | 0 | CROW_LOG_INFO << "Closing IO service " << &io_service; |
187 | 0 | io_service->stop(); // Close all io_services (and HTTP connections) |
188 | 0 | } |
189 | 0 | } |
190 | |
|
191 | 0 | CROW_LOG_INFO << "Closing main IO service (" << &io_service_ << ')'; |
192 | 0 | io_service_.stop(); // Close main io_service |
193 | 0 | } |
194 | | |
195 | | /// Wait until the server has properly started |
196 | | void wait_for_start() |
197 | | { |
198 | | std::unique_lock<std::mutex> lock(start_mutex_); |
199 | | while (!server_started_) |
200 | | cv_started_.wait(lock); |
201 | | } |
202 | | |
203 | | void signal_clear() |
204 | | { |
205 | | signals_.clear(); |
206 | | } |
207 | | |
208 | | void signal_add(int signal_number) |
209 | 0 | { |
210 | 0 | signals_.add(signal_number); |
211 | 0 | } |
212 | | |
213 | | private: |
214 | | uint16_t pick_io_service_idx() |
215 | 0 | { |
216 | 0 | uint16_t min_queue_idx = 0; |
217 | | |
218 | | // TODO improve load balancing |
219 | | // size_t is used here to avoid the security issue https://codeql.github.com/codeql-query-help/cpp/cpp-comparison-with-wider-type/ |
220 | | // even though the max value of this can be only uint16_t as concurrency is uint16_t. |
221 | 0 | for (size_t i = 1; i < task_queue_length_pool_.size() && task_queue_length_pool_[min_queue_idx] > 0; i++) |
222 | | // No need to check other io_services if the current one has no tasks |
223 | 0 | { |
224 | 0 | if (task_queue_length_pool_[i] < task_queue_length_pool_[min_queue_idx]) |
225 | 0 | min_queue_idx = i; |
226 | 0 | } |
227 | 0 | return min_queue_idx; |
228 | 0 | } |
229 | | |
230 | | void do_accept() |
231 | 0 | { |
232 | 0 | if (!shutting_down_) |
233 | 0 | { |
234 | 0 | uint16_t service_idx = pick_io_service_idx(); |
235 | 0 | asio::io_service& is = *io_service_pool_[service_idx]; |
236 | 0 | task_queue_length_pool_[service_idx]++; |
237 | 0 | CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx]; |
238 | |
|
239 | 0 | auto p = std::make_shared<Connection<Adaptor, Handler, Middlewares...>>( |
240 | 0 | is, handler_, server_name_, middlewares_, |
241 | 0 | get_cached_date_str_pool_[service_idx], *task_timer_pool_[service_idx], adaptor_ctx_, task_queue_length_pool_[service_idx]); |
242 | |
|
243 | 0 | acceptor_.async_accept( |
244 | 0 | p->socket(), |
245 | 0 | [this, p, &is, service_idx](error_code ec) { |
246 | 0 | if (!ec) |
247 | 0 | { |
248 | 0 | is.post( |
249 | 0 | [p] { |
250 | 0 | p->start(); |
251 | 0 | }); |
252 | 0 | } |
253 | 0 | else |
254 | 0 | { |
255 | 0 | task_queue_length_pool_[service_idx]--; |
256 | 0 | CROW_LOG_DEBUG << &is << " {" << service_idx << "} queue length: " << task_queue_length_pool_[service_idx]; |
257 | 0 | } |
258 | 0 | do_accept(); |
259 | 0 | }); |
260 | 0 | } |
261 | 0 | } |
262 | | |
263 | | /// Notify anything using `wait_for_start()` to proceed |
264 | | void notify_start() |
265 | 0 | { |
266 | 0 | std::unique_lock<std::mutex> lock(start_mutex_); |
267 | 0 | server_started_ = true; |
268 | 0 | cv_started_.notify_all(); |
269 | 0 | } |
270 | | |
271 | | private: |
272 | | std::vector<std::unique_ptr<asio::io_service>> io_service_pool_; |
273 | | asio::io_service io_service_; |
274 | | std::vector<detail::task_timer*> task_timer_pool_; |
275 | | std::vector<std::function<std::string()>> get_cached_date_str_pool_; |
276 | | tcp::acceptor acceptor_; |
277 | | bool shutting_down_ = false; |
278 | | bool server_started_{false}; |
279 | | std::condition_variable cv_started_; |
280 | | std::mutex start_mutex_; |
281 | | asio::signal_set signals_; |
282 | | |
283 | | asio::basic_waitable_timer<std::chrono::high_resolution_clock> tick_timer_; |
284 | | |
285 | | Handler* handler_; |
286 | | uint16_t concurrency_{2}; |
287 | | std::uint8_t timeout_; |
288 | | std::string server_name_; |
289 | | uint16_t port_; |
290 | | std::string bindaddr_; |
291 | | std::vector<std::atomic<unsigned int>> task_queue_length_pool_; |
292 | | |
293 | | std::chrono::milliseconds tick_interval_; |
294 | | std::function<void()> tick_function_; |
295 | | |
296 | | std::tuple<Middlewares...>* middlewares_; |
297 | | |
298 | | typename Adaptor::context* adaptor_ctx_; |
299 | | }; |
300 | | } // namespace crow |