LCOV - code coverage report
Current view: top level - corosio/native/detail/epoll - epoll_acceptor_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 80.6 % 253 204 49
Test Date: 2026-02-25 01:27:20 Functions: 96.0 % 25 24 1

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_EPOLL
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/acceptor_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
      22                 : #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
      23                 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
      24                 : 
      25                 : #include <boost/corosio/detail/endpoint_convert.hpp>
      26                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      27                 : #include <boost/corosio/detail/make_err.hpp>
      28                 : 
      29                 : #include <memory>
      30                 : #include <mutex>
      31                 : #include <unordered_map>
      32                 : #include <utility>
      33                 : 
      34                 : #include <errno.h>
      35                 : #include <netinet/in.h>
      36                 : #include <sys/epoll.h>
      37                 : #include <sys/socket.h>
      38                 : #include <unistd.h>
      39                 : 
      40                 : namespace boost::corosio::detail {
      41                 : 
      42                 : /** State for epoll acceptor service. */
      43                 : class epoll_acceptor_state
      44                 : {
      45                 : public:
      46 HIT         224 :     explicit epoll_acceptor_state(epoll_scheduler& sched) noexcept
      47             224 :         : sched_(sched)
      48                 :     {
      49             224 :     }
      50                 : 
      51                 :     epoll_scheduler& sched_;
      52                 :     std::mutex mutex_;
      53                 :     intrusive_list<epoll_acceptor> acceptor_list_;
      54                 :     std::unordered_map<epoll_acceptor*, std::shared_ptr<epoll_acceptor>>
      55                 :         acceptor_ptrs_;
      56                 : };
      57                 : 
      58                 : /** epoll acceptor service implementation.
      59                 : 
      60                 :     Inherits from acceptor_service to enable runtime polymorphism.
      61                 :     Uses key_type = acceptor_service for service lookup.
      62                 : */
      63                 : class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
      64                 : {
      65                 : public:
      66                 :     explicit epoll_acceptor_service(capy::execution_context& ctx);
      67                 :     ~epoll_acceptor_service() override;
      68                 : 
      69                 :     epoll_acceptor_service(epoll_acceptor_service const&)            = delete;
      70                 :     epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;
      71                 : 
      72                 :     void shutdown() override;
      73                 : 
      74                 :     io_object::implementation* construct() override;
      75                 :     void destroy(io_object::implementation*) override;
      76                 :     void close(io_object::handle&) override;
      77                 :     std::error_code open_acceptor_socket(
      78                 :         tcp_acceptor::implementation& impl,
      79                 :         int family, int type, int protocol) override;
      80                 :     std::error_code bind_acceptor(
      81                 :         tcp_acceptor::implementation& impl, endpoint ep) override;
      82                 :     std::error_code listen_acceptor(
      83                 :         tcp_acceptor::implementation& impl, int backlog) override;
      84                 : 
      85            4781 :     epoll_scheduler& scheduler() const noexcept
      86                 :     {
      87            4781 :         return state_->sched_;
      88                 :     }
      89                 :     void post(epoll_op* op);
      90                 :     void work_started() noexcept;
      91                 :     void work_finished() noexcept;
      92                 : 
      93                 :     /** Get the socket service for creating peer sockets during accept. */
      94                 :     epoll_socket_service* socket_service() const noexcept;
      95                 : 
      96                 : private:
      97                 :     capy::execution_context& ctx_;
      98                 :     std::unique_ptr<epoll_acceptor_state> state_;
      99                 : };
     100                 : 
     101                 : //--------------------------------------------------------------------------
     102                 : //
     103                 : // Implementation
     104                 : //
     105                 : //--------------------------------------------------------------------------
     106                 : 
     107                 : inline void
     108               6 : epoll_accept_op::cancel() noexcept
     109                 : {
     110               6 :     if (acceptor_impl_)
     111               6 :         acceptor_impl_->cancel_single_op(*this);
     112                 :     else
     113 MIS           0 :         request_cancel();
     114 HIT           6 : }
     115                 : 
     116                 : inline void
     117            4625 : epoll_accept_op::operator()()
     118                 : {
     119            4625 :     stop_cb.reset();
     120                 : 
     121            4625 :     static_cast<epoll_acceptor*>(acceptor_impl_)
     122            4625 :         ->service()
     123            4625 :         .scheduler()
     124            4625 :         .reset_inline_budget();
     125                 : 
     126            4625 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
     127                 : 
     128            4625 :     if (cancelled.load(std::memory_order_acquire))
     129               9 :         *ec_out = capy::error::canceled;
     130            4616 :     else if (errn != 0)
     131 MIS           0 :         *ec_out = make_err(errn);
     132                 :     else
     133 HIT        4616 :         *ec_out = {};
     134                 : 
     135                 :     // Set up the peer socket on success
     136            4625 :     if (success && accepted_fd >= 0 && acceptor_impl_)
     137                 :     {
     138            4616 :         auto* socket_svc = static_cast<epoll_acceptor*>(acceptor_impl_)
     139            4616 :                                ->service()
     140            4616 :                                .socket_service();
     141            4616 :         if (socket_svc)
     142                 :         {
     143            4616 :             auto& impl = static_cast<epoll_socket&>(*socket_svc->construct());
     144            4616 :             impl.set_socket(accepted_fd);
     145                 : 
     146            4616 :             impl.desc_state_.fd = accepted_fd;
     147                 :             {
     148            4616 :                 std::lock_guard lock(impl.desc_state_.mutex);
     149            4616 :                 impl.desc_state_.read_op    = nullptr;
     150            4616 :                 impl.desc_state_.write_op   = nullptr;
     151            4616 :                 impl.desc_state_.connect_op = nullptr;
     152            4616 :             }
     153            4616 :             socket_svc->scheduler().register_descriptor(
     154                 :                 accepted_fd, &impl.desc_state_);
     155                 : 
     156            4616 :             impl.set_endpoints(
     157            4616 :                 static_cast<epoll_acceptor*>(acceptor_impl_)->local_endpoint(),
     158            4616 :                 from_sockaddr(peer_storage));
     159                 : 
     160            4616 :             if (impl_out)
     161            4616 :                 *impl_out = &impl;
     162            4616 :             accepted_fd = -1;
     163                 :         }
     164                 :         else
     165                 :         {
     166                 :             // No socket service — treat as error
     167 MIS           0 :             *ec_out = make_err(ENOENT);
     168               0 :             success = false;
     169                 :         }
     170                 :     }
     171                 : 
     172 HIT        4625 :     if (!success || !acceptor_impl_)
     173                 :     {
     174               9 :         if (accepted_fd >= 0)
     175                 :         {
     176 MIS           0 :             ::close(accepted_fd);
     177               0 :             accepted_fd = -1;
     178                 :         }
     179 HIT           9 :         if (impl_out)
     180               9 :             *impl_out = nullptr;
     181                 :     }
     182                 : 
     183                 :     // Move to stack before resuming. See epoll_op::operator()() for rationale.
     184            4625 :     capy::executor_ref saved_ex(ex);
     185            4625 :     std::coroutine_handle<> saved_h(h);
     186            4625 :     auto prevent_premature_destruction = std::move(impl_ptr);
     187            4625 :     dispatch_coro(saved_ex, saved_h).resume();
     188            4625 : }
     189                 : 
     190              82 : inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
     191              82 :     : svc_(svc)
     192                 : {
     193              82 : }
     194                 : 
     195                 : inline std::coroutine_handle<>
     196            4625 : epoll_acceptor::accept(
     197                 :     std::coroutine_handle<> h,
     198                 :     capy::executor_ref ex,
     199                 :     std::stop_token token,
     200                 :     std::error_code* ec,
     201                 :     io_object::implementation** impl_out)
     202                 : {
     203            4625 :     auto& op = acc_;
     204            4625 :     op.reset();
     205            4625 :     op.h        = h;
     206            4625 :     op.ex       = ex;
     207            4625 :     op.ec_out   = ec;
     208            4625 :     op.impl_out = impl_out;
     209            4625 :     op.fd       = fd_;
     210            4625 :     op.start(token, this);
     211                 : 
     212            4625 :     sockaddr_storage peer_storage{};
     213            4625 :     socklen_t addrlen = sizeof(peer_storage);
     214                 :     int accepted;
     215                 :     do
     216                 :     {
     217            4625 :         accepted = ::accept4(
     218                 :             fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
     219                 :             SOCK_NONBLOCK | SOCK_CLOEXEC);
     220                 :     }
     221            4625 :     while (accepted < 0 && errno == EINTR);
     222                 : 
     223            4625 :     if (accepted >= 0)
     224                 :     {
     225                 :         {
     226               2 :             std::lock_guard lock(desc_state_.mutex);
     227               2 :             desc_state_.read_ready = false;
     228               2 :         }
     229                 : 
     230               2 :         if (svc_.scheduler().try_consume_inline_budget())
     231                 :         {
     232 MIS           0 :             auto* socket_svc = svc_.socket_service();
     233               0 :             if (socket_svc)
     234                 :             {
     235                 :                 auto& impl =
     236               0 :                     static_cast<epoll_socket&>(*socket_svc->construct());
     237               0 :                 impl.set_socket(accepted);
     238                 : 
     239               0 :                 impl.desc_state_.fd = accepted;
     240                 :                 {
     241               0 :                     std::lock_guard lock(impl.desc_state_.mutex);
     242               0 :                     impl.desc_state_.read_op    = nullptr;
     243               0 :                     impl.desc_state_.write_op   = nullptr;
     244               0 :                     impl.desc_state_.connect_op = nullptr;
     245               0 :                 }
     246               0 :                 socket_svc->scheduler().register_descriptor(
     247                 :                     accepted, &impl.desc_state_);
     248                 : 
     249               0 :                 impl.set_endpoints(
     250                 :                     local_endpoint_, from_sockaddr(peer_storage));
     251                 : 
     252               0 :                 *ec = {};
     253               0 :                 if (impl_out)
     254               0 :                     *impl_out = &impl;
     255                 :             }
     256                 :             else
     257                 :             {
     258               0 :                 ::close(accepted);
     259               0 :                 *ec = make_err(ENOENT);
     260               0 :                 if (impl_out)
     261               0 :                     *impl_out = nullptr;
     262                 :             }
     263               0 :             return dispatch_coro(ex, h);
     264                 :         }
     265                 : 
     266 HIT           2 :         op.accepted_fd   = accepted;
     267               2 :         op.peer_storage  = peer_storage;
     268               2 :         op.complete(0, 0);
     269               2 :         op.impl_ptr = shared_from_this();
     270               2 :         svc_.post(&op);
     271               2 :         return std::noop_coroutine();
     272                 :     }
     273                 : 
     274            4623 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     275                 :     {
     276            4623 :         op.impl_ptr = shared_from_this();
     277            4623 :         svc_.work_started();
     278                 : 
     279            4623 :         std::lock_guard lock(desc_state_.mutex);
     280            4623 :         bool io_done = false;
     281            4623 :         if (desc_state_.read_ready)
     282                 :         {
     283 MIS           0 :             desc_state_.read_ready = false;
     284               0 :             op.perform_io();
     285               0 :             io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
     286               0 :             if (!io_done)
     287               0 :                 op.errn = 0;
     288                 :         }
     289                 : 
     290 HIT        4623 :         if (io_done || op.cancelled.load(std::memory_order_acquire))
     291                 :         {
     292 MIS           0 :             svc_.post(&op);
     293               0 :             svc_.work_finished();
     294                 :         }
     295                 :         else
     296                 :         {
     297 HIT        4623 :             desc_state_.read_op = &op;
     298                 :         }
     299            4623 :         return std::noop_coroutine();
     300            4623 :     }
     301                 : 
     302 MIS           0 :     op.complete(errno, 0);
     303               0 :     op.impl_ptr = shared_from_this();
     304               0 :     svc_.post(&op);
     305                 :     // completion is always posted to scheduler queue, never inline.
     306               0 :     return std::noop_coroutine();
     307                 : }
     308                 : 
     309                 : inline void
     310 HIT           2 : epoll_acceptor::cancel() noexcept
     311                 : {
     312               2 :     cancel_single_op(acc_);
     313               2 : }
     314                 : 
     315                 : inline void
     316               8 : epoll_acceptor::cancel_single_op(epoll_op& op) noexcept
     317                 : {
     318               8 :     auto self = weak_from_this().lock();
     319               8 :     if (!self)
     320 MIS           0 :         return;
     321                 : 
     322 HIT           8 :     op.request_cancel();
     323                 : 
     324               8 :     epoll_op* claimed = nullptr;
     325                 :     {
     326               8 :         std::lock_guard lock(desc_state_.mutex);
     327               8 :         if (desc_state_.read_op == &op)
     328               7 :             claimed = std::exchange(desc_state_.read_op, nullptr);
     329               8 :     }
     330               8 :     if (claimed)
     331                 :     {
     332               7 :         op.impl_ptr = self;
     333               7 :         svc_.post(&op);
     334               7 :         svc_.work_finished();
     335                 :     }
     336               8 : }
     337                 : 
     338                 : inline void
     339             326 : epoll_acceptor::close_socket() noexcept
     340                 : {
     341             326 :     auto self = weak_from_this().lock();
     342             326 :     if (self)
     343                 :     {
     344             326 :         acc_.request_cancel();
     345                 : 
     346             326 :         epoll_op* claimed = nullptr;
     347                 :         {
     348             326 :             std::lock_guard lock(desc_state_.mutex);
     349             326 :             claimed = std::exchange(desc_state_.read_op, nullptr);
     350             326 :             desc_state_.read_ready  = false;
     351             326 :             desc_state_.write_ready = false;
     352             326 :         }
     353                 : 
     354             326 :         if (claimed)
     355                 :         {
     356               2 :             acc_.impl_ptr = self;
     357               2 :             svc_.post(&acc_);
     358               2 :             svc_.work_finished();
     359                 :         }
     360                 : 
     361             326 :         if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     362 MIS           0 :             desc_state_.impl_ref_ = self;
     363                 :     }
     364                 : 
     365 HIT         326 :     if (fd_ >= 0)
     366                 :     {
     367              81 :         if (desc_state_.registered_events != 0)
     368              77 :             svc_.scheduler().deregister_descriptor(fd_);
     369              81 :         ::close(fd_);
     370              81 :         fd_ = -1;
     371                 :     }
     372                 : 
     373             326 :     desc_state_.fd                = -1;
     374             326 :     desc_state_.registered_events = 0;
     375                 : 
     376             326 :     local_endpoint_ = endpoint{};
     377             326 : }
     378                 : 
     379             224 : inline epoll_acceptor_service::epoll_acceptor_service(
     380             224 :     capy::execution_context& ctx)
     381             224 :     : ctx_(ctx)
     382             224 :     , state_(
     383                 :           std::make_unique<epoll_acceptor_state>(
     384             224 :               ctx.use_service<epoll_scheduler>()))
     385                 : {
     386             224 : }
     387                 : 
     388             448 : inline epoll_acceptor_service::~epoll_acceptor_service() {}
     389                 : 
     390                 : inline void
     391             224 : epoll_acceptor_service::shutdown()
     392                 : {
     393             224 :     std::lock_guard lock(state_->mutex_);
     394                 : 
     395             224 :     while (auto* impl = state_->acceptor_list_.pop_front())
     396 MIS           0 :         impl->close_socket();
     397                 : 
     398                 :     // Don't clear acceptor_ptrs_ here — same rationale as
     399                 :     // epoll_socket_service::shutdown(). Let ~state_ release ptrs
     400                 :     // after scheduler shutdown has drained all queued ops.
     401 HIT         224 : }
     402                 : 
     403                 : inline io_object::implementation*
     404              82 : epoll_acceptor_service::construct()
     405                 : {
     406              82 :     auto impl = std::make_shared<epoll_acceptor>(*this);
     407              82 :     auto* raw = impl.get();
     408                 : 
     409              82 :     std::lock_guard lock(state_->mutex_);
     410              82 :     state_->acceptor_list_.push_back(raw);
     411              82 :     state_->acceptor_ptrs_.emplace(raw, std::move(impl));
     412                 : 
     413              82 :     return raw;
     414              82 : }
     415                 : 
     416                 : inline void
     417              82 : epoll_acceptor_service::destroy(io_object::implementation* impl)
     418                 : {
     419              82 :     auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
     420              82 :     epoll_impl->close_socket();
     421              82 :     std::lock_guard lock(state_->mutex_);
     422              82 :     state_->acceptor_list_.remove(epoll_impl);
     423              82 :     state_->acceptor_ptrs_.erase(epoll_impl);
     424              82 : }
     425                 : 
     426                 : inline void
     427             163 : epoll_acceptor_service::close(io_object::handle& h)
     428                 : {
     429             163 :     static_cast<epoll_acceptor*>(h.get())->close_socket();
     430             163 : }
     431                 : 
     432                 : inline std::error_code
     433              79 : epoll_acceptor::set_option(
     434                 :     int level, int optname,
     435                 :     void const* data, std::size_t size) noexcept
     436                 : {
     437              79 :     if (::setsockopt(fd_, level, optname, data,
     438              79 :             static_cast<socklen_t>(size)) != 0)
     439 MIS           0 :         return make_err(errno);
     440 HIT          79 :     return {};
     441                 : }
     442                 : 
     443                 : inline std::error_code
     444 MIS           0 : epoll_acceptor::get_option(
     445                 :     int level, int optname,
     446                 :     void* data, std::size_t* size) const noexcept
     447                 : {
     448               0 :     socklen_t len = static_cast<socklen_t>(*size);
     449               0 :     if (::getsockopt(fd_, level, optname, data, &len) != 0)
     450               0 :         return make_err(errno);
     451               0 :     *size = static_cast<std::size_t>(len);
     452               0 :     return {};
     453                 : }
     454                 : 
     455                 : inline std::error_code
     456 HIT          81 : epoll_acceptor_service::open_acceptor_socket(
     457                 :     tcp_acceptor::implementation& impl,
     458                 :     int family, int type, int protocol)
     459                 : {
     460              81 :     auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
     461              81 :     epoll_impl->close_socket();
     462                 : 
     463              81 :     int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
     464              81 :     if (fd < 0)
     465 MIS           0 :         return make_err(errno);
     466                 : 
     467 HIT          81 :     if (family == AF_INET6)
     468                 :     {
     469               8 :         int val = 0; // dual-stack default
     470               8 :         ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
     471                 :     }
     472                 : 
     473              81 :     epoll_impl->fd_ = fd;
     474                 : 
     475                 :     // Set up descriptor state but do NOT register with epoll yet
     476              81 :     epoll_impl->desc_state_.fd = fd;
     477                 :     {
     478              81 :         std::lock_guard lock(epoll_impl->desc_state_.mutex);
     479              81 :         epoll_impl->desc_state_.read_op = nullptr;
     480              81 :     }
     481                 : 
     482              81 :     return {};
     483                 : }
     484                 : 
     485                 : inline std::error_code
     486              80 : epoll_acceptor_service::bind_acceptor(
     487                 :     tcp_acceptor::implementation& impl, endpoint ep)
     488                 : {
     489              80 :     auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
     490              80 :     int fd = epoll_impl->fd_;
     491                 : 
     492              80 :     sockaddr_storage storage{};
     493              80 :     socklen_t addrlen = detail::to_sockaddr(ep, storage);
     494              80 :     if (::bind(fd, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
     495               3 :         return make_err(errno);
     496                 : 
     497                 :     // Cache local endpoint (resolves ephemeral port)
     498              77 :     sockaddr_storage local{};
     499              77 :     socklen_t local_len = sizeof(local);
     500              77 :     if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local), &local_len) == 0)
     501              77 :         epoll_impl->set_local_endpoint(detail::from_sockaddr(local));
     502                 : 
     503              77 :     return {};
     504                 : }
     505                 : 
     506                 : inline std::error_code
     507              77 : epoll_acceptor_service::listen_acceptor(
     508                 :     tcp_acceptor::implementation& impl, int backlog)
     509                 : {
     510              77 :     auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
     511              77 :     int fd = epoll_impl->fd_;
     512                 : 
     513              77 :     if (::listen(fd, backlog) < 0)
     514 MIS           0 :         return make_err(errno);
     515                 : 
     516                 :     // Register fd with epoll (edge-triggered mode)
     517 HIT          77 :     scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
     518                 : 
     519              77 :     return {};
     520                 : }
     521                 : 
     522                 : inline void
     523              11 : epoll_acceptor_service::post(epoll_op* op)
     524                 : {
     525              11 :     state_->sched_.post(op);
     526              11 : }
     527                 : 
     528                 : inline void
     529            4623 : epoll_acceptor_service::work_started() noexcept
     530                 : {
     531            4623 :     state_->sched_.work_started();
     532            4623 : }
     533                 : 
     534                 : inline void
     535               9 : epoll_acceptor_service::work_finished() noexcept
     536                 : {
     537               9 :     state_->sched_.work_finished();
     538               9 : }
     539                 : 
     540                 : inline epoll_socket_service*
     541            4616 : epoll_acceptor_service::socket_service() const noexcept
     542                 : {
     543            4616 :     auto* svc = ctx_.find_service<detail::socket_service>();
     544            4616 :     return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
     545                 : }
     546                 : 
     547                 : } // namespace boost::corosio::detail
     548                 : 
     549                 : #endif // BOOST_COROSIO_HAS_EPOLL
     550                 : 
     551                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
        

Generated by: LCOV version 2.3