1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_SELECT
15  
#if BOOST_COROSIO_HAS_SELECT
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
21  
#include <boost/corosio/native/detail/select/select_socket.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
22  
#include <boost/corosio/native/detail/select/select_scheduler.hpp>
23  

23  

24  
#include <boost/corosio/detail/endpoint_convert.hpp>
24  
#include <boost/corosio/detail/endpoint_convert.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
25  
#include <boost/corosio/detail/dispatch_coro.hpp>
26  
#include <boost/corosio/detail/make_err.hpp>
26  
#include <boost/corosio/detail/make_err.hpp>
27  

27  

28  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/corosio/detail/except.hpp>
29  

29  

30  
#include <boost/capy/buffers.hpp>
30  
#include <boost/capy/buffers.hpp>
31  

31  

32  
#include <errno.h>
32  
#include <errno.h>
33  
#include <fcntl.h>
33  
#include <fcntl.h>
34  
#include <netinet/in.h>
34  
#include <netinet/in.h>
35  
#include <netinet/tcp.h>
35  
#include <netinet/tcp.h>
36  
#include <sys/socket.h>
36  
#include <sys/socket.h>
37  
#include <unistd.h>
37  
#include <unistd.h>
38  

38  

39  
#include <memory>
39  
#include <memory>
40  
#include <mutex>
40  
#include <mutex>
41  
#include <unordered_map>
41  
#include <unordered_map>
42  

42  

43  
/*
43  
/*
44  
    select Socket Implementation
44  
    select Socket Implementation
45  
    ============================
45  
    ============================
46  

46  

47  
    This mirrors the epoll_sockets design for behavioral consistency.
47  
    This mirrors the epoll_sockets design for behavioral consistency.
48  
    Each I/O operation follows the same pattern:
48  
    Each I/O operation follows the same pattern:
49  
      1. Try the syscall immediately (non-blocking socket)
49  
      1. Try the syscall immediately (non-blocking socket)
50  
      2. If it succeeds or fails with a real error, post to completion queue
50  
      2. If it succeeds or fails with a real error, post to completion queue
51  
      3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
51  
      3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52  

52  

53  
    Cancellation
53  
    Cancellation
54  
    ------------
54  
    ------------
55  
    See op.hpp for the completion/cancellation race handling via the
55  
    See op.hpp for the completion/cancellation race handling via the
56  
    `registered` atomic. cancel() must complete pending operations (post
56  
    `registered` atomic. cancel() must complete pending operations (post
57  
    them with cancelled flag) so coroutines waiting on them can resume.
57  
    them with cancelled flag) so coroutines waiting on them can resume.
58  
    close_socket() calls cancel() first to ensure this.
58  
    close_socket() calls cancel() first to ensure this.
59  

59  

60  
    Impl Lifetime with shared_ptr
60  
    Impl Lifetime with shared_ptr
61  
    -----------------------------
61  
    -----------------------------
62  
    Socket impls use enable_shared_from_this. The service owns impls via
62  
    Socket impls use enable_shared_from_this. The service owns impls via
63  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
63  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64  
    removal. When a user calls close(), we call cancel() which posts pending
64  
    removal. When a user calls close(), we call cancel() which posts pending
65  
    ops to the scheduler.
65  
    ops to the scheduler.
66  

66  

67  
    CRITICAL: The posted ops must keep the impl alive until they complete.
67  
    CRITICAL: The posted ops must keep the impl alive until they complete.
68  
    Otherwise the scheduler would process a freed op (use-after-free). The
68  
    Otherwise the scheduler would process a freed op (use-after-free). The
69  
    cancel() method captures shared_from_this() into op.impl_ptr before
69  
    cancel() method captures shared_from_this() into op.impl_ptr before
70  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
70  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
71  
    to be destroyed if no other references exist.
71  
    to be destroyed if no other references exist.
72  

72  

73  
    Service Ownership
73  
    Service Ownership
74  
    -----------------
74  
    -----------------
75  
    select_socket_service owns all socket impls. destroy() removes the
75  
    select_socket_service owns all socket impls. destroy() removes the
76  
    shared_ptr from the map, but the impl may survive if ops still hold
76  
    shared_ptr from the map, but the impl may survive if ops still hold
77  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
77  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
78  
    in-flight ops will complete and release their refs.
78  
    in-flight ops will complete and release their refs.
79  
*/
79  
*/
80  

80  

81  
namespace boost::corosio::detail {
81  
namespace boost::corosio::detail {
82  

82  

/** State for select socket service.

    Bundles the scheduler reference and per-service socket bookkeeping
    that select_socket_service holds behind a unique_ptr.
*/
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    // Scheduler that drives readiness events for this service's sockets.
    select_scheduler& sched_;
    // Presumably guards socket_list_ and socket_ptrs_ — confirm against
    // the service's .cpp before relying on it.
    std::mutex mutex_;
    // Intrusive list of live socket impls.
    intrusive_list<select_socket> socket_list_;
    // Owning map keyed by raw pointer for O(1) lookup/removal; see the
    // "Impl Lifetime with shared_ptr" notes at the top of this file.
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;
};
98  

98  

/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.

    Owns all socket impls via select_socket_state::socket_ptrs_; see the
    "Service Ownership" notes at the top of this file.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    select_socket_service(select_socket_service const&)            = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    /// Closes all sockets and clears the owning map; in-flight ops
    /// complete and release their impl_ptr refs (see file notes).
    void shutdown() override;

    io_object::implementation* construct() override;
    void destroy(io_object::implementation*) override;
    void close(io_object::handle&) override;
    /// Open a socket for `impl` with the given address family, socket
    /// type, and protocol.
    std::error_code
    open_socket(tcp_socket::implementation& impl,
                int family, int type, int protocol) override;

    /// Scheduler backing this service's readiness notifications.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Enqueue a completed op on the scheduler's completion queue;
    /// completions are always posted, never run inline.
    void post(select_op* op);
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
131  

133  

// Backward compatibility alias for code written against the older
// `select_sockets` name.
using select_sockets = select_socket_service;
134  

136  

// Stop-token callback: forwards the cancellation request to the owning op.
inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
140  

142  

141  
inline void
143  
inline void
142  
select_connect_op::cancel() noexcept
144  
select_connect_op::cancel() noexcept
143  
{
145  
{
144  
    if (socket_impl_)
146  
    if (socket_impl_)
145  
        socket_impl_->cancel_single_op(*this);
147  
        socket_impl_->cancel_single_op(*this);
146  
    else
148  
    else
147  
        request_cancel();
149  
        request_cancel();
148  
}
150  
}
149  

151  

150  
inline void
152  
inline void
151  
select_read_op::cancel() noexcept
153  
select_read_op::cancel() noexcept
152  
{
154  
{
153  
    if (socket_impl_)
155  
    if (socket_impl_)
154  
        socket_impl_->cancel_single_op(*this);
156  
        socket_impl_->cancel_single_op(*this);
155  
    else
157  
    else
156  
        request_cancel();
158  
        request_cancel();
157  
}
159  
}
158  

160  

159  
inline void
161  
inline void
160  
select_write_op::cancel() noexcept
162  
select_write_op::cancel() noexcept
161  
{
163  
{
162  
    if (socket_impl_)
164  
    if (socket_impl_)
163  
        socket_impl_->cancel_single_op(*this);
165  
        socket_impl_->cancel_single_op(*this);
164  
    else
166  
    else
165  
        request_cancel();
167  
        request_cancel();
166  
}
168  
}
167  

169  

/** Completion handler for a connect op.

    Runs from the scheduler's completion queue. Caches the connection's
    endpoints on success, translates the saved errno / cancelled flag into
    *ec_out, then resumes the awaiting coroutine via its executor.
*/
inline void
select_connect_op::operator()()
{
    // Detach the stop-token callback first so a late cancellation request
    // cannot race with the completion below.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        // getsockname may fail; local_ep then stays default-constructed
        // while the remote endpoint is still recorded.
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_ep = from_sockaddr(local_storage);
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    if (ec_out)
    {
        // Cancellation takes precedence over any saved errno.
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame: this op lives inside the
    // socket impl, and releasing impl_ptr may destroy that impl (and with
    // it this op's storage), so copy the executor and handle first.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}
209  

210  

// Construct a socket impl bound to its owning service.
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
214  

215  

/** Begin an asynchronous connect to `ep`.

    Attempts ::connect() immediately on the (non-blocking, per the file
    notes) socket:
      - synchronous success or a hard error posts the completion to the
        scheduler queue right away;
      - EINPROGRESS registers the op with the select scheduler for
        write-readiness; completion is delivered later by the reactor.

    Always returns std::noop_coroutine(): the awaiting coroutine is
    resumed from the scheduler's completion queue, never inline.
*/
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h               = h;
    op.ex              = ex;
    op.ec_out          = ec;
    op.fd              = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    // Convert the endpoint into a family-appropriate sockaddr.
    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Sync success — cache endpoints immediately
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        // Keep the impl alive until the op completes (see file notes).
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Claim the op back; only the claimer posts the completion.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Immediate hard error: record errno and post the completion.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
304  

308  

/** Begin an asynchronous scatter read into `param`'s buffers.

    Attempts ::readv() immediately:
      - n > 0 posts success with the byte count;
      - n == 0 posts success with zero bytes (empty_buffer_read is also
        used for the zero-length-buffer fast path);
      - EAGAIN/EWOULDBLOCK registers the op with the select scheduler for
        read-readiness;
      - any other errno posts the error.

    Always returns std::noop_coroutine(); the awaiting coroutine is
    resumed from the scheduler's completion queue, never inline.
*/
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Zero-length read: complete immediately without touching the fd.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // readv returned 0: completed with no error and zero bytes.
    if (n == 0)
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        // Keep the impl alive until the op completes (see file notes).
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // Claim the op back; only the claimer posts the completion.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Immediate hard error: record errno and post the completion.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
407  

411  

408  
/// Initiates a (possibly asynchronous) scatter-gather send on the socket.
///
/// Attempts an immediate non-blocking ::sendmsg first. If the kernel accepts
/// any bytes, the op is completed and posted for resumption. If the socket is
/// not writable (EAGAIN/EWOULDBLOCK), the op is registered with the reactor
/// for a write event using the registering->registered state protocol below.
/// Any other error completes the op immediately with that errno.
///
/// @param h         Coroutine to resume on completion.
/// @param ex        Executor the completion is dispatched on.
/// @param param     Buffer sequence to send (copied into op-owned iovecs).
/// @param token     Stop token; stop requests cancel the pending op.
/// @param ec        Out: completion error code.
/// @param bytes_out Out: number of bytes transferred.
/// @return Always std::noop_coroutine(); resumption happens via svc_.post().
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h         = h;
    op.ex        = ex;
    op.ec_out    = ec;
    op.bytes_out = bytes_out;
    op.fd        = fd_;
    // Arms the stop-token callback; must precede any early completion so a
    // concurrent stop request is observed consistently.
    op.start(token, this);

    // NOTE(review): mutable_buffer on a write path — presumably copy_to only
    // needs the common buffer representation; confirm against io_buffer_param.
    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Empty write: complete immediately with success and zero bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();  // keep impl alive until op drains
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // Copy buffer descriptors into op-owned iovecs so they outlive this call.
    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len  = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov    = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    // MSG_NOSIGNAL: get EPIPE as an errno instead of a SIGPIPE signal.
    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        // Fast path: some bytes were accepted synchronously.
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Slow path: wait for writability via the reactor. The outstanding
        // work is balanced by work_finished() on whichever path completes.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            // Only the claimant that flipped the state away from
            // unregistered may deregister and post, exactly once.
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard error. Guard against a zero errno (e.g. sendmsg returned 0)
    // by falling back to EIO.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
505  

509  

506  
inline std::error_code
510  
inline std::error_code
507  
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
511  
select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
508  
{
512  
{
509  
    int how;
513  
    int how;
510  
    switch (what)
514  
    switch (what)
511  
    {
515  
    {
512  
    case tcp_socket::shutdown_receive:
516  
    case tcp_socket::shutdown_receive:
513  
        how = SHUT_RD;
517  
        how = SHUT_RD;
514  
        break;
518  
        break;
515  
    case tcp_socket::shutdown_send:
519  
    case tcp_socket::shutdown_send:
516  
        how = SHUT_WR;
520  
        how = SHUT_WR;
517  
        break;
521  
        break;
518  
    case tcp_socket::shutdown_both:
522  
    case tcp_socket::shutdown_both:
519  
        how = SHUT_RDWR;
523  
        how = SHUT_RDWR;
520  
        break;
524  
        break;
521  
    default:
525  
    default:
522  
        return make_err(EINVAL);
526  
        return make_err(EINVAL);
523  
    }
527  
    }
524  
    if (::shutdown(fd_, how) != 0)
528  
    if (::shutdown(fd_, how) != 0)
525  
        return make_err(errno);
529  
        return make_err(errno);
526  
    return {};
530  
    return {};
527  
}
531  
}
528  

532  

529  
inline std::error_code
533  
inline std::error_code
530 -
select_socket::set_no_delay(bool value) noexcept
534 +
select_socket::set_option(
531 -
{
535 +
    int level, int optname,
532 -
    int flag = value ? 1 : 0;
536 +
    void const* data, std::size_t size) noexcept
533 -
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
 
534 -
        return make_err(errno);
 
535 -
    return {};
 
536 -
}
 
537 -

 
538 -
inline bool
 
539 -
select_socket::no_delay(std::error_code& ec) const noexcept
 
540 -
{
 
541 -
    int flag      = 0;
 
542 -
    socklen_t len = sizeof(flag);
 
543 -
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
 
544 -
    {
 
545 -
        ec = make_err(errno);
 
546 -
        return false;
 
547 -
    }
 
548 -
    ec = {};
 
549 -
    return flag != 0;
 
550 -
}
 
551 -

 
552 -
inline std::error_code
 
553 -
select_socket::set_keep_alive(bool value) noexcept
 
554 -
{
 
555 -
    int flag = value ? 1 : 0;
 
556 -
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
 
557 -
        return make_err(errno);
 
558 -
    return {};
 
559 -
}
 
560 -

 
561 -
inline bool
 
562 -
select_socket::keep_alive(std::error_code& ec) const noexcept
 
563 -
{
 
564 -
    int flag      = 0;
 
565 -
    socklen_t len = sizeof(flag);
 
566 -
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
 
567 -
    {
 
568 -
        ec = make_err(errno);
 
569 -
        return false;
 
570 -
    }
 
571 -
    ec = {};
 
572 -
    return flag != 0;
 
573 -
}
 
574 -

 
575 -
inline std::error_code
 
576 -
select_socket::set_receive_buffer_size(int size) noexcept
 
577 -
{
 
578 -
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
 
579 -
        return make_err(errno);
 
580 -
    return {};
 
581 -
}
 
582 -

 
583 -
inline int
 
584 -
select_socket::receive_buffer_size(std::error_code& ec) const noexcept
 
585 -
{
 
586 -
    int size      = 0;
 
587 -
    socklen_t len = sizeof(size);
 
588 -
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
 
589 -
    {
 
590 -
        ec = make_err(errno);
 
591 -
        return 0;
 
592 -
    }
 
593 -
    ec = {};
 
594 -
    return size;
 
595 -
}
 
596 -

 
597 -
inline std::error_code
 
598 -
select_socket::set_send_buffer_size(int size) noexcept
 
599  
{
537  
{
600 -
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
538 +
    if (::setsockopt(fd_, level, optname, data,
 
539 +
            static_cast<socklen_t>(size)) != 0)
601  
        return make_err(errno);
540  
        return make_err(errno);
602  
    return {};
541  
    return {};
603  
}
542  
}
604 -
inline int
 
605 -
select_socket::send_buffer_size(std::error_code& ec) const noexcept
 
606 -
{
 
607 -
    int size      = 0;
 
608 -
    socklen_t len = sizeof(size);
 
609 -
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
 
610 -
    {
 
611 -
        ec = make_err(errno);
 
612 -
        return 0;
 
613 -
    }
 
614 -
    ec = {};
 
615 -
    return size;
 
616 -
}
 
617 -

 
618  

543  

619  
inline std::error_code
544  
inline std::error_code
620 -
select_socket::set_linger(bool enabled, int timeout) noexcept
545 +
select_socket::get_option(
 
546 +
    int level, int optname,
 
547 +
    void* data, std::size_t* size) const noexcept
621  
{
548  
{
622 -
    if (timeout < 0)
549 +
    socklen_t len = static_cast<socklen_t>(*size);
623 -
        return make_err(EINVAL);
550 +
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
624 -
    struct ::linger lg;
 
625 -
    lg.l_onoff  = enabled ? 1 : 0;
 
626 -
    lg.l_linger = timeout;
 
627 -
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
 
628  
        return make_err(errno);
551  
        return make_err(errno);
 
552 +
    *size = static_cast<std::size_t>(len);
629  
    return {};
553  
    return {};
630  
}
554  
}
631 -
inline tcp_socket::linger_options
 
632 -
select_socket::linger(std::error_code& ec) const noexcept
 
633 -
{
 
634 -
    struct ::linger lg{};
 
635 -
    socklen_t len = sizeof(lg);
 
636 -
    if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
 
637 -
    {
 
638 -
        ec = make_err(errno);
 
639 -
        return {};
 
640 -
    }
 
641 -
    ec = {};
 
642 -
    return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
 
643 -
}
 
644 -

 
645  

555  

646  
/// Cancels all pending operations (connect, read, write) on this socket.
///
/// For each op, atomically claims it by swapping its registration state to
/// unregistered. Exactly one claimant (this cancel, the reactor, or an
/// initiating thread) wins that swap; only the winner deregisters the fd and
/// posts the op, so each op is posted at most once.
inline void
select_socket::cancel() noexcept
{
    // If no shared owner remains the impl is being destroyed; nothing to do.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        // Claim the op: acq_rel pairs with the registration CAS in the
        // initiating path.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        // Mark cancelled regardless, so an in-flight initiation observes it.
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            // We won the claim: remove the reactor interest and post the op.
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;  // keep impl alive until the op is drained
            svc_.post(&op);
            // Balances the work_started() performed when the op was armed
            // for reactor wait — TODO confirm pairing in initiating paths.
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
670  

580  

671  
/// Cancels one specific pending operation on this socket.
///
/// Same claim protocol as cancel(): the atomic exchange ensures exactly one
/// party (stop callback, reactor, or initiator) posts the op.
///
/// @param op The pending op to cancel; must be one of conn_, rd_ or wr_.
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    // Bail out if the impl is already being torn down.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;  // connect/write wait for writability
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        op.impl_ptr = self;  // keep impl alive until the posted op is drained
        svc_.post(&op);
        // Balances the work_started() from when the op was registered.
        svc_.work_finished();
    }
}
699  

609  

700  
/// Cancels all pending operations, closes the descriptor and clears cached
/// endpoints.
///
/// Safe to call repeatedly: a negative fd_ skips the close, and the claim
/// protocol makes each pending op post at most once.
inline void
select_socket::close_socket() noexcept
{
    // Only cancel ops if the impl still has a shared owner; during final
    // destruction there is nothing left to resume.
    auto self = weak_from_this().lock();
    if (self)
    {
        // Same one-winner claim protocol as cancel().
        auto cancel_op = [this, &self](select_op& op, int events) {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                op.impl_ptr = self;  // keep impl alive until op drains
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Belt-and-braces: drop any remaining reactor interest for both
        // event types before closing the descriptor.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_  = endpoint{};
    remote_endpoint_ = endpoint{};
}
736  

646  

737  
/// Constructs the service, creating shared state bound to the context's
/// select_scheduler.
///
/// @param ctx Execution context this service belongs to. use_service
///            presumably creates the scheduler on first use — TODO confirm.
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
          std::make_unique<select_socket_state>(
              ctx.use_service<select_scheduler>()))
{
}
744  

654  

745  
/// Destructor. Defaulted: ~state_ releases the retained socket shared_ptrs
/// after the scheduler has drained all queued ops (see shutdown()).
inline select_socket_service::~select_socket_service() = default;
746  

656  

747  
inline void
657  
inline void
748  
select_socket_service::shutdown()
658  
select_socket_service::shutdown()
749  
{
659  
{
750  
    std::lock_guard lock(state_->mutex_);
660  
    std::lock_guard lock(state_->mutex_);
751  

661  

752  
    while (auto* impl = state_->socket_list_.pop_front())
662  
    while (auto* impl = state_->socket_list_.pop_front())
753  
        impl->close_socket();
663  
        impl->close_socket();
754  

664  

755  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
665  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
756  
    // drains completed_ops_, calling destroy() on each queued op. Letting
666  
    // drains completed_ops_, calling destroy() on each queued op. Letting
757  
    // ~state_ release the ptrs (during service destruction, after scheduler
667  
    // ~state_ release the ptrs (during service destruction, after scheduler
758  
    // shutdown) keeps every impl alive until all ops have been drained.
668  
    // shutdown) keeps every impl alive until all ops have been drained.
759  
}
669  
}
760  

670  

761  
/// Creates a new socket implementation and registers it with the service.
///
/// Ownership model: the service retains a shared_ptr in socket_ptrs_ and a
/// raw entry in socket_list_; callers receive a non-owning raw pointer and
/// release it via destroy().
///
/// @return Non-owning pointer to the new implementation.
inline io_object::implementation*
select_socket_service::construct()
{
    auto sock    = std::make_shared<select_socket>(*this);
    auto* result = sock.get();

    std::lock_guard lock(state_->mutex_);
    state_->socket_list_.push_back(result);
    state_->socket_ptrs_.emplace(result, std::move(sock));
    return result;
}
775  

685  

776  
inline void
686  
inline void
777  
select_socket_service::destroy(io_object::implementation* impl)
687  
select_socket_service::destroy(io_object::implementation* impl)
778  
{
688  
{
779  
    auto* select_impl = static_cast<select_socket*>(impl);
689  
    auto* select_impl = static_cast<select_socket*>(impl);
780  
    select_impl->close_socket();
690  
    select_impl->close_socket();
781  
    std::lock_guard lock(state_->mutex_);
691  
    std::lock_guard lock(state_->mutex_);
782  
    state_->socket_list_.remove(select_impl);
692  
    state_->socket_list_.remove(select_impl);
783  
    state_->socket_ptrs_.erase(select_impl);
693  
    state_->socket_ptrs_.erase(select_impl);
784  
}
694  
}
785  

695  

786  
inline std::error_code
696  
inline std::error_code
787 -
select_socket_service::open_socket(tcp_socket::implementation& impl)
697 +
select_socket_service::open_socket(
 
698 +
    tcp_socket::implementation& impl,
 
699 +
    int family, int type, int protocol)
788  
{
700  
{
789  
    auto* select_impl = static_cast<select_socket*>(&impl);
701  
    auto* select_impl = static_cast<select_socket*>(&impl);
790  
    select_impl->close_socket();
702  
    select_impl->close_socket();
791  

703  

792 -
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);
704 +
    int fd = ::socket(family, type, protocol);
793  
    if (fd < 0)
705  
    if (fd < 0)
794  
        return make_err(errno);
706  
        return make_err(errno);
 
707 +

 
708 +
    if (family == AF_INET6)
 
709 +
    {
 
710 +
        int one = 1;
 
711 +
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
 
712 +
    }
795  

713  

796  
    // Set non-blocking and close-on-exec
714  
    // Set non-blocking and close-on-exec
797  
    int flags = ::fcntl(fd, F_GETFL, 0);
715  
    int flags = ::fcntl(fd, F_GETFL, 0);
798  
    if (flags == -1)
716  
    if (flags == -1)
799  
    {
717  
    {
800  
        int errn = errno;
718  
        int errn = errno;
801  
        ::close(fd);
719  
        ::close(fd);
802  
        return make_err(errn);
720  
        return make_err(errn);
803  
    }
721  
    }
804  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
722  
    if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
805  
    {
723  
    {
806  
        int errn = errno;
724  
        int errn = errno;
807  
        ::close(fd);
725  
        ::close(fd);
808  
        return make_err(errn);
726  
        return make_err(errn);
809  
    }
727  
    }
810  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
728  
    if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
811  
    {
729  
    {
812  
        int errn = errno;
730  
        int errn = errno;
813  
        ::close(fd);
731  
        ::close(fd);
814  
        return make_err(errn);
732  
        return make_err(errn);
815  
    }
733  
    }
816  

734  

817  
    // Check fd is within select() limits
735  
    // Check fd is within select() limits
818  
    if (fd >= FD_SETSIZE)
736  
    if (fd >= FD_SETSIZE)
819  
    {
737  
    {
820  
        ::close(fd);
738  
        ::close(fd);
821  
        return make_err(EMFILE); // Too many open files
739  
        return make_err(EMFILE); // Too many open files
822  
    }
740  
    }
823  

741  

824  
    select_impl->fd_ = fd;
742  
    select_impl->fd_ = fd;
825  
    return {};
743  
    return {};
826  
}
744  
}
827  

745  

828  
inline void
746  
inline void
829  
select_socket_service::close(io_object::handle& h)
747  
select_socket_service::close(io_object::handle& h)
830  
{
748  
{
831  
    static_cast<select_socket*>(h.get())->close_socket();
749  
    static_cast<select_socket*>(h.get())->close_socket();
832  
}
750  
}
833  

751  

834  
/// Queues a completed op on the scheduler for later resumption.
inline void
select_socket_service::post(select_op* op)
{
    state_->sched_.post(op);
}
839  

757  

840  
/// Notifies the scheduler that outstanding work exists (keeps it running).
inline void
select_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}
845  

763  

846  
/// Balances a prior work_started(); the scheduler may stop when the
/// outstanding-work count reaches zero.
inline void
select_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
851  

769  

852  
} // namespace boost::corosio::detail
770  
} // namespace boost::corosio::detail
853  

771  

854  
#endif // BOOST_COROSIO_HAS_SELECT
772  
#endif // BOOST_COROSIO_HAS_SELECT
855  

773  

856  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
774  
#endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP