1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/corosio
7  
// Official repository: https://github.com/cppalliance/corosio
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
10  
#ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11  
#define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12  

12  

13  
#include <boost/corosio/detail/platform.hpp>
13  
#include <boost/corosio/detail/platform.hpp>
14  

14  

15  
#if BOOST_COROSIO_HAS_EPOLL
15  
#if BOOST_COROSIO_HAS_EPOLL
16  

16  

17  
#include <boost/corosio/detail/config.hpp>
17  
#include <boost/corosio/detail/config.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
18  
#include <boost/capy/ex/execution_context.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
19  
#include <boost/corosio/detail/socket_service.hpp>
20  

20  

21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
21  
#include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
22  
#include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23  

23  

24  
#include <boost/corosio/detail/endpoint_convert.hpp>
24  
#include <boost/corosio/detail/endpoint_convert.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
25  
#include <boost/corosio/detail/make_err.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
26  
#include <boost/corosio/detail/dispatch_coro.hpp>
27  
#include <boost/corosio/detail/except.hpp>
27  
#include <boost/corosio/detail/except.hpp>
28  
#include <boost/capy/buffers.hpp>
28  
#include <boost/capy/buffers.hpp>
29  

29  

30  
#include <coroutine>
30  
#include <coroutine>
31  
#include <mutex>
31  
#include <mutex>
32  
#include <unordered_map>
32  
#include <unordered_map>
33  
#include <utility>
33  
#include <utility>
34  

34  

35  
#include <errno.h>
35  
#include <errno.h>
36  
#include <netinet/in.h>
36  
#include <netinet/in.h>
37  
#include <netinet/tcp.h>
37  
#include <netinet/tcp.h>
38  
#include <sys/epoll.h>
38  
#include <sys/epoll.h>
39  
#include <sys/socket.h>
39  
#include <sys/socket.h>
40  
#include <unistd.h>
40  
#include <unistd.h>
41  

41  

42  
/*
42  
/*
43  
    epoll Socket Implementation
43  
    epoll Socket Implementation
44  
    ===========================
44  
    ===========================
45  

45  

46  
    Each I/O operation follows the same pattern:
46  
    Each I/O operation follows the same pattern:
47  
      1. Try the syscall immediately (non-blocking socket)
47  
      1. Try the syscall immediately (non-blocking socket)
48  
      2. If it succeeds or fails with a real error, post to completion queue
48  
      2. If it succeeds or fails with a real error, post to completion queue
49  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
49  
      3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50  

50  

51  
    This "try first" approach avoids unnecessary epoll round-trips for
51  
    This "try first" approach avoids unnecessary epoll round-trips for
52  
    operations that can complete immediately (common for small reads/writes
52  
    operations that can complete immediately (common for small reads/writes
53  
    on fast local connections).
53  
    on fast local connections).
54  

54  

55  
    One-Shot Registration
55  
    One-Shot Registration
56  
    ---------------------
56  
    ---------------------
57  
    We use one-shot epoll registration: each operation registers, waits for
57  
    We use one-shot epoll registration: each operation registers, waits for
58  
    one event, then unregisters. This simplifies the state machine since we
58  
    one event, then unregisters. This simplifies the state machine since we
59  
    don't need to track whether an fd is currently registered or handle
59  
    don't need to track whether an fd is currently registered or handle
60  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
60  
    re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61  
    simplicity is worth it.
61  
    simplicity is worth it.
62  

62  

63  
    Cancellation
63  
    Cancellation
64  
    ------------
64  
    ------------
65  
    See op.hpp for the completion/cancellation race handling via the
65  
    See op.hpp for the completion/cancellation race handling via the
66  
    `registered` atomic. cancel() must complete pending operations (post
66  
    `registered` atomic. cancel() must complete pending operations (post
67  
    them with cancelled flag) so coroutines waiting on them can resume.
67  
    them with cancelled flag) so coroutines waiting on them can resume.
68  
    close_socket() calls cancel() first to ensure this.
68  
    close_socket() calls cancel() first to ensure this.
69  

69  

70  
    Impl Lifetime with shared_ptr
70  
    Impl Lifetime with shared_ptr
71  
    -----------------------------
71  
    -----------------------------
72  
    Socket impls use enable_shared_from_this. The service owns impls via
72  
    Socket impls use enable_shared_from_this. The service owns impls via
73  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
73  
    shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74  
    removal. When a user calls close(), we call cancel() which posts pending
74  
    removal. When a user calls close(), we call cancel() which posts pending
75  
    ops to the scheduler.
75  
    ops to the scheduler.
76  

76  

77  
    CRITICAL: The posted ops must keep the impl alive until they complete.
77  
    CRITICAL: The posted ops must keep the impl alive until they complete.
78  
    Otherwise the scheduler would process a freed op (use-after-free). The
78  
    Otherwise the scheduler would process a freed op (use-after-free). The
79  
    cancel() method captures shared_from_this() into op.impl_ptr before
79  
    cancel() method captures shared_from_this() into op.impl_ptr before
80  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
80  
    posting. When the op completes, impl_ptr is cleared, allowing the impl
81  
    to be destroyed if no other references exist.
81  
    to be destroyed if no other references exist.
82  

82  

83  
    Service Ownership
83  
    Service Ownership
84  
    -----------------
84  
    -----------------
85  
    epoll_socket_service owns all socket impls. destroy_impl() removes the
85  
    epoll_socket_service owns all socket impls. destroy_impl() removes the
86  
    shared_ptr from the map, but the impl may survive if ops still hold
86  
    shared_ptr from the map, but the impl may survive if ops still hold
87  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
87  
    impl_ptr refs. shutdown() closes all sockets and clears the map; any
88  
    in-flight ops will complete and release their refs.
88  
    in-flight ops will complete and release their refs.
89  
*/
89  
*/
90  

90  

91  
namespace boost::corosio::detail {
91  
namespace boost::corosio::detail {
92  

92  

93  
/** State for epoll socket service. */
93  
/** State for epoll socket service. */
94  
class epoll_socket_state
94  
class epoll_socket_state
95  
{
95  
{
96  
public:
96  
public:
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
97  
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98  
    {
98  
    {
99  
    }
99  
    }
100  

100  

101  
    epoll_scheduler& sched_;
101  
    epoll_scheduler& sched_;
102  
    std::mutex mutex_;
102  
    std::mutex mutex_;
103  
    intrusive_list<epoll_socket> socket_list_;
103  
    intrusive_list<epoll_socket> socket_list_;
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
104  
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105  
        socket_ptrs_;
105  
        socket_ptrs_;
106  
};
106  
};
107  

107  

108  
/** epoll socket service implementation.
108  
/** epoll socket service implementation.
109  

109  

110  
    Inherits from socket_service to enable runtime polymorphism.
110  
    Inherits from socket_service to enable runtime polymorphism.
111  
    Uses key_type = socket_service for service lookup.
111  
    Uses key_type = socket_service for service lookup.
112  
*/
112  
*/
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
113  
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
114  
{
114  
{
115  
public:
115  
public:
116  
    explicit epoll_socket_service(capy::execution_context& ctx);
116  
    explicit epoll_socket_service(capy::execution_context& ctx);
117  
    ~epoll_socket_service() override;
117  
    ~epoll_socket_service() override;
118  

118  

119  
    epoll_socket_service(epoll_socket_service const&)            = delete;
119  
    epoll_socket_service(epoll_socket_service const&)            = delete;
120  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
120  
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;
121  

121  

122  
    void shutdown() override;
122  
    void shutdown() override;
123  

123  

124  
    io_object::implementation* construct() override;
124  
    io_object::implementation* construct() override;
125  
    void destroy(io_object::implementation*) override;
125  
    void destroy(io_object::implementation*) override;
126  
    void close(io_object::handle&) override;
126  
    void close(io_object::handle&) override;
127 -
    std::error_code open_socket(tcp_socket::implementation& impl) override;
127 +
    std::error_code
 
128 +
    open_socket(tcp_socket::implementation& impl,
 
129 +
                int family, int type, int protocol) override;
128  

130  

129  
    epoll_scheduler& scheduler() const noexcept
131  
    epoll_scheduler& scheduler() const noexcept
130  
    {
132  
    {
131  
        return state_->sched_;
133  
        return state_->sched_;
132  
    }
134  
    }
133  
    void post(epoll_op* op);
135  
    void post(epoll_op* op);
134  
    void work_started() noexcept;
136  
    void work_started() noexcept;
135  
    void work_finished() noexcept;
137  
    void work_finished() noexcept;
136  

138  

137  
private:
139  
private:
138  
    std::unique_ptr<epoll_socket_state> state_;
140  
    std::unique_ptr<epoll_socket_state> state_;
139  
};
141  
};
140  

142  

141  
//--------------------------------------------------------------------------
143  
//--------------------------------------------------------------------------
142  
//
144  
//
143  
// Implementation
145  
// Implementation
144  
//
146  
//
145  
//--------------------------------------------------------------------------
147  
//--------------------------------------------------------------------------
146  

148  

147  
// Register an op with the reactor, handling cached edge events.
149  
// Register an op with the reactor, handling cached edge events.
148  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
150  
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
149  
inline void
151  
inline void
150  
epoll_socket::register_op(
152  
epoll_socket::register_op(
151  
    epoll_op& op,
153  
    epoll_op& op,
152  
    epoll_op*& desc_slot,
154  
    epoll_op*& desc_slot,
153  
    bool& ready_flag,
155  
    bool& ready_flag,
154  
    bool& cancel_flag) noexcept
156  
    bool& cancel_flag) noexcept
155  
{
157  
{
156  
    svc_.work_started();
158  
    svc_.work_started();
157  

159  

158  
    std::lock_guard lock(desc_state_.mutex);
160  
    std::lock_guard lock(desc_state_.mutex);
159  
    bool io_done = false;
161  
    bool io_done = false;
160  
    if (ready_flag)
162  
    if (ready_flag)
161  
    {
163  
    {
162  
        ready_flag = false;
164  
        ready_flag = false;
163  
        op.perform_io();
165  
        op.perform_io();
164  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
166  
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
165  
        if (!io_done)
167  
        if (!io_done)
166  
            op.errn = 0;
168  
            op.errn = 0;
167  
    }
169  
    }
168  

170  

169  
    if (cancel_flag)
171  
    if (cancel_flag)
170  
    {
172  
    {
171  
        cancel_flag = false;
173  
        cancel_flag = false;
172  
        op.cancelled.store(true, std::memory_order_relaxed);
174  
        op.cancelled.store(true, std::memory_order_relaxed);
173  
    }
175  
    }
174  

176  

175  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
177  
    if (io_done || op.cancelled.load(std::memory_order_acquire))
176  
    {
178  
    {
177  
        svc_.post(&op);
179  
        svc_.post(&op);
178  
        svc_.work_finished();
180  
        svc_.work_finished();
179  
    }
181  
    }
180  
    else
182  
    else
181  
    {
183  
    {
182  
        desc_slot = &op;
184  
        desc_slot = &op;
183  
    }
185  
    }
184  
}
186  
}
185  

187  

186  
inline void
188  
inline void
187  
epoll_op::canceller::operator()() const noexcept
189  
epoll_op::canceller::operator()() const noexcept
188  
{
190  
{
189  
    op->cancel();
191  
    op->cancel();
190  
}
192  
}
191  

193  

192  
inline void
194  
inline void
193  
epoll_connect_op::cancel() noexcept
195  
epoll_connect_op::cancel() noexcept
194  
{
196  
{
195  
    if (socket_impl_)
197  
    if (socket_impl_)
196  
        socket_impl_->cancel_single_op(*this);
198  
        socket_impl_->cancel_single_op(*this);
197  
    else
199  
    else
198  
        request_cancel();
200  
        request_cancel();
199  
}
201  
}
200  

202  

201  
inline void
203  
inline void
202  
epoll_read_op::cancel() noexcept
204  
epoll_read_op::cancel() noexcept
203  
{
205  
{
204  
    if (socket_impl_)
206  
    if (socket_impl_)
205  
        socket_impl_->cancel_single_op(*this);
207  
        socket_impl_->cancel_single_op(*this);
206  
    else
208  
    else
207  
        request_cancel();
209  
        request_cancel();
208  
}
210  
}
209  

211  

210  
inline void
212  
inline void
211  
epoll_write_op::cancel() noexcept
213  
epoll_write_op::cancel() noexcept
212  
{
214  
{
213  
    if (socket_impl_)
215  
    if (socket_impl_)
214  
        socket_impl_->cancel_single_op(*this);
216  
        socket_impl_->cancel_single_op(*this);
215  
    else
217  
    else
216  
        request_cancel();
218  
        request_cancel();
217  
}
219  
}
218  

220  

219  
inline void
221  
inline void
220  
epoll_op::operator()()
222  
epoll_op::operator()()
221  
{
223  
{
222  
    stop_cb.reset();
224  
    stop_cb.reset();
223  

225  

224  
    socket_impl_->svc_.scheduler().reset_inline_budget();
226  
    socket_impl_->svc_.scheduler().reset_inline_budget();
225  

227  

226  
    if (cancelled.load(std::memory_order_acquire))
228  
    if (cancelled.load(std::memory_order_acquire))
227  
        *ec_out = capy::error::canceled;
229  
        *ec_out = capy::error::canceled;
228  
    else if (errn != 0)
230  
    else if (errn != 0)
229  
        *ec_out = make_err(errn);
231  
        *ec_out = make_err(errn);
230  
    else if (is_read_operation() && bytes_transferred == 0)
232  
    else if (is_read_operation() && bytes_transferred == 0)
231  
        *ec_out = capy::error::eof;
233  
        *ec_out = capy::error::eof;
232  
    else
234  
    else
233  
        *ec_out = {};
235  
        *ec_out = {};
234  

236  

235  
    *bytes_out = bytes_transferred;
237  
    *bytes_out = bytes_transferred;
236  

238  

237  
    // Move to stack before resuming coroutine. The coroutine might close
239  
    // Move to stack before resuming coroutine. The coroutine might close
238  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
240  
    // the socket, releasing the last wrapper ref. If impl_ptr were the
239  
    // last ref and we destroyed it while still in operator(), we'd have
241  
    // last ref and we destroyed it while still in operator(), we'd have
240  
    // use-after-free. Moving to local ensures destruction happens at
242  
    // use-after-free. Moving to local ensures destruction happens at
241  
    // function exit, after all member accesses are complete.
243  
    // function exit, after all member accesses are complete.
242  
    capy::executor_ref saved_ex(ex);
244  
    capy::executor_ref saved_ex(ex);
243  
    std::coroutine_handle<> saved_h(h);
245  
    std::coroutine_handle<> saved_h(h);
244  
    auto prevent_premature_destruction = std::move(impl_ptr);
246  
    auto prevent_premature_destruction = std::move(impl_ptr);
245  
    dispatch_coro(saved_ex, saved_h).resume();
247  
    dispatch_coro(saved_ex, saved_h).resume();
246  
}
248  
}
247  

249  

248  
inline void
250  
inline void
249  
epoll_connect_op::operator()()
251  
epoll_connect_op::operator()()
250  
{
252  
{
251  
    stop_cb.reset();
253  
    stop_cb.reset();
252  

254  

253  
    socket_impl_->svc_.scheduler().reset_inline_budget();
255  
    socket_impl_->svc_.scheduler().reset_inline_budget();
254  

256  

255  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
257  
    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
256  

258  

257  
    // Cache endpoints on successful connect
259  
    // Cache endpoints on successful connect
258  
    if (success && socket_impl_)
260  
    if (success && socket_impl_)
259 -
        // Query local endpoint via getsockname (may fail, but remote is always known)
 
260  
    {
261  
    {
261  
        endpoint local_ep;
262  
        endpoint local_ep;
262 -
        sockaddr_in local_addr{};
263 +
        sockaddr_storage local_storage{};
263 -
        socklen_t local_len = sizeof(local_addr);
264 +
        socklen_t local_len = sizeof(local_storage);
264  
        if (::getsockname(
265  
        if (::getsockname(
265 -
                fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
266 +
                fd, reinterpret_cast<sockaddr*>(&local_storage),
266 -
            local_ep = from_sockaddr_in(local_addr);
267 +
                &local_len) == 0)
267 -
        // Always cache remote endpoint; local may be default if getsockname failed
268 +
            local_ep = from_sockaddr(local_storage);
268  
        static_cast<epoll_socket*>(socket_impl_)
269  
        static_cast<epoll_socket*>(socket_impl_)
269  
            ->set_endpoints(local_ep, target_endpoint);
270  
            ->set_endpoints(local_ep, target_endpoint);
270  
    }
271  
    }
271  

272  

272  
    if (cancelled.load(std::memory_order_acquire))
273  
    if (cancelled.load(std::memory_order_acquire))
273  
        *ec_out = capy::error::canceled;
274  
        *ec_out = capy::error::canceled;
274  
    else if (errn != 0)
275  
    else if (errn != 0)
275  
        *ec_out = make_err(errn);
276  
        *ec_out = make_err(errn);
276  
    else
277  
    else
277  
        *ec_out = {};
278  
        *ec_out = {};
278  

279  

279  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
280  
    // Move to stack before resuming. See epoll_op::operator()() for rationale.
280  
    capy::executor_ref saved_ex(ex);
281  
    capy::executor_ref saved_ex(ex);
281  
    std::coroutine_handle<> saved_h(h);
282  
    std::coroutine_handle<> saved_h(h);
282  
    auto prevent_premature_destruction = std::move(impl_ptr);
283  
    auto prevent_premature_destruction = std::move(impl_ptr);
283  
    dispatch_coro(saved_ex, saved_h).resume();
284  
    dispatch_coro(saved_ex, saved_h).resume();
284  
}
285  
}
285  

286  

286  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
287  
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
287  
    : svc_(svc)
288  
    : svc_(svc)
288  
{
289  
{
289  
}
290  
}
290  

291  

291  
inline epoll_socket::~epoll_socket() = default;
292  
inline epoll_socket::~epoll_socket() = default;
292  

293  

293  
inline std::coroutine_handle<>
294  
inline std::coroutine_handle<>
294  
epoll_socket::connect(
295  
epoll_socket::connect(
295  
    std::coroutine_handle<> h,
296  
    std::coroutine_handle<> h,
296  
    capy::executor_ref ex,
297  
    capy::executor_ref ex,
297  
    endpoint ep,
298  
    endpoint ep,
298  
    std::stop_token token,
299  
    std::stop_token token,
299  
    std::error_code* ec)
300  
    std::error_code* ec)
300  
{
301  
{
301  
    auto& op = conn_;
302  
    auto& op = conn_;
302  

303  

303 -
    sockaddr_in addr = detail::to_sockaddr_in(ep);
304 +
    sockaddr_storage storage{};
 
305 +
    socklen_t addrlen =
 
306 +
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
304  
    int result =
307  
    int result =
305 -
        ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
308 +
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
306  

309  

307  
    if (result == 0)
310  
    if (result == 0)
308  
    {
311  
    {
309 -
        sockaddr_in local_addr{};
312 +
        sockaddr_storage local_storage{};
310 -
        socklen_t local_len = sizeof(local_addr);
313 +
        socklen_t local_len = sizeof(local_storage);
311  
        if (::getsockname(
314  
        if (::getsockname(
312 -
                fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
315 +
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
313 -
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
316 +
                &local_len) == 0)
 
317 +
            local_endpoint_ = detail::from_sockaddr(local_storage);
314  
        remote_endpoint_ = ep;
318  
        remote_endpoint_ = ep;
315  
    }
319  
    }
316  

320  

317  
    if (result == 0 || errno != EINPROGRESS)
321  
    if (result == 0 || errno != EINPROGRESS)
318  
    {
322  
    {
319  
        int err = (result < 0) ? errno : 0;
323  
        int err = (result < 0) ? errno : 0;
320  
        if (svc_.scheduler().try_consume_inline_budget())
324  
        if (svc_.scheduler().try_consume_inline_budget())
321  
        {
325  
        {
322  
            *ec = err ? make_err(err) : std::error_code{};
326  
            *ec = err ? make_err(err) : std::error_code{};
323  
            return dispatch_coro(ex, h);
327  
            return dispatch_coro(ex, h);
324  
        }
328  
        }
325  
        op.reset();
329  
        op.reset();
326  
        op.h               = h;
330  
        op.h               = h;
327  
        op.ex              = ex;
331  
        op.ex              = ex;
328  
        op.ec_out          = ec;
332  
        op.ec_out          = ec;
329  
        op.fd              = fd_;
333  
        op.fd              = fd_;
330  
        op.target_endpoint = ep;
334  
        op.target_endpoint = ep;
331  
        op.start(token, this);
335  
        op.start(token, this);
332  
        op.impl_ptr = shared_from_this();
336  
        op.impl_ptr = shared_from_this();
333  
        op.complete(err, 0);
337  
        op.complete(err, 0);
334  
        svc_.post(&op);
338  
        svc_.post(&op);
335  
        return std::noop_coroutine();
339  
        return std::noop_coroutine();
336  
    }
340  
    }
337  

341  

338  
    // EINPROGRESS — register with reactor
342  
    // EINPROGRESS — register with reactor
339  
    op.reset();
343  
    op.reset();
340  
    op.h               = h;
344  
    op.h               = h;
341  
    op.ex              = ex;
345  
    op.ex              = ex;
342  
    op.ec_out          = ec;
346  
    op.ec_out          = ec;
343  
    op.fd              = fd_;
347  
    op.fd              = fd_;
344  
    op.target_endpoint = ep;
348  
    op.target_endpoint = ep;
345  
    op.start(token, this);
349  
    op.start(token, this);
346  
    op.impl_ptr = shared_from_this();
350  
    op.impl_ptr = shared_from_this();
347  

351  

348  
    register_op(
352  
    register_op(
349  
        op, desc_state_.connect_op, desc_state_.write_ready,
353  
        op, desc_state_.connect_op, desc_state_.write_ready,
350  
        desc_state_.connect_cancel_pending);
354  
        desc_state_.connect_cancel_pending);
351  
    return std::noop_coroutine();
355  
    return std::noop_coroutine();
352  
}
356  
}
353  

357  

354  
inline std::coroutine_handle<>
358  
inline std::coroutine_handle<>
355  
epoll_socket::read_some(
359  
epoll_socket::read_some(
356  
    std::coroutine_handle<> h,
360  
    std::coroutine_handle<> h,
357  
    capy::executor_ref ex,
361  
    capy::executor_ref ex,
358  
    io_buffer_param param,
362  
    io_buffer_param param,
359  
    std::stop_token token,
363  
    std::stop_token token,
360  
    std::error_code* ec,
364  
    std::error_code* ec,
361  
    std::size_t* bytes_out)
365  
    std::size_t* bytes_out)
362  
{
366  
{
363  
    auto& op = rd_;
367  
    auto& op = rd_;
364  
    op.reset();
368  
    op.reset();
365  

369  

366  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
370  
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
367  
    op.iovec_count =
371  
    op.iovec_count =
368  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
372  
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
369  

373  

370  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
374  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
371  
    {
375  
    {
372  
        op.empty_buffer_read = true;
376  
        op.empty_buffer_read = true;
373  
        op.h                 = h;
377  
        op.h                 = h;
374  
        op.ex                = ex;
378  
        op.ex                = ex;
375  
        op.ec_out            = ec;
379  
        op.ec_out            = ec;
376  
        op.bytes_out         = bytes_out;
380  
        op.bytes_out         = bytes_out;
377  
        op.start(token, this);
381  
        op.start(token, this);
378  
        op.impl_ptr = shared_from_this();
382  
        op.impl_ptr = shared_from_this();
379  
        op.complete(0, 0);
383  
        op.complete(0, 0);
380  
        svc_.post(&op);
384  
        svc_.post(&op);
381  
        return std::noop_coroutine();
385  
        return std::noop_coroutine();
382  
    }
386  
    }
383  

387  

384  
    for (int i = 0; i < op.iovec_count; ++i)
388  
    for (int i = 0; i < op.iovec_count; ++i)
385  
    {
389  
    {
386  
        op.iovecs[i].iov_base = bufs[i].data();
390  
        op.iovecs[i].iov_base = bufs[i].data();
387  
        op.iovecs[i].iov_len  = bufs[i].size();
391  
        op.iovecs[i].iov_len  = bufs[i].size();
388  
    }
392  
    }
389  

393  

390  
    // Speculative read
394  
    // Speculative read
391  
    ssize_t n;
395  
    ssize_t n;
392  
    do
396  
    do
393  
    {
397  
    {
394  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
398  
        n = ::readv(fd_, op.iovecs, op.iovec_count);
395  
    }
399  
    }
396  
    while (n < 0 && errno == EINTR);
400  
    while (n < 0 && errno == EINTR);
397  

401  

398  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
402  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
399  
    {
403  
    {
400  
        int err    = (n < 0) ? errno : 0;
404  
        int err    = (n < 0) ? errno : 0;
401  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
405  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
402  

406  

403  
        if (svc_.scheduler().try_consume_inline_budget())
407  
        if (svc_.scheduler().try_consume_inline_budget())
404  
        {
408  
        {
405  
            if (err)
409  
            if (err)
406  
                *ec = make_err(err);
410  
                *ec = make_err(err);
407  
            else if (n == 0)
411  
            else if (n == 0)
408  
                *ec = capy::error::eof;
412  
                *ec = capy::error::eof;
409  
            else
413  
            else
410  
                *ec = {};
414  
                *ec = {};
411  
            *bytes_out = bytes;
415  
            *bytes_out = bytes;
412  
            return dispatch_coro(ex, h);
416  
            return dispatch_coro(ex, h);
413  
        }
417  
        }
414  
        op.h         = h;
418  
        op.h         = h;
415  
        op.ex        = ex;
419  
        op.ex        = ex;
416  
        op.ec_out    = ec;
420  
        op.ec_out    = ec;
417  
        op.bytes_out = bytes_out;
421  
        op.bytes_out = bytes_out;
418  
        op.start(token, this);
422  
        op.start(token, this);
419  
        op.impl_ptr = shared_from_this();
423  
        op.impl_ptr = shared_from_this();
420  
        op.complete(err, bytes);
424  
        op.complete(err, bytes);
421  
        svc_.post(&op);
425  
        svc_.post(&op);
422  
        return std::noop_coroutine();
426  
        return std::noop_coroutine();
423  
    }
427  
    }
424  

428  

425  
    // EAGAIN — register with reactor
429  
    // EAGAIN — register with reactor
426  
    op.h         = h;
430  
    op.h         = h;
427  
    op.ex        = ex;
431  
    op.ex        = ex;
428  
    op.ec_out    = ec;
432  
    op.ec_out    = ec;
429  
    op.bytes_out = bytes_out;
433  
    op.bytes_out = bytes_out;
430  
    op.fd        = fd_;
434  
    op.fd        = fd_;
431  
    op.start(token, this);
435  
    op.start(token, this);
432  
    op.impl_ptr = shared_from_this();
436  
    op.impl_ptr = shared_from_this();
433  

437  

434  
    register_op(
438  
    register_op(
435  
        op, desc_state_.read_op, desc_state_.read_ready,
439  
        op, desc_state_.read_op, desc_state_.read_ready,
436  
        desc_state_.read_cancel_pending);
440  
        desc_state_.read_cancel_pending);
437  
    return std::noop_coroutine();
441  
    return std::noop_coroutine();
438  
}
442  
}
439  

443  

440  
inline std::coroutine_handle<>
444  
inline std::coroutine_handle<>
441  
epoll_socket::write_some(
445  
epoll_socket::write_some(
442  
    std::coroutine_handle<> h,
446  
    std::coroutine_handle<> h,
443  
    capy::executor_ref ex,
447  
    capy::executor_ref ex,
444  
    io_buffer_param param,
448  
    io_buffer_param param,
445  
    std::stop_token token,
449  
    std::stop_token token,
446  
    std::error_code* ec,
450  
    std::error_code* ec,
447  
    std::size_t* bytes_out)
451  
    std::size_t* bytes_out)
448  
{
452  
{
449  
    auto& op = wr_;
453  
    auto& op = wr_;
450  
    op.reset();
454  
    op.reset();
451  

455  

452  
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
456  
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
453  
    op.iovec_count =
457  
    op.iovec_count =
454  
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
458  
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
455  

459  

456  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
460  
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
457  
    {
461  
    {
458  
        op.h         = h;
462  
        op.h         = h;
459  
        op.ex        = ex;
463  
        op.ex        = ex;
460  
        op.ec_out    = ec;
464  
        op.ec_out    = ec;
461  
        op.bytes_out = bytes_out;
465  
        op.bytes_out = bytes_out;
462  
        op.start(token, this);
466  
        op.start(token, this);
463  
        op.impl_ptr = shared_from_this();
467  
        op.impl_ptr = shared_from_this();
464  
        op.complete(0, 0);
468  
        op.complete(0, 0);
465  
        svc_.post(&op);
469  
        svc_.post(&op);
466  
        return std::noop_coroutine();
470  
        return std::noop_coroutine();
467  
    }
471  
    }
468  

472  

469  
    for (int i = 0; i < op.iovec_count; ++i)
473  
    for (int i = 0; i < op.iovec_count; ++i)
470  
    {
474  
    {
471  
        op.iovecs[i].iov_base = bufs[i].data();
475  
        op.iovecs[i].iov_base = bufs[i].data();
472  
        op.iovecs[i].iov_len  = bufs[i].size();
476  
        op.iovecs[i].iov_len  = bufs[i].size();
473  
    }
477  
    }
474  

478  

475  
    // Speculative write
479  
    // Speculative write
476  
    msghdr msg{};
480  
    msghdr msg{};
477  
    msg.msg_iov    = op.iovecs;
481  
    msg.msg_iov    = op.iovecs;
478  
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
482  
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
479  

483  

480  
    ssize_t n;
484  
    ssize_t n;
481  
    do
485  
    do
482  
    {
486  
    {
483  
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
487  
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
484  
    }
488  
    }
485  
    while (n < 0 && errno == EINTR);
489  
    while (n < 0 && errno == EINTR);
486  

490  

487  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
491  
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
488  
    {
492  
    {
489  
        int err    = (n < 0) ? errno : 0;
493  
        int err    = (n < 0) ? errno : 0;
490  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
494  
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
491  

495  

492  
        if (svc_.scheduler().try_consume_inline_budget())
496  
        if (svc_.scheduler().try_consume_inline_budget())
493  
        {
497  
        {
494  
            *ec        = err ? make_err(err) : std::error_code{};
498  
            *ec        = err ? make_err(err) : std::error_code{};
495  
            *bytes_out = bytes;
499  
            *bytes_out = bytes;
496  
            return dispatch_coro(ex, h);
500  
            return dispatch_coro(ex, h);
497  
        }
501  
        }
498  
        op.h         = h;
502  
        op.h         = h;
499  
        op.ex        = ex;
503  
        op.ex        = ex;
500  
        op.ec_out    = ec;
504  
        op.ec_out    = ec;
501  
        op.bytes_out = bytes_out;
505  
        op.bytes_out = bytes_out;
502  
        op.start(token, this);
506  
        op.start(token, this);
503  
        op.impl_ptr = shared_from_this();
507  
        op.impl_ptr = shared_from_this();
504  
        op.complete(err, bytes);
508  
        op.complete(err, bytes);
505  
        svc_.post(&op);
509  
        svc_.post(&op);
506  
        return std::noop_coroutine();
510  
        return std::noop_coroutine();
507  
    }
511  
    }
508  

512  

509  
    // EAGAIN — register with reactor
513  
    // EAGAIN — register with reactor
510  
    op.h         = h;
514  
    op.h         = h;
511  
    op.ex        = ex;
515  
    op.ex        = ex;
512  
    op.ec_out    = ec;
516  
    op.ec_out    = ec;
513  
    op.bytes_out = bytes_out;
517  
    op.bytes_out = bytes_out;
514  
    op.fd        = fd_;
518  
    op.fd        = fd_;
515  
    op.start(token, this);
519  
    op.start(token, this);
516  
    op.impl_ptr = shared_from_this();
520  
    op.impl_ptr = shared_from_this();
517  

521  

518  
    register_op(
522  
    register_op(
519  
        op, desc_state_.write_op, desc_state_.write_ready,
523  
        op, desc_state_.write_op, desc_state_.write_ready,
520  
        desc_state_.write_cancel_pending);
524  
        desc_state_.write_cancel_pending);
521  
    return std::noop_coroutine();
525  
    return std::noop_coroutine();
522  
}
526  
}
523  

527  

524  
inline std::error_code
528  
inline std::error_code
525  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
529  
epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
526  
{
530  
{
527  
    int how;
531  
    int how;
528  
    switch (what)
532  
    switch (what)
529  
    {
533  
    {
530  
    case tcp_socket::shutdown_receive:
534  
    case tcp_socket::shutdown_receive:
531  
        how = SHUT_RD;
535  
        how = SHUT_RD;
532  
        break;
536  
        break;
533  
    case tcp_socket::shutdown_send:
537  
    case tcp_socket::shutdown_send:
534  
        how = SHUT_WR;
538  
        how = SHUT_WR;
535  
        break;
539  
        break;
536  
    case tcp_socket::shutdown_both:
540  
    case tcp_socket::shutdown_both:
537  
        how = SHUT_RDWR;
541  
        how = SHUT_RDWR;
538  
        break;
542  
        break;
539  
    default:
543  
    default:
540  
        return make_err(EINVAL);
544  
        return make_err(EINVAL);
541  
    }
545  
    }
542  
    if (::shutdown(fd_, how) != 0)
546  
    if (::shutdown(fd_, how) != 0)
543  
        return make_err(errno);
547  
        return make_err(errno);
544  
    return {};
548  
    return {};
545  
}
549  
}
546  

550  

547  
inline std::error_code
551  
inline std::error_code
548 -
epoll_socket::set_no_delay(bool value) noexcept
552 +
epoll_socket::set_option(
549 -
{
553 +
    int level, int optname,
550 -
    int flag = value ? 1 : 0;
554 +
    void const* data, std::size_t size) noexcept
551 -
    if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
 
552 -
        return make_err(errno);
 
553 -
    return {};
 
554 -
}
 
555 -

 
556 -
inline bool
 
557 -
epoll_socket::no_delay(std::error_code& ec) const noexcept
 
558 -
{
 
559 -
    int flag      = 0;
 
560 -
    socklen_t len = sizeof(flag);
 
561 -
    if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
 
562 -
    {
 
563 -
        ec = make_err(errno);
 
564 -
        return false;
 
565 -
    }
 
566 -
    ec = {};
 
567 -
    return flag != 0;
 
568 -
}
 
569 -

 
570 -
inline std::error_code
 
571 -
epoll_socket::set_keep_alive(bool value) noexcept
 
572 -
{
 
573 -
    int flag = value ? 1 : 0;
 
574 -
    if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
 
575 -
        return make_err(errno);
 
576 -
    return {};
 
577 -
}
 
578 -

 
579 -
inline bool
 
580 -
epoll_socket::keep_alive(std::error_code& ec) const noexcept
 
581 -
{
 
582 -
    int flag      = 0;
 
583 -
    socklen_t len = sizeof(flag);
 
584 -
    if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
 
585 -
    {
 
586 -
        ec = make_err(errno);
 
587 -
        return false;
 
588 -
    }
 
589 -
    ec = {};
 
590 -
    return flag != 0;
 
591 -
}
 
592 -

 
593 -
inline std::error_code
 
594 -
epoll_socket::set_receive_buffer_size(int size) noexcept
 
595 -
{
 
596 -
    if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
 
597 -
        return make_err(errno);
 
598 -
    return {};
 
599 -
}
 
600 -

 
601 -
inline int
 
602 -
epoll_socket::receive_buffer_size(std::error_code& ec) const noexcept
 
603 -
{
 
604 -
    int size      = 0;
 
605 -
    socklen_t len = sizeof(size);
 
606 -
    if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
 
607 -
    {
 
608 -
        ec = make_err(errno);
 
609 -
        return 0;
 
610 -
    }
 
611 -
    ec = {};
 
612 -
    return size;
 
613 -
}
 
614 -

 
615 -
inline std::error_code
 
616 -
epoll_socket::set_send_buffer_size(int size) noexcept
 
617  
{
555  
{
618 -
    if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
556 +
    if (::setsockopt(fd_, level, optname, data,
 
557 +
            static_cast<socklen_t>(size)) != 0)
619  
        return make_err(errno);
558  
        return make_err(errno);
620  
    return {};
559  
    return {};
621  
}
560  
}
622 -
inline int
 
623 -
epoll_socket::send_buffer_size(std::error_code& ec) const noexcept
 
624 -
{
 
625 -
    int size      = 0;
 
626 -
    socklen_t len = sizeof(size);
 
627 -
    if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
 
628 -
    {
 
629 -
        ec = make_err(errno);
 
630 -
        return 0;
 
631 -
    }
 
632 -
    ec = {};
 
633 -
    return size;
 
634 -
}
 
635 -

 
636  

561  

637  
inline std::error_code
562  
inline std::error_code
638 -
epoll_socket::set_linger(bool enabled, int timeout) noexcept
563 +
epoll_socket::get_option(
 
564 +
    int level, int optname,
 
565 +
    void* data, std::size_t* size) const noexcept
639  
{
566  
{
640 -
    if (timeout < 0)
567 +
    socklen_t len = static_cast<socklen_t>(*size);
641 -
        return make_err(EINVAL);
568 +
    if (::getsockopt(fd_, level, optname, data, &len) != 0)
642 -
    struct ::linger lg;
 
643 -
    lg.l_onoff  = enabled ? 1 : 0;
 
644 -
    lg.l_linger = timeout;
 
645 -
    if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
 
646  
        return make_err(errno);
569  
        return make_err(errno);
 
570 +
    *size = static_cast<std::size_t>(len);
647  
    return {};
571  
    return {};
648  
}
572  
}
649 -
inline tcp_socket::linger_options
 
650 -
epoll_socket::linger(std::error_code& ec) const noexcept
 
651 -
{
 
652 -
    struct ::linger lg{};
 
653 -
    socklen_t len = sizeof(lg);
 
654 -
    if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
 
655 -
    {
 
656 -
        ec = make_err(errno);
 
657 -
        return {};
 
658 -
    }
 
659 -
    ec = {};
 
660 -
    return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
 
661 -
}
 
662 -

 
663  

573  

664  
inline void
574  
inline void
665  
epoll_socket::cancel() noexcept
575  
epoll_socket::cancel() noexcept
666  
{
576  
{
667  
    auto self = weak_from_this().lock();
577  
    auto self = weak_from_this().lock();
668  
    if (!self)
578  
    if (!self)
669  
        return;
579  
        return;
670  

580  

671  
    conn_.request_cancel();
581  
    conn_.request_cancel();
672  
    rd_.request_cancel();
582  
    rd_.request_cancel();
673  
    wr_.request_cancel();
583  
    wr_.request_cancel();
674  

584  

675  
    epoll_op* conn_claimed = nullptr;
585  
    epoll_op* conn_claimed = nullptr;
676  
    epoll_op* rd_claimed   = nullptr;
586  
    epoll_op* rd_claimed   = nullptr;
677  
    epoll_op* wr_claimed   = nullptr;
587  
    epoll_op* wr_claimed   = nullptr;
678  
    {
588  
    {
679  
        std::lock_guard lock(desc_state_.mutex);
589  
        std::lock_guard lock(desc_state_.mutex);
680  
        if (desc_state_.connect_op == &conn_)
590  
        if (desc_state_.connect_op == &conn_)
681  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
591  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
682  
        else
592  
        else
683  
            desc_state_.connect_cancel_pending = true;
593  
            desc_state_.connect_cancel_pending = true;
684  
        if (desc_state_.read_op == &rd_)
594  
        if (desc_state_.read_op == &rd_)
685  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
595  
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
686  
        else
596  
        else
687  
            desc_state_.read_cancel_pending = true;
597  
            desc_state_.read_cancel_pending = true;
688  
        if (desc_state_.write_op == &wr_)
598  
        if (desc_state_.write_op == &wr_)
689  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
599  
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
690  
        else
600  
        else
691  
            desc_state_.write_cancel_pending = true;
601  
            desc_state_.write_cancel_pending = true;
692  
    }
602  
    }
693  

603  

694  
    if (conn_claimed)
604  
    if (conn_claimed)
695  
    {
605  
    {
696  
        conn_.impl_ptr = self;
606  
        conn_.impl_ptr = self;
697  
        svc_.post(&conn_);
607  
        svc_.post(&conn_);
698  
        svc_.work_finished();
608  
        svc_.work_finished();
699  
    }
609  
    }
700  
    if (rd_claimed)
610  
    if (rd_claimed)
701  
    {
611  
    {
702  
        rd_.impl_ptr = self;
612  
        rd_.impl_ptr = self;
703  
        svc_.post(&rd_);
613  
        svc_.post(&rd_);
704  
        svc_.work_finished();
614  
        svc_.work_finished();
705  
    }
615  
    }
706  
    if (wr_claimed)
616  
    if (wr_claimed)
707  
    {
617  
    {
708  
        wr_.impl_ptr = self;
618  
        wr_.impl_ptr = self;
709  
        svc_.post(&wr_);
619  
        svc_.post(&wr_);
710  
        svc_.work_finished();
620  
        svc_.work_finished();
711  
    }
621  
    }
712  
}
622  
}
713  

623  

714  
inline void
624  
inline void
715  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
625  
epoll_socket::cancel_single_op(epoll_op& op) noexcept
716  
{
626  
{
717  
    auto self = weak_from_this().lock();
627  
    auto self = weak_from_this().lock();
718  
    if (!self)
628  
    if (!self)
719  
        return;
629  
        return;
720  

630  

721  
    op.request_cancel();
631  
    op.request_cancel();
722  

632  

723  
    epoll_op** desc_op_ptr = nullptr;
633  
    epoll_op** desc_op_ptr = nullptr;
724  
    if (&op == &conn_)
634  
    if (&op == &conn_)
725  
        desc_op_ptr = &desc_state_.connect_op;
635  
        desc_op_ptr = &desc_state_.connect_op;
726  
    else if (&op == &rd_)
636  
    else if (&op == &rd_)
727  
        desc_op_ptr = &desc_state_.read_op;
637  
        desc_op_ptr = &desc_state_.read_op;
728  
    else if (&op == &wr_)
638  
    else if (&op == &wr_)
729  
        desc_op_ptr = &desc_state_.write_op;
639  
        desc_op_ptr = &desc_state_.write_op;
730  

640  

731  
    if (desc_op_ptr)
641  
    if (desc_op_ptr)
732  
    {
642  
    {
733  
        epoll_op* claimed = nullptr;
643  
        epoll_op* claimed = nullptr;
734  
        {
644  
        {
735  
            std::lock_guard lock(desc_state_.mutex);
645  
            std::lock_guard lock(desc_state_.mutex);
736  
            if (*desc_op_ptr == &op)
646  
            if (*desc_op_ptr == &op)
737  
                claimed = std::exchange(*desc_op_ptr, nullptr);
647  
                claimed = std::exchange(*desc_op_ptr, nullptr);
738  
            else if (&op == &conn_)
648  
            else if (&op == &conn_)
739  
                desc_state_.connect_cancel_pending = true;
649  
                desc_state_.connect_cancel_pending = true;
740  
            else if (&op == &rd_)
650  
            else if (&op == &rd_)
741  
                desc_state_.read_cancel_pending = true;
651  
                desc_state_.read_cancel_pending = true;
742  
            else if (&op == &wr_)
652  
            else if (&op == &wr_)
743  
                desc_state_.write_cancel_pending = true;
653  
                desc_state_.write_cancel_pending = true;
744  
        }
654  
        }
745  
        if (claimed)
655  
        if (claimed)
746  
        {
656  
        {
747  
            op.impl_ptr = self;
657  
            op.impl_ptr = self;
748  
            svc_.post(&op);
658  
            svc_.post(&op);
749  
            svc_.work_finished();
659  
            svc_.work_finished();
750  
        }
660  
        }
751  
    }
661  
    }
752  
}
662  
}
753  

663  

754  
inline void
664  
inline void
755  
epoll_socket::close_socket() noexcept
665  
epoll_socket::close_socket() noexcept
756  
{
666  
{
757  
    auto self = weak_from_this().lock();
667  
    auto self = weak_from_this().lock();
758  
    if (self)
668  
    if (self)
759  
    {
669  
    {
760  
        conn_.request_cancel();
670  
        conn_.request_cancel();
761  
        rd_.request_cancel();
671  
        rd_.request_cancel();
762  
        wr_.request_cancel();
672  
        wr_.request_cancel();
763  

673  

764  
        epoll_op* conn_claimed = nullptr;
674  
        epoll_op* conn_claimed = nullptr;
765  
        epoll_op* rd_claimed   = nullptr;
675  
        epoll_op* rd_claimed   = nullptr;
766  
        epoll_op* wr_claimed   = nullptr;
676  
        epoll_op* wr_claimed   = nullptr;
767  
        {
677  
        {
768  
            std::lock_guard lock(desc_state_.mutex);
678  
            std::lock_guard lock(desc_state_.mutex);
769  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
679  
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
770  
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
680  
            rd_claimed   = std::exchange(desc_state_.read_op, nullptr);
771  
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
681  
            wr_claimed   = std::exchange(desc_state_.write_op, nullptr);
772  
            desc_state_.read_ready             = false;
682  
            desc_state_.read_ready             = false;
773  
            desc_state_.write_ready            = false;
683  
            desc_state_.write_ready            = false;
774  
            desc_state_.read_cancel_pending    = false;
684  
            desc_state_.read_cancel_pending    = false;
775  
            desc_state_.write_cancel_pending   = false;
685  
            desc_state_.write_cancel_pending   = false;
776  
            desc_state_.connect_cancel_pending = false;
686  
            desc_state_.connect_cancel_pending = false;
777  
        }
687  
        }
778  

688  

779  
        if (conn_claimed)
689  
        if (conn_claimed)
780  
        {
690  
        {
781  
            conn_.impl_ptr = self;
691  
            conn_.impl_ptr = self;
782  
            svc_.post(&conn_);
692  
            svc_.post(&conn_);
783  
            svc_.work_finished();
693  
            svc_.work_finished();
784  
        }
694  
        }
785  
        if (rd_claimed)
695  
        if (rd_claimed)
786  
        {
696  
        {
787  
            rd_.impl_ptr = self;
697  
            rd_.impl_ptr = self;
788  
            svc_.post(&rd_);
698  
            svc_.post(&rd_);
789  
            svc_.work_finished();
699  
            svc_.work_finished();
790  
        }
700  
        }
791  
        if (wr_claimed)
701  
        if (wr_claimed)
792  
        {
702  
        {
793  
            wr_.impl_ptr = self;
703  
            wr_.impl_ptr = self;
794  
            svc_.post(&wr_);
704  
            svc_.post(&wr_);
795  
            svc_.work_finished();
705  
            svc_.work_finished();
796  
        }
706  
        }
797  

707  

798  
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
708  
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
799  
            desc_state_.impl_ref_ = self;
709  
            desc_state_.impl_ref_ = self;
800  
    }
710  
    }
801  

711  

802  
    if (fd_ >= 0)
712  
    if (fd_ >= 0)
803  
    {
713  
    {
804  
        if (desc_state_.registered_events != 0)
714  
        if (desc_state_.registered_events != 0)
805  
            svc_.scheduler().deregister_descriptor(fd_);
715  
            svc_.scheduler().deregister_descriptor(fd_);
806  
        ::close(fd_);
716  
        ::close(fd_);
807  
        fd_ = -1;
717  
        fd_ = -1;
808  
    }
718  
    }
809  

719  

810  
    desc_state_.fd                = -1;
720  
    desc_state_.fd                = -1;
811  
    desc_state_.registered_events = 0;
721  
    desc_state_.registered_events = 0;
812  

722  

813  
    local_endpoint_  = endpoint{};
723  
    local_endpoint_  = endpoint{};
814  
    remote_endpoint_ = endpoint{};
724  
    remote_endpoint_ = endpoint{};
815  
}
725  
}
816  

726  

817  
// Construct the service; its state is built around the epoll_scheduler
// obtained from `ctx` via use_service().
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(std::make_unique<epoll_socket_state>(
          ctx.use_service<epoll_scheduler>()))
{
}
823  

733  

824  
// Nothing to do beyond destroying the members.
inline epoll_socket_service::~epoll_socket_service() = default;
825  

735  

826  
inline void
736  
inline void
827  
epoll_socket_service::shutdown()
737  
epoll_socket_service::shutdown()
828  
{
738  
{
829  
    std::lock_guard lock(state_->mutex_);
739  
    std::lock_guard lock(state_->mutex_);
830  

740  

831  
    while (auto* impl = state_->socket_list_.pop_front())
741  
    while (auto* impl = state_->socket_list_.pop_front())
832  
        impl->close_socket();
742  
        impl->close_socket();
833  

743  

834  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
744  
    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
835  
    // drains completed_ops_, calling destroy() on each queued op. If we
745  
    // drains completed_ops_, calling destroy() on each queued op. If we
836  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
746  
    // released our shared_ptrs now, an epoll_op::destroy() could free the
837  
    // last ref to an impl whose embedded descriptor_state is still linked
747  
    // last ref to an impl whose embedded descriptor_state is still linked
838  
    // in the queue — use-after-free on the next pop(). Letting ~state_
748  
    // in the queue — use-after-free on the next pop(). Letting ~state_
839  
    // release the ptrs (during service destruction, after scheduler
749  
    // release the ptrs (during service destruction, after scheduler
840  
    // shutdown) keeps every impl alive until all ops have been drained.
750  
    // shutdown) keeps every impl alive until all ops have been drained.
841  
}
751  
}
842  

752  

843  
// Create a new epoll_socket owned by this service.
//
// The shared_ptr is stored in socket_ptrs_ (keyed by the raw pointer) and
// the raw pointer is also appended to socket_list_; the raw pointer is
// what the caller receives.
inline io_object::implementation*
epoll_socket_service::construct()
{
    auto owner               = std::make_shared<epoll_socket>(*this);
    epoll_socket* const sock = owner.get();

    {
        std::lock_guard lock(state_->mutex_);
        state_->socket_list_.push_back(sock);
        state_->socket_ptrs_.emplace(sock, std::move(owner));
    }

    return sock;
}
857  

767  

858  
inline void
768  
inline void
859  
epoll_socket_service::destroy(io_object::implementation* impl)
769  
epoll_socket_service::destroy(io_object::implementation* impl)
860  
{
770  
{
861  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
771  
    auto* epoll_impl = static_cast<epoll_socket*>(impl);
862  
    epoll_impl->close_socket();
772  
    epoll_impl->close_socket();
863  
    std::lock_guard lock(state_->mutex_);
773  
    std::lock_guard lock(state_->mutex_);
864  
    state_->socket_list_.remove(epoll_impl);
774  
    state_->socket_list_.remove(epoll_impl);
865  
    state_->socket_ptrs_.erase(epoll_impl);
775  
    state_->socket_ptrs_.erase(epoll_impl);
866  
}
776  
}
867  

777  

868  
inline std::error_code
778  
inline std::error_code
869 -
epoll_socket_service::open_socket(tcp_socket::implementation& impl)
779 +
epoll_socket_service::open_socket(
 
780 +
    tcp_socket::implementation& impl,
 
781 +
    int family, int type, int protocol)
870  
{
782  
{
871  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
783  
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
872  
    epoll_impl->close_socket();
784  
    epoll_impl->close_socket();
873  

785  

874 -
    int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
786 +
    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
875  
    if (fd < 0)
787  
    if (fd < 0)
876  
        return make_err(errno);
788  
        return make_err(errno);
 
789 +

 
790 +
    if (family == AF_INET6)
 
791 +
    {
 
792 +
        int one = 1;
 
793 +
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
 
794 +
    }
877  

795  

878  
    epoll_impl->fd_ = fd;
796  
    epoll_impl->fd_ = fd;
879  

797  

880  
    // Register fd with epoll (edge-triggered mode)
798  
    // Register fd with epoll (edge-triggered mode)
881  
    epoll_impl->desc_state_.fd = fd;
799  
    epoll_impl->desc_state_.fd = fd;
882  
    {
800  
    {
883  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
801  
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
884  
        epoll_impl->desc_state_.read_op    = nullptr;
802  
        epoll_impl->desc_state_.read_op    = nullptr;
885  
        epoll_impl->desc_state_.write_op   = nullptr;
803  
        epoll_impl->desc_state_.write_op   = nullptr;
886  
        epoll_impl->desc_state_.connect_op = nullptr;
804  
        epoll_impl->desc_state_.connect_op = nullptr;
887  
    }
805  
    }
888  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
806  
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
889  

807  

890  
    return {};
808  
    return {};
891  
}
809  
}
892  

810  

893  
inline void
811  
inline void
894  
epoll_socket_service::close(io_object::handle& h)
812  
epoll_socket_service::close(io_object::handle& h)
895  
{
813  
{
896  
    static_cast<epoll_socket*>(h.get())->close_socket();
814  
    static_cast<epoll_socket*>(h.get())->close_socket();
897  
}
815  
}
898  

816  

899  
inline void
817  
inline void
900  
epoll_socket_service::post(epoll_op* op)
818  
epoll_socket_service::post(epoll_op* op)
901  
{
819  
{
902  
    state_->sched_.post(op);
820  
    state_->sched_.post(op);
903  
}
821  
}
904  

822  

905  
inline void
823  
inline void
906  
epoll_socket_service::work_started() noexcept
824  
epoll_socket_service::work_started() noexcept
907  
{
825  
{
908  
    state_->sched_.work_started();
826  
    state_->sched_.work_started();
909  
}
827  
}
910  

828  

911  
inline void
829  
inline void
912  
epoll_socket_service::work_finished() noexcept
830  
epoll_socket_service::work_finished() noexcept
913  
{
831  
{
914  
    state_->sched_.work_finished();
832  
    state_->sched_.work_finished();
915  
}
833  
}
916  

834  

917  
} // namespace boost::corosio::detail
835  
} // namespace boost::corosio::detail
918  

836  

919  
#endif // BOOST_COROSIO_HAS_EPOLL
837  
#endif // BOOST_COROSIO_HAS_EPOLL
920  

838  

921  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
839  
#endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP