LCOV - code coverage report
Current view: top level - corosio/native/detail/select - select_socket_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 75.8 % 347 263 84
Test Date: 2026-02-25 01:27:20 Functions: 93.3 % 30 28 2

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/corosio
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
      11                 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
      12                 : 
      13                 : #include <boost/corosio/detail/platform.hpp>
      14                 : 
      15                 : #if BOOST_COROSIO_HAS_SELECT
      16                 : 
      17                 : #include <boost/corosio/detail/config.hpp>
      18                 : #include <boost/capy/ex/execution_context.hpp>
      19                 : #include <boost/corosio/detail/socket_service.hpp>
      20                 : 
      21                 : #include <boost/corosio/native/detail/select/select_socket.hpp>
      22                 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
      23                 : 
      24                 : #include <boost/corosio/detail/endpoint_convert.hpp>
      25                 : #include <boost/corosio/detail/dispatch_coro.hpp>
      26                 : #include <boost/corosio/detail/make_err.hpp>
      27                 : 
      28                 : #include <boost/corosio/detail/except.hpp>
      29                 : 
      30                 : #include <boost/capy/buffers.hpp>
      31                 : 
      32                 : #include <errno.h>
      33                 : #include <fcntl.h>
      34                 : #include <netinet/in.h>
      35                 : #include <netinet/tcp.h>
      36                 : #include <sys/socket.h>
      37                 : #include <unistd.h>
      38                 : 
      39                 : #include <memory>
      40                 : #include <mutex>
      41                 : #include <unordered_map>
      42                 : 
      43                 : /*
      44                 :     select Socket Implementation
      45                 :     ============================
      46                 : 
      47                 :     This mirrors the epoll_sockets design for behavioral consistency.
      48                 :     Each I/O operation follows the same pattern:
      49                 :       1. Try the syscall immediately (non-blocking socket)
      50                 :       2. If it succeeds or fails with a real error, post to completion queue
      51                 :       3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
      52                 : 
      53                 :     Cancellation
      54                 :     ------------
      55                 :     See op.hpp for the completion/cancellation race handling via the
      56                 :     `registered` atomic. cancel() must complete pending operations (post
      57                 :     them with cancelled flag) so coroutines waiting on them can resume.
      58                 :     close_socket() calls cancel() first to ensure this.
      59                 : 
      60                 :     Impl Lifetime with shared_ptr
      61                 :     -----------------------------
      62                 :     Socket impls use enable_shared_from_this. The service owns impls via
      63                 :     shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
      64                 :     removal. When a user calls close(), we call cancel() which posts pending
      65                 :     ops to the scheduler.
      66                 : 
      67                 :     CRITICAL: The posted ops must keep the impl alive until they complete.
      68                 :     Otherwise the scheduler would process a freed op (use-after-free). The
      69                 :     cancel() method captures shared_from_this() into op.impl_ptr before
      70                 :     posting. When the op completes, impl_ptr is cleared, allowing the impl
      71                 :     to be destroyed if no other references exist.
      72                 : 
      73                 :     Service Ownership
      74                 :     -----------------
      75                 :     select_socket_service owns all socket impls. destroy() removes the
      76                 :     shared_ptr from the map, but the impl may survive if ops still hold
      77                 :     impl_ptr refs. shutdown() closes all sockets and clears the map; any
      78                 :     in-flight ops will complete and release their refs.
      79                 : */
      80                 : 
      81                 : namespace boost::corosio::detail {
      82                 : 
      83                 : /** State for select socket service. */
      84                 : class select_socket_state
      85                 : {
      86                 : public:
      87 HIT         154 :     explicit select_socket_state(select_scheduler& sched) noexcept
      88             154 :         : sched_(sched)
      89                 :     {
      90             154 :     }
      91                 : 
      92                 :     select_scheduler& sched_;
      93                 :     std::mutex mutex_;
      94                 :     intrusive_list<select_socket> socket_list_;
      95                 :     std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
      96                 :         socket_ptrs_;
      97                 : };
      98                 : 
      99                 : /** select socket service implementation.
     100                 : 
     101                 :     Inherits from socket_service to enable runtime polymorphism.
     102                 :     Uses key_type = socket_service for service lookup.
     103                 : */
     104                 : class BOOST_COROSIO_DECL select_socket_service final : public socket_service
     105                 : {
     106                 : public:
     107                 :     explicit select_socket_service(capy::execution_context& ctx);
     108                 :     ~select_socket_service() override;
     109                 : 
     110                 :     select_socket_service(select_socket_service const&)            = delete;
     111                 :     select_socket_service& operator=(select_socket_service const&) = delete;
     112                 : 
     113                 :     void shutdown() override;
     114                 : 
     115                 :     io_object::implementation* construct() override;
     116                 :     void destroy(io_object::implementation*) override;
     117                 :     void close(io_object::handle&) override;
     118                 :     std::error_code
     119                 :     open_socket(tcp_socket::implementation& impl,
     120                 :                 int family, int type, int protocol) override;
     121                 : 
     122            9677 :     select_scheduler& scheduler() const noexcept
     123                 :     {
     124            9677 :         return state_->sched_;
     125                 :     }
     126                 :     void post(select_op* op);
     127                 :     void work_started() noexcept;
     128                 :     void work_finished() noexcept;
     129                 : 
     130                 : private:
     131                 :     std::unique_ptr<select_socket_state> state_;
     132                 : };
     133                 : 
     134                 : // Backward compatibility alias
     135                 : using select_sockets = select_socket_service;
     136                 : 
     137                 : inline void
     138              99 : select_op::canceller::operator()() const noexcept
     139                 : {
     140              99 :     op->cancel();
     141              99 : }
     142                 : 
     143                 : inline void
     144 MIS           0 : select_connect_op::cancel() noexcept
     145                 : {
     146               0 :     if (socket_impl_)
     147               0 :         socket_impl_->cancel_single_op(*this);
     148                 :     else
     149               0 :         request_cancel();
     150               0 : }
     151                 : 
     152                 : inline void
     153 HIT          99 : select_read_op::cancel() noexcept
     154                 : {
     155              99 :     if (socket_impl_)
     156              99 :         socket_impl_->cancel_single_op(*this);
     157                 :     else
     158 MIS           0 :         request_cancel();
     159 HIT          99 : }
     160                 : 
     161                 : inline void
     162 MIS           0 : select_write_op::cancel() noexcept
     163                 : {
     164               0 :     if (socket_impl_)
     165               0 :         socket_impl_->cancel_single_op(*this);
     166                 :     else
     167               0 :         request_cancel();
     168               0 : }
     169                 : 
     170                 : inline void
     171 HIT        3074 : select_connect_op::operator()()
     172                 : {
     173            3074 :     stop_cb.reset();
     174                 : 
     175            3074 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
     176                 : 
     177                 :     // Cache endpoints on successful connect
     178            3074 :     if (success && socket_impl_)
     179                 :     {
     180            3071 :         endpoint local_ep;
     181            3071 :         sockaddr_storage local_storage{};
     182            3071 :         socklen_t local_len = sizeof(local_storage);
     183            3071 :         if (::getsockname(
     184                 :                 fd, reinterpret_cast<sockaddr*>(&local_storage),
     185            3071 :                 &local_len) == 0)
     186            3071 :             local_ep = from_sockaddr(local_storage);
     187            3071 :         static_cast<select_socket*>(socket_impl_)
     188            3071 :             ->set_endpoints(local_ep, target_endpoint);
     189                 :     }
     190                 : 
     191            3074 :     if (ec_out)
     192                 :     {
     193            3074 :         if (cancelled.load(std::memory_order_acquire))
     194 MIS           0 :             *ec_out = capy::error::canceled;
     195 HIT        3074 :         else if (errn != 0)
     196               3 :             *ec_out = make_err(errn);
     197                 :         else
     198            3071 :             *ec_out = {};
     199                 :     }
     200                 : 
     201            3074 :     if (bytes_out)
     202 MIS           0 :         *bytes_out = bytes_transferred;
     203                 : 
     204                 :     // Move to stack before destroying the frame
     205 HIT        3074 :     capy::executor_ref saved_ex(ex);
     206            3074 :     std::coroutine_handle<> saved_h(h);
     207            3074 :     impl_ptr.reset();
     208            3074 :     dispatch_coro(saved_ex, saved_h).resume();
     209            3074 : }
     210                 : 
     211            9240 : inline select_socket::select_socket(select_socket_service& svc) noexcept
     212            9240 :     : svc_(svc)
     213                 : {
     214            9240 : }
     215                 : 
     216                 : inline std::coroutine_handle<>
     217            3074 : select_socket::connect(
     218                 :     std::coroutine_handle<> h,
     219                 :     capy::executor_ref ex,
     220                 :     endpoint ep,
     221                 :     std::stop_token token,
     222                 :     std::error_code* ec)
     223                 : {
     224            3074 :     auto& op = conn_;
     225            3074 :     op.reset();
     226            3074 :     op.h               = h;
     227            3074 :     op.ex              = ex;
     228            3074 :     op.ec_out          = ec;
     229            3074 :     op.fd              = fd_;
     230            3074 :     op.target_endpoint = ep; // Store target for endpoint caching
     231            3074 :     op.start(token, this);
     232                 : 
     233            3074 :     sockaddr_storage storage{};
     234                 :     socklen_t addrlen =
     235            3074 :         detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
     236                 :     int result =
     237            3074 :         ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
     238                 : 
     239            3074 :     if (result == 0)
     240                 :     {
     241                 :         // Sync success — cache endpoints immediately
     242 MIS           0 :         sockaddr_storage local_storage{};
     243               0 :         socklen_t local_len = sizeof(local_storage);
     244               0 :         if (::getsockname(
     245                 :                 fd_, reinterpret_cast<sockaddr*>(&local_storage),
     246               0 :                 &local_len) == 0)
     247               0 :             local_endpoint_ = detail::from_sockaddr(local_storage);
     248               0 :         remote_endpoint_ = ep;
     249                 : 
     250               0 :         op.complete(0, 0);
     251               0 :         op.impl_ptr = shared_from_this();
     252               0 :         svc_.post(&op);
     253                 :         // completion is always posted to scheduler queue, never inline.
     254               0 :         return std::noop_coroutine();
     255                 :     }
     256                 : 
     257 HIT        3074 :     if (errno == EINPROGRESS)
     258                 :     {
     259            3074 :         svc_.work_started();
     260            3074 :         op.impl_ptr = shared_from_this();
     261                 : 
     262                 :         // Set registering BEFORE register_fd to close the race window where
     263                 :         // reactor sees an event before we set registered. The reactor treats
     264                 :         // registering the same as registered when claiming the op.
     265            3074 :         op.registered.store(
     266                 :             select_registration_state::registering, std::memory_order_release);
     267            3074 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     268                 : 
     269                 :         // Transition to registered. If this fails, reactor or cancel already
     270                 :         // claimed the op (state is now unregistered), so we're done. However,
     271                 :         // we must still deregister the fd because cancel's deregister_fd may
     272                 :         // have run before our register_fd, leaving the fd orphaned.
     273            3074 :         auto expected = select_registration_state::registering;
     274            3074 :         if (!op.registered.compare_exchange_strong(
     275                 :                 expected, select_registration_state::registered,
     276                 :                 std::memory_order_acq_rel))
     277                 :         {
     278 MIS           0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     279                 :             // completion is always posted to scheduler queue, never inline.
     280               0 :             return std::noop_coroutine();
     281                 :         }
     282                 : 
     283                 :         // If cancelled was set before we registered, handle it now.
     284 HIT        3074 :         if (op.cancelled.load(std::memory_order_acquire))
     285                 :         {
     286 MIS           0 :             auto prev = op.registered.exchange(
     287                 :                 select_registration_state::unregistered,
     288                 :                 std::memory_order_acq_rel);
     289               0 :             if (prev != select_registration_state::unregistered)
     290                 :             {
     291               0 :                 svc_.scheduler().deregister_fd(
     292                 :                     fd_, select_scheduler::event_write);
     293               0 :                 op.impl_ptr = shared_from_this();
     294               0 :                 svc_.post(&op);
     295               0 :                 svc_.work_finished();
     296                 :             }
     297                 :         }
     298                 :         // completion is always posted to scheduler queue, never inline.
     299 HIT        3074 :         return std::noop_coroutine();
     300                 :     }
     301                 : 
     302 MIS           0 :     op.complete(errno, 0);
     303               0 :     op.impl_ptr = shared_from_this();
     304               0 :     svc_.post(&op);
     305                 :     // completion is always posted to scheduler queue, never inline.
     306               0 :     return std::noop_coroutine();
     307                 : }
     308                 : 
     309                 : inline std::coroutine_handle<>
     310 HIT      114920 : select_socket::read_some(
     311                 :     std::coroutine_handle<> h,
     312                 :     capy::executor_ref ex,
     313                 :     io_buffer_param param,
     314                 :     std::stop_token token,
     315                 :     std::error_code* ec,
     316                 :     std::size_t* bytes_out)
     317                 : {
     318          114920 :     auto& op = rd_;
     319          114920 :     op.reset();
     320          114920 :     op.h         = h;
     321          114920 :     op.ex        = ex;
     322          114920 :     op.ec_out    = ec;
     323          114920 :     op.bytes_out = bytes_out;
     324          114920 :     op.fd        = fd_;
     325          114920 :     op.start(token, this);
     326                 : 
     327          114920 :     capy::mutable_buffer bufs[select_read_op::max_buffers];
     328          114920 :     op.iovec_count =
     329          114920 :         static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
     330                 : 
     331          114920 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     332                 :     {
     333               1 :         op.empty_buffer_read = true;
     334               1 :         op.complete(0, 0);
     335               1 :         op.impl_ptr = shared_from_this();
     336               1 :         svc_.post(&op);
     337               1 :         return std::noop_coroutine();
     338                 :     }
     339                 : 
     340          229838 :     for (int i = 0; i < op.iovec_count; ++i)
     341                 :     {
     342          114919 :         op.iovecs[i].iov_base = bufs[i].data();
     343          114919 :         op.iovecs[i].iov_len  = bufs[i].size();
     344                 :     }
     345                 : 
     346          114919 :     ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
     347                 : 
     348          114919 :     if (n > 0)
     349                 :     {
     350          114631 :         op.complete(0, static_cast<std::size_t>(n));
     351          114631 :         op.impl_ptr = shared_from_this();
     352          114631 :         svc_.post(&op);
     353          114631 :         return std::noop_coroutine();
     354                 :     }
     355                 : 
     356             288 :     if (n == 0)
     357                 :     {
     358               5 :         op.complete(0, 0);
     359               5 :         op.impl_ptr = shared_from_this();
     360               5 :         svc_.post(&op);
     361               5 :         return std::noop_coroutine();
     362                 :     }
     363                 : 
     364             283 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     365                 :     {
     366             283 :         svc_.work_started();
     367             283 :         op.impl_ptr = shared_from_this();
     368                 : 
     369                 :         // Set registering BEFORE register_fd to close the race window where
     370                 :         // reactor sees an event before we set registered.
     371             283 :         op.registered.store(
     372                 :             select_registration_state::registering, std::memory_order_release);
     373             283 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
     374                 : 
     375                 :         // Transition to registered. If this fails, reactor or cancel already
     376                 :         // claimed the op (state is now unregistered), so we're done. However,
     377                 :         // we must still deregister the fd because cancel's deregister_fd may
     378                 :         // have run before our register_fd, leaving the fd orphaned.
     379             283 :         auto expected = select_registration_state::registering;
     380             283 :         if (!op.registered.compare_exchange_strong(
     381                 :                 expected, select_registration_state::registered,
     382                 :                 std::memory_order_acq_rel))
     383                 :         {
     384 MIS           0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     385               0 :             return std::noop_coroutine();
     386                 :         }
     387                 : 
     388                 :         // If cancelled was set before we registered, handle it now.
     389 HIT         283 :         if (op.cancelled.load(std::memory_order_acquire))
     390                 :         {
     391 MIS           0 :             auto prev = op.registered.exchange(
     392                 :                 select_registration_state::unregistered,
     393                 :                 std::memory_order_acq_rel);
     394               0 :             if (prev != select_registration_state::unregistered)
     395                 :             {
     396               0 :                 svc_.scheduler().deregister_fd(
     397                 :                     fd_, select_scheduler::event_read);
     398               0 :                 op.impl_ptr = shared_from_this();
     399               0 :                 svc_.post(&op);
     400               0 :                 svc_.work_finished();
     401                 :             }
     402                 :         }
     403 HIT         283 :         return std::noop_coroutine();
     404                 :     }
     405                 : 
     406 MIS           0 :     op.complete(errno, 0);
     407               0 :     op.impl_ptr = shared_from_this();
     408               0 :     svc_.post(&op);
     409               0 :     return std::noop_coroutine();
     410                 : }
     411                 : 
     412                 : inline std::coroutine_handle<>
     413 HIT      114756 : select_socket::write_some(
     414                 :     std::coroutine_handle<> h,
     415                 :     capy::executor_ref ex,
     416                 :     io_buffer_param param,
     417                 :     std::stop_token token,
     418                 :     std::error_code* ec,
     419                 :     std::size_t* bytes_out)
     420                 : {
     421          114756 :     auto& op = wr_;
     422          114756 :     op.reset();
     423          114756 :     op.h         = h;
     424          114756 :     op.ex        = ex;
     425          114756 :     op.ec_out    = ec;
     426          114756 :     op.bytes_out = bytes_out;
     427          114756 :     op.fd        = fd_;
     428          114756 :     op.start(token, this);
     429                 : 
     430          114756 :     capy::mutable_buffer bufs[select_write_op::max_buffers];
     431          114756 :     op.iovec_count =
     432          114756 :         static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
     433                 : 
     434          114756 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     435                 :     {
     436               1 :         op.complete(0, 0);
     437               1 :         op.impl_ptr = shared_from_this();
     438               1 :         svc_.post(&op);
     439               1 :         return std::noop_coroutine();
     440                 :     }
     441                 : 
     442          229510 :     for (int i = 0; i < op.iovec_count; ++i)
     443                 :     {
     444          114755 :         op.iovecs[i].iov_base = bufs[i].data();
     445          114755 :         op.iovecs[i].iov_len  = bufs[i].size();
     446                 :     }
     447                 : 
     448          114755 :     msghdr msg{};
     449          114755 :     msg.msg_iov    = op.iovecs;
     450          114755 :     msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
     451                 : 
     452          114755 :     ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
     453                 : 
     454          114755 :     if (n > 0)
     455                 :     {
     456          114754 :         op.complete(0, static_cast<std::size_t>(n));
     457          114754 :         op.impl_ptr = shared_from_this();
     458          114754 :         svc_.post(&op);
     459          114754 :         return std::noop_coroutine();
     460                 :     }
     461                 : 
     462               1 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     463                 :     {
     464 MIS           0 :         svc_.work_started();
     465               0 :         op.impl_ptr = shared_from_this();
     466                 : 
     467                 :         // Set registering BEFORE register_fd to close the race window where
     468                 :         // reactor sees an event before we set registered.
     469               0 :         op.registered.store(
     470                 :             select_registration_state::registering, std::memory_order_release);
     471               0 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     472                 : 
     473                 :         // Transition to registered. If this fails, reactor or cancel already
     474                 :         // claimed the op (state is now unregistered), so we're done. However,
     475                 :         // we must still deregister the fd because cancel's deregister_fd may
     476                 :         // have run before our register_fd, leaving the fd orphaned.
     477               0 :         auto expected = select_registration_state::registering;
     478               0 :         if (!op.registered.compare_exchange_strong(
     479                 :                 expected, select_registration_state::registered,
     480                 :                 std::memory_order_acq_rel))
     481                 :         {
     482               0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     483               0 :             return std::noop_coroutine();
     484                 :         }
     485                 : 
     486                 :         // If cancelled was set before we registered, handle it now.
     487               0 :         if (op.cancelled.load(std::memory_order_acquire))
     488                 :         {
     489               0 :             auto prev = op.registered.exchange(
     490                 :                 select_registration_state::unregistered,
     491                 :                 std::memory_order_acq_rel);
     492               0 :             if (prev != select_registration_state::unregistered)
     493                 :             {
     494               0 :                 svc_.scheduler().deregister_fd(
     495                 :                     fd_, select_scheduler::event_write);
     496               0 :                 op.impl_ptr = shared_from_this();
     497               0 :                 svc_.post(&op);
     498               0 :                 svc_.work_finished();
     499                 :             }
     500                 :         }
     501               0 :         return std::noop_coroutine();
     502                 :     }
     503                 : 
     504 HIT           1 :     op.complete(errno ? errno : EIO, 0);
     505               1 :     op.impl_ptr = shared_from_this();
     506               1 :     svc_.post(&op);
     507               1 :     return std::noop_coroutine();
     508                 : }
     509                 : 
     510                 : inline std::error_code
     511               3 : select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
     512                 : {
     513                 :     int how;
     514               3 :     switch (what)
     515                 :     {
     516               1 :     case tcp_socket::shutdown_receive:
     517               1 :         how = SHUT_RD;
     518               1 :         break;
     519               1 :     case tcp_socket::shutdown_send:
     520               1 :         how = SHUT_WR;
     521               1 :         break;
     522               1 :     case tcp_socket::shutdown_both:
     523               1 :         how = SHUT_RDWR;
     524               1 :         break;
     525 MIS           0 :     default:
     526               0 :         return make_err(EINVAL);
     527                 :     }
     528 HIT           3 :     if (::shutdown(fd_, how) != 0)
     529 MIS           0 :         return make_err(errno);
     530 HIT           3 :     return {};
     531                 : }
     532                 : 
     533                 : inline std::error_code
     534              28 : select_socket::set_option(
     535                 :     int level, int optname,
     536                 :     void const* data, std::size_t size) noexcept
     537                 : {
     538              28 :     if (::setsockopt(fd_, level, optname, data,
     539              28 :             static_cast<socklen_t>(size)) != 0)
     540 MIS           0 :         return make_err(errno);
     541 HIT          28 :     return {};
     542                 : }
     543                 : 
     544                 : inline std::error_code
     545              31 : select_socket::get_option(
     546                 :     int level, int optname,
     547                 :     void* data, std::size_t* size) const noexcept
     548                 : {
     549              31 :     socklen_t len = static_cast<socklen_t>(*size);
     550              31 :     if (::getsockopt(fd_, level, optname, data, &len) != 0)
     551 MIS           0 :         return make_err(errno);
     552 HIT          31 :     *size = static_cast<std::size_t>(len);
     553              31 :     return {};
     554                 : }
     555                 : 
     556                 : inline void
     557             177 : select_socket::cancel() noexcept
     558                 : {
     559             177 :     auto self = weak_from_this().lock();
     560             177 :     if (!self)
     561 MIS           0 :         return;
     562                 : 
     563 HIT         531 :     auto cancel_op = [this, &self](select_op& op, int events) {
     564             531 :         auto prev = op.registered.exchange(
     565                 :             select_registration_state::unregistered, std::memory_order_acq_rel);
     566             531 :         op.request_cancel();
     567             531 :         if (prev != select_registration_state::unregistered)
     568                 :         {
     569              92 :             svc_.scheduler().deregister_fd(fd_, events);
     570              92 :             op.impl_ptr = self;
     571              92 :             svc_.post(&op);
     572              92 :             svc_.work_finished();
     573                 :         }
     574             708 :     };
     575                 : 
     576             177 :     cancel_op(conn_, select_scheduler::event_write);
     577             177 :     cancel_op(rd_, select_scheduler::event_read);
     578             177 :     cancel_op(wr_, select_scheduler::event_write);
     579             177 : }
     580                 : 
     581                 : inline void
     582              99 : select_socket::cancel_single_op(select_op& op) noexcept
     583                 : {
     584              99 :     auto self = weak_from_this().lock();
     585              99 :     if (!self)
     586 MIS           0 :         return;
     587                 : 
     588                 :     // Called from stop_token callback to cancel a specific pending operation.
     589 HIT          99 :     auto prev = op.registered.exchange(
     590                 :         select_registration_state::unregistered, std::memory_order_acq_rel);
     591              99 :     op.request_cancel();
     592                 : 
     593              99 :     if (prev != select_registration_state::unregistered)
     594                 :     {
     595                 :         // Determine which event type to deregister
     596              67 :         int events = 0;
     597              67 :         if (&op == &conn_ || &op == &wr_)
     598 MIS           0 :             events = select_scheduler::event_write;
     599 HIT          67 :         else if (&op == &rd_)
     600              67 :             events = select_scheduler::event_read;
     601                 : 
     602              67 :         svc_.scheduler().deregister_fd(fd_, events);
     603                 : 
     604              67 :         op.impl_ptr = self;
     605              67 :         svc_.post(&op);
     606              67 :         svc_.work_finished();
     607                 :     }
     608              99 : }
     609                 : 
     610                 : inline void
     611           27729 : select_socket::close_socket() noexcept
     612                 : {
     613           27729 :     auto self = weak_from_this().lock();
     614           27729 :     if (self)
     615                 :     {
     616           83187 :         auto cancel_op = [this, &self](select_op& op, int events) {
     617           83187 :             auto prev = op.registered.exchange(
     618                 :                 select_registration_state::unregistered,
     619                 :                 std::memory_order_acq_rel);
     620           83187 :             op.request_cancel();
     621           83187 :             if (prev != select_registration_state::unregistered)
     622                 :             {
     623               1 :                 svc_.scheduler().deregister_fd(fd_, events);
     624               1 :                 op.impl_ptr = self;
     625               1 :                 svc_.post(&op);
     626               1 :                 svc_.work_finished();
     627                 :             }
     628          110916 :         };
     629                 : 
     630           27729 :         cancel_op(conn_, select_scheduler::event_write);
     631           27729 :         cancel_op(rd_, select_scheduler::event_read);
     632           27729 :         cancel_op(wr_, select_scheduler::event_write);
     633                 :     }
     634                 : 
     635           27729 :     if (fd_ >= 0)
     636                 :     {
     637            6160 :         svc_.scheduler().deregister_fd(
     638                 :             fd_, select_scheduler::event_read | select_scheduler::event_write);
     639            6160 :         ::close(fd_);
     640            6160 :         fd_ = -1;
     641                 :     }
     642                 : 
     643           27729 :     local_endpoint_  = endpoint{};
     644           27729 :     remote_endpoint_ = endpoint{};
     645           27729 : }
     646                 : 
     647             154 : inline select_socket_service::select_socket_service(
     648             154 :     capy::execution_context& ctx)
     649             154 :     : state_(
     650                 :           std::make_unique<select_socket_state>(
     651             154 :               ctx.use_service<select_scheduler>()))
     652                 : {
     653             154 : }
     654                 : 
     655             308 : inline select_socket_service::~select_socket_service() {}
     656                 : 
     657                 : inline void
     658             154 : select_socket_service::shutdown()
     659                 : {
     660             154 :     std::lock_guard lock(state_->mutex_);
     661                 : 
     662             154 :     while (auto* impl = state_->socket_list_.pop_front())
     663 MIS           0 :         impl->close_socket();
     664                 : 
     665                 :     // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
     666                 :     // drains completed_ops_, calling destroy() on each queued op. Letting
     667                 :     // ~state_ release the ptrs (during service destruction, after scheduler
     668                 :     // shutdown) keeps every impl alive until all ops have been drained.
     669 HIT         154 : }
     670                 : 
     671                 : inline io_object::implementation*
     672            9240 : select_socket_service::construct()
     673                 : {
     674            9240 :     auto impl = std::make_shared<select_socket>(*this);
     675            9240 :     auto* raw = impl.get();
     676                 : 
     677                 :     {
     678            9240 :         std::lock_guard lock(state_->mutex_);
     679            9240 :         state_->socket_list_.push_back(raw);
     680            9240 :         state_->socket_ptrs_.emplace(raw, std::move(impl));
     681            9240 :     }
     682                 : 
     683            9240 :     return raw;
     684            9240 : }
     685                 : 
     686                 : inline void
     687            9240 : select_socket_service::destroy(io_object::implementation* impl)
     688                 : {
     689            9240 :     auto* select_impl = static_cast<select_socket*>(impl);
     690            9240 :     select_impl->close_socket();
     691            9240 :     std::lock_guard lock(state_->mutex_);
     692            9240 :     state_->socket_list_.remove(select_impl);
     693            9240 :     state_->socket_ptrs_.erase(select_impl);
     694            9240 : }
     695                 : 
     696                 : inline std::error_code
     697            3089 : select_socket_service::open_socket(
     698                 :     tcp_socket::implementation& impl,
     699                 :     int family, int type, int protocol)
     700                 : {
     701            3089 :     auto* select_impl = static_cast<select_socket*>(&impl);
     702            3089 :     select_impl->close_socket();
     703                 : 
     704            3089 :     int fd = ::socket(family, type, protocol);
     705            3089 :     if (fd < 0)
     706 MIS           0 :         return make_err(errno);
     707                 : 
     708 HIT        3089 :     if (family == AF_INET6)
     709                 :     {
     710               5 :         int one = 1;
     711               5 :         ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
     712                 :     }
     713                 : 
     714                 :     // Set non-blocking and close-on-exec
     715            3089 :     int flags = ::fcntl(fd, F_GETFL, 0);
     716            3089 :     if (flags == -1)
     717                 :     {
     718 MIS           0 :         int errn = errno;
     719               0 :         ::close(fd);
     720               0 :         return make_err(errn);
     721                 :     }
     722 HIT        3089 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     723                 :     {
     724 MIS           0 :         int errn = errno;
     725               0 :         ::close(fd);
     726               0 :         return make_err(errn);
     727                 :     }
     728 HIT        3089 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     729                 :     {
     730 MIS           0 :         int errn = errno;
     731               0 :         ::close(fd);
     732               0 :         return make_err(errn);
     733                 :     }
     734                 : 
     735                 :     // Check fd is within select() limits
     736 HIT        3089 :     if (fd >= FD_SETSIZE)
     737                 :     {
     738 MIS           0 :         ::close(fd);
     739               0 :         return make_err(EMFILE); // Too many open files
     740                 :     }
     741                 : 
     742 HIT        3089 :     select_impl->fd_ = fd;
     743            3089 :     return {};
     744                 : }
     745                 : 
     746                 : inline void
     747           15400 : select_socket_service::close(io_object::handle& h)
     748                 : {
     749           15400 :     static_cast<select_socket*>(h.get())->close_socket();
     750           15400 : }
     751                 : 
     752                 : inline void
     753          229553 : select_socket_service::post(select_op* op)
     754                 : {
     755          229553 :     state_->sched_.post(op);
     756          229553 : }
     757                 : 
     758                 : inline void
     759            3357 : select_socket_service::work_started() noexcept
     760                 : {
     761            3357 :     state_->sched_.work_started();
     762            3357 : }
     763                 : 
     764                 : inline void
     765             160 : select_socket_service::work_finished() noexcept
     766                 : {
     767             160 :     state_->sched_.work_finished();
     768             160 : }
     769                 : 
     770                 : } // namespace boost::corosio::detail
     771                 : 
     772                 : #endif // BOOST_COROSIO_HAS_SELECT
     773                 : 
     774                 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
        

Generated by: LCOV version 2.3