Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/beast2
8 : //
9 :
10 : #ifndef BOOST_BEAST2_SERVER_WORKERS_HPP
11 : #define BOOST_BEAST2_SERVER_WORKERS_HPP
12 :
13 : #include <boost/beast2/detail/config.hpp>
14 : #include <boost/beast2/log_service.hpp>
15 : #include <boost/beast2/server/fixed_array.hpp>
16 : #include <boost/beast2/server/router_corosio.hpp>
17 : #include <boost/beast2/server/http_stream.hpp>
18 : #include <boost/capy/application.hpp>
19 : #include <boost/capy/task.hpp>
20 : #include <boost/capy/ex/async_run.hpp>
21 : #include <boost/corosio/acceptor.hpp>
22 : #include <boost/corosio/socket.hpp>
23 : #include <boost/corosio/io_context.hpp>
24 : #include <boost/corosio/endpoint.hpp>
25 : #include <boost/http/server/route_handler.hpp>
26 :
27 : #include <vector>
28 :
29 : namespace boost {
30 : namespace beast2 {
31 :
32 : /** A preallocated worker that handles one HTTP connection at a time.
33 : */
34 : struct http_worker
35 : {
36 : corosio::socket sock;
37 : bool in_use = false;
38 :
39 0 : explicit http_worker(corosio::io_context& ioc)
40 0 : : sock(ioc)
41 : {
42 0 : }
43 :
44 0 : http_worker(http_worker&&) = default;
45 : http_worker& operator=(http_worker&&) = default;
46 : };
47 :
48 : //------------------------------------------------
49 :
50 : /** A set of accepting sockets and their workers.
51 :
52 : This implements a server that accepts incoming connections
53 : and handles HTTP requests using preallocated workers.
54 : */
55 : class workers
56 : {
57 : public:
58 0 : ~workers() = default;
59 :
60 : /** Constructor
61 :
62 : @param app The application which holds this server
63 : @param ioc The I/O context for async operations
64 : @param num_workers The number of workers to preallocate
65 : @param routes The router for dispatching requests
66 : */
67 : workers(
68 : capy::application& app,
69 : corosio::io_context& ioc,
70 : std::size_t num_workers,
71 : router_corosio routes);
72 :
73 : /** Add a listening endpoint
74 :
75 : @param config Acceptor configuration
76 : @param ep The endpoint to listen on
77 : */
78 : void
79 : listen(
80 : http::acceptor_config config,
81 : corosio::endpoint ep);
82 :
83 : /** Start the accept loop
84 :
85 : Must be called after listen() to begin accepting connections.
86 : */
87 : void start();
88 :
89 : /** Stop accepting and cancel all connections
90 : */
91 : void stop();
92 :
93 : private:
94 : capy::task<void>
95 : accept_loop(corosio::acceptor& acc, http::acceptor_config config);
96 :
97 : capy::task<void>
98 : run_session(http_worker& w, http::acceptor_config const& config);
99 :
100 : capy::application& app_;
101 : corosio::io_context& ioc_;
102 : section sect_;
103 : router_corosio routes_;
104 : std::vector<http_worker> workers_;
105 : std::vector<corosio::acceptor> acceptors_;
106 : std::vector<http::acceptor_config> configs_;
107 : bool stopped_ = false;
108 : };
109 :
110 : //------------------------------------------------
111 :
112 : inline
113 0 : workers::
114 : workers(
115 : capy::application& app,
116 : corosio::io_context& ioc,
117 : std::size_t num_workers,
118 0 : router_corosio routes)
119 0 : : app_(app)
120 0 : , ioc_(ioc)
121 0 : , sect_(use_log_service(app).get_section("workers"))
122 0 : , routes_(std::move(routes))
123 : {
124 0 : workers_.reserve(num_workers);
125 0 : for (std::size_t i = 0; i < num_workers; ++i)
126 0 : workers_.emplace_back(ioc_);
127 0 : }
128 :
129 : inline
130 : void
131 0 : workers::
132 : listen(
133 : http::acceptor_config config,
134 : corosio::endpoint ep)
135 : {
136 0 : acceptors_.emplace_back(ioc_);
137 0 : acceptors_.back().listen(ep);
138 0 : configs_.push_back(config);
139 0 : }
140 :
141 : inline
142 : void
143 0 : workers::
144 : start()
145 : {
146 0 : stopped_ = false;
147 0 : for (std::size_t i = 0; i < acceptors_.size(); ++i)
148 : {
149 0 : capy::async_run(ioc_.get_executor())(
150 0 : accept_loop(acceptors_[i], configs_[i]));
151 : }
152 0 : }
153 :
154 : inline
155 : void
156 0 : workers::
157 : stop()
158 : {
159 0 : stopped_ = true;
160 0 : for (auto& acc : acceptors_)
161 0 : acc.cancel();
162 0 : for (auto& w : workers_)
163 : {
164 0 : if (w.in_use)
165 0 : w.sock.cancel();
166 : }
167 0 : }
168 :
169 : inline
170 : capy::task<void>
171 0 : workers::
172 : accept_loop(corosio::acceptor& acc, http::acceptor_config config)
173 : {
174 : while (!stopped_)
175 : {
176 : // Find a free worker
177 : http_worker* free_worker = nullptr;
178 : for (auto& w : workers_)
179 : {
180 : if (!w.in_use)
181 : {
182 : free_worker = &w;
183 : break;
184 : }
185 : }
186 :
187 : if (!free_worker)
188 : {
189 : // All workers busy - accept and immediately close
190 : // A production server might queue or have backpressure
191 : LOG_DBG(sect_)("All workers busy, rejecting connection");
192 : corosio::socket temp(ioc_);
193 : auto [ec] = co_await acc.accept(temp);
194 : if (ec)
195 : {
196 : if (stopped_)
197 : break;
198 : LOG_DBG(sect_)("accept error: {}", ec.message());
199 : }
200 : temp.close();
201 : continue;
202 : }
203 :
204 : // Accept into the free worker's socket
205 : auto [ec] = co_await acc.accept(free_worker->sock);
206 : if (ec)
207 : {
208 : if (stopped_)
209 : break;
210 : LOG_DBG(sect_)("accept error: {}", ec.message());
211 : continue;
212 : }
213 :
214 : // Spawn session coroutine
215 : capy::async_run(ioc_.get_executor())(
216 : run_session(*free_worker, config));
217 : }
218 0 : }
219 :
220 : inline
221 : capy::task<void>
222 0 : workers::
223 : run_session(http_worker& w, http::acceptor_config const& config)
224 : {
225 : w.in_use = true;
226 :
227 : http_stream stream(app_, w.sock, routes_);
228 : co_await stream.run(config);
229 :
230 : w.sock.close();
231 : w.in_use = false;
232 0 : }
233 :
234 : } // beast2
235 : } // boost
236 :
237 : #endif
|