LCOV - code coverage report
Current view: top level - boost/beast2/server - workers.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 0.0 % 38 0
Test Date: 2026-01-15 20:49:55 Functions: 0.0 % 9 0

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/beast2
       8              : //
       9              : 
      10              : #ifndef BOOST_BEAST2_SERVER_WORKERS_HPP
      11              : #define BOOST_BEAST2_SERVER_WORKERS_HPP
      12              : 
#include <boost/beast2/detail/config.hpp>
#include <boost/beast2/log_service.hpp>
#include <boost/beast2/server/fixed_array.hpp>
#include <boost/beast2/server/router_corosio.hpp>
#include <boost/beast2/server/http_stream.hpp>
#include <boost/capy/application.hpp>
#include <boost/capy/task.hpp>
#include <boost/capy/ex/async_run.hpp>
#include <boost/corosio/acceptor.hpp>
#include <boost/corosio/socket.hpp>
#include <boost/corosio/io_context.hpp>
#include <boost/corosio/endpoint.hpp>
#include <boost/http/server/route_handler.hpp>

#include <cstddef>
#include <deque>
#include <utility>
#include <vector>
      28              : 
      29              : namespace boost {
      30              : namespace beast2 {
      31              : 
      32              : /** A preallocated worker that handles one HTTP connection at a time.
      33              : */
      34              : struct http_worker
      35              : {
      36              :     corosio::socket sock;
      37              :     bool in_use = false;
      38              : 
      39            0 :     explicit http_worker(corosio::io_context& ioc)
      40            0 :         : sock(ioc)
      41              :     {
      42            0 :     }
      43              : 
      44            0 :     http_worker(http_worker&&) = default;
      45              :     http_worker& operator=(http_worker&&) = default;
      46              : };
      47              : 
      48              : //------------------------------------------------
      49              : 
      50              : /** A set of accepting sockets and their workers.
      51              : 
      52              :     This implements a server that accepts incoming connections
      53              :     and handles HTTP requests using preallocated workers.
      54              : */
      55              : class workers
      56              : {
      57              : public:
      58            0 :     ~workers() = default;
      59              : 
      60              :     /** Constructor
      61              : 
      62              :         @param app The application which holds this server
      63              :         @param ioc The I/O context for async operations
      64              :         @param num_workers The number of workers to preallocate
      65              :         @param routes The router for dispatching requests
      66              :     */
      67              :     workers(
      68              :         capy::application& app,
      69              :         corosio::io_context& ioc,
      70              :         std::size_t num_workers,
      71              :         router_corosio routes);
      72              : 
      73              :     /** Add a listening endpoint
      74              : 
      75              :         @param config Acceptor configuration
      76              :         @param ep The endpoint to listen on
      77              :     */
      78              :     void
      79              :     listen(
      80              :         http::acceptor_config config,
      81              :         corosio::endpoint ep);
      82              : 
      83              :     /** Start the accept loop
      84              : 
      85              :         Must be called after listen() to begin accepting connections.
      86              :     */
      87              :     void start();
      88              : 
      89              :     /** Stop accepting and cancel all connections
      90              :     */
      91              :     void stop();
      92              : 
      93              : private:
      94              :     capy::task<void>
      95              :     accept_loop(corosio::acceptor& acc, http::acceptor_config config);
      96              : 
      97              :     capy::task<void>
      98              :     run_session(http_worker& w, http::acceptor_config const& config);
      99              : 
     100              :     capy::application& app_;
     101              :     corosio::io_context& ioc_;
     102              :     section sect_;
     103              :     router_corosio routes_;
     104              :     std::vector<http_worker> workers_;
     105              :     std::vector<corosio::acceptor> acceptors_;
     106              :     std::vector<http::acceptor_config> configs_;
     107              :     bool stopped_ = false;
     108              : };
     109              : 
     110              : //------------------------------------------------
     111              : 
     112              : inline
     113            0 : workers::
     114              : workers(
     115              :     capy::application& app,
     116              :     corosio::io_context& ioc,
     117              :     std::size_t num_workers,
     118            0 :     router_corosio routes)
     119            0 :     : app_(app)
     120            0 :     , ioc_(ioc)
     121            0 :     , sect_(use_log_service(app).get_section("workers"))
     122            0 :     , routes_(std::move(routes))
     123              : {
     124            0 :     workers_.reserve(num_workers);
     125            0 :     for (std::size_t i = 0; i < num_workers; ++i)
     126            0 :         workers_.emplace_back(ioc_);
     127            0 : }
     128              : 
     129              : inline
     130              : void
     131            0 : workers::
     132              : listen(
     133              :     http::acceptor_config config,
     134              :     corosio::endpoint ep)
     135              : {
     136            0 :     acceptors_.emplace_back(ioc_);
     137            0 :     acceptors_.back().listen(ep);
     138            0 :     configs_.push_back(config);
     139            0 : }
     140              : 
     141              : inline
     142              : void
     143            0 : workers::
     144              : start()
     145              : {
     146            0 :     stopped_ = false;
     147            0 :     for (std::size_t i = 0; i < acceptors_.size(); ++i)
     148              :     {
     149            0 :         capy::async_run(ioc_.get_executor())(
     150            0 :             accept_loop(acceptors_[i], configs_[i]));
     151              :     }
     152            0 : }
     153              : 
     154              : inline
     155              : void
     156            0 : workers::
     157              : stop()
     158              : {
     159            0 :     stopped_ = true;
     160            0 :     for (auto& acc : acceptors_)
     161            0 :         acc.cancel();
     162            0 :     for (auto& w : workers_)
     163              :     {
     164            0 :         if (w.in_use)
     165            0 :             w.sock.cancel();
     166              :     }
     167            0 : }
     168              : 
     169              : inline
     170              : capy::task<void>
     171            0 : workers::
     172              : accept_loop(corosio::acceptor& acc, http::acceptor_config config)
     173              : {
     174              :     while (!stopped_)
     175              :     {
     176              :         // Find a free worker
     177              :         http_worker* free_worker = nullptr;
     178              :         for (auto& w : workers_)
     179              :         {
     180              :             if (!w.in_use)
     181              :             {
     182              :                 free_worker = &w;
     183              :                 break;
     184              :             }
     185              :         }
     186              : 
     187              :         if (!free_worker)
     188              :         {
     189              :             // All workers busy - accept and immediately close
     190              :             // A production server might queue or have backpressure
     191              :             LOG_DBG(sect_)("All workers busy, rejecting connection");
     192              :             corosio::socket temp(ioc_);
     193              :             auto [ec] = co_await acc.accept(temp);
     194              :             if (ec)
     195              :             {
     196              :                 if (stopped_)
     197              :                     break;
     198              :                 LOG_DBG(sect_)("accept error: {}", ec.message());
     199              :             }
     200              :             temp.close();
     201              :             continue;
     202              :         }
     203              : 
     204              :         // Accept into the free worker's socket
     205              :         auto [ec] = co_await acc.accept(free_worker->sock);
     206              :         if (ec)
     207              :         {
     208              :             if (stopped_)
     209              :                 break;
     210              :             LOG_DBG(sect_)("accept error: {}", ec.message());
     211              :             continue;
     212              :         }
     213              : 
     214              :         // Spawn session coroutine
     215              :         capy::async_run(ioc_.get_executor())(
     216              :             run_session(*free_worker, config));
     217              :     }
     218            0 : }
     219              : 
     220              : inline
     221              : capy::task<void>
     222            0 : workers::
     223              : run_session(http_worker& w, http::acceptor_config const& config)
     224              : {
     225              :     w.in_use = true;
     226              : 
     227              :     http_stream stream(app_, w.sock, routes_);
     228              :     co_await stream.run(config);
     229              : 
     230              :     w.sock.close();
     231              :     w.in_use = false;
     232            0 : }
     233              : 
     234              : } // beast2
     235              : } // boost
     236              : 
     237              : #endif
        

Generated by: LCOV version 2.3