include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp

81.0% Lines (337/416) 93.3% Functions (28/30)
include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23
24 #include <boost/corosio/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/except.hpp>
28 #include <boost/capy/buffers.hpp>
29
30 #include <coroutine>
31 #include <mutex>
32 #include <unordered_map>
33 #include <utility>
34
35 #include <errno.h>
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #include <sys/epoll.h>
39 #include <sys/socket.h>
40 #include <unistd.h>
41
42 /*
43 epoll Socket Implementation
44 ===========================
45
46 Each I/O operation follows the same pattern:
47 1. Try the syscall immediately (non-blocking socket)
48 2. If it succeeds or fails with a real error, post to completion queue
49 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50
51 This "try first" approach avoids unnecessary epoll round-trips for
52 operations that can complete immediately (common for small reads/writes
53 on fast local connections).
54
55 One-Shot Registration
56 ---------------------
57 We use one-shot epoll registration: each operation registers, waits for
58 one event, then unregisters. This simplifies the state machine since we
59 don't need to track whether an fd is currently registered or handle
60 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 simplicity is worth it.
62
63 Cancellation
64 ------------
65 See op.hpp for the completion/cancellation race handling via the
66 `registered` atomic. cancel() must complete pending operations (post
67 them with cancelled flag) so coroutines waiting on them can resume.
68 close_socket() calls cancel() first to ensure this.
69
70 Impl Lifetime with shared_ptr
71 -----------------------------
72 Socket impls use enable_shared_from_this. The service owns impls via
73 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 removal. When a user calls close(), we call cancel() which posts pending
75 ops to the scheduler.
76
77 CRITICAL: The posted ops must keep the impl alive until they complete.
78 Otherwise the scheduler would process a freed op (use-after-free). The
79 cancel() method captures shared_from_this() into op.impl_ptr before
80 posting. When the op completes, impl_ptr is cleared, allowing the impl
81 to be destroyed if no other references exist.
82
83 Service Ownership
84 -----------------
85 epoll_socket_service owns all socket impls. destroy_impl() removes the
86 shared_ptr from the map, but the impl may survive if ops still hold
87 impl_ptr refs. shutdown() closes all sockets and clears the map; any
88 in-flight ops will complete and release their refs.
89 */
90
91 namespace boost::corosio::detail {
92
/** Shared state for the epoll socket service.

    Owned by epoll_socket_service via unique_ptr. Holds the scheduler
    reference plus the registry of all live socket impls. `mutex_`
    guards `socket_list_` and `socket_ptrs_` (see construct()/destroy()
    in the service implementation).
*/
class epoll_socket_state
{
public:
    explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
    {
    }

    epoll_scheduler& sched_;  // reactor this service posts completions to
    std::mutex mutex_;        // guards socket_list_ and socket_ptrs_
    intrusive_list<epoll_socket> socket_list_;  // all live impls (for shutdown)
    std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
        socket_ptrs_;  // owning refs, keyed by raw pointer for O(1) erase
};
107
/** epoll socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
{
public:
    explicit epoll_socket_service(capy::execution_context& ctx);
    ~epoll_socket_service() override;

    // Non-copyable: owns unique service state.
    epoll_socket_service(epoll_socket_service const&) = delete;
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;

    /// Close all live sockets at context shutdown (see definition for
    /// the op-drain ordering rationale).
    void shutdown() override;

    /// Create a new socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Close and unregister an impl previously returned by construct().
    void destroy(io_object::implementation*) override;
    /// Close the socket held by the handle.
    void close(io_object::handle&) override;
    /// Open a native non-blocking socket and register it with epoll.
    std::error_code
    open_socket(tcp_socket::implementation& impl,
        int family, int type, int protocol) override;

    /// The reactor this service posts completions to.
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    void post(epoll_op* op);       // enqueue a completed op with the scheduler
    void work_started() noexcept;  // outstanding-work accounting (see register_op)
    void work_finished() noexcept;

private:
    std::unique_ptr<epoll_socket_state> state_;
};
142
143 //--------------------------------------------------------------------------
144 //
145 // Implementation
146 //
147 //--------------------------------------------------------------------------
148
// Register an op with the reactor, handling cached edge events.
// Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
//
// `desc_slot` is the descriptor-state slot for this direction
// (read_op / write_op / connect_op); `ready_flag` records an edge event
// that arrived while no op was parked; `cancel_flag` records a cancel
// request that arrived while no op was parked.
inline void
epoll_socket::register_op(
    epoll_op& op,
    epoll_op*& desc_slot,
    bool& ready_flag,
    bool& cancel_flag) noexcept
{
    // Balanced by work_finished() below (immediate completion) or by
    // the reactor when the parked op is eventually posted.
    svc_.work_started();

    std::lock_guard lock(desc_state_.mutex);
    bool io_done = false;
    if (ready_flag)
    {
        // An edge event was cached while nothing was parked; consume
        // it by retrying the I/O under the lock.
        ready_flag = false;
        op.perform_io();
        io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
        if (!io_done)
            op.errn = 0;  // would-block again: clear errn and park below
    }

    if (cancel_flag)
    {
        // Cancel requested before this op reached the slot.
        cancel_flag = false;
        op.cancelled.store(true, std::memory_order_relaxed);
    }

    if (io_done || op.cancelled.load(std::memory_order_acquire))
    {
        // Completed (or cancelled) without waiting: post directly.
        svc_.post(&op);
        svc_.work_finished();
    }
    else
    {
        // Park the op; the reactor posts it when the fd becomes ready.
        desc_slot = &op;
    }
}
187
188 inline void
189 105 epoll_op::canceller::operator()() const noexcept
190 {
191 105 op->cancel();
192 105 }
193
194 inline void
195 epoll_connect_op::cancel() noexcept
196 {
197 if (socket_impl_)
198 socket_impl_->cancel_single_op(*this);
199 else
200 request_cancel();
201 }
202
203 inline void
204 99 epoll_read_op::cancel() noexcept
205 {
206 99 if (socket_impl_)
207 99 socket_impl_->cancel_single_op(*this);
208 else
209 request_cancel();
210 99 }
211
212 inline void
213 epoll_write_op::cancel() noexcept
214 {
215 if (socket_impl_)
216 socket_impl_->cancel_single_op(*this);
217 else
218 request_cancel();
219 }
220
/** Completion handler: translate the op's recorded outcome into
    (*ec_out, *bytes_out) and resume the awaiting coroutine.

    Precedence: cancellation wins over an errno, which wins over the
    EOF mapping (a read op completing with 0 bytes transferred).
*/
inline void
epoll_op::operator()()
{
    stop_cb.reset();  // detach from the stop_token before resuming

    // Completion runs from the scheduler: reset the inline budget so
    // the resumed coroutine's next I/O may again complete inline.
    socket_impl_->svc_.scheduler().reset_inline_budget();

    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else if (is_read_operation() && bytes_transferred == 0)
        *ec_out = capy::error::eof;  // read completing with 0 bytes -> EOF
    else
        *ec_out = {};

    *bytes_out = bytes_transferred;

    // Move to stack before resuming coroutine. The coroutine might close
    // the socket, releasing the last wrapper ref. If impl_ptr were the
    // last ref and we destroyed it while still in operator(), we'd have
    // use-after-free. Moving to local ensures destruction happens at
    // function exit, after all member accesses are complete.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
249
/** Connect completion handler: cache endpoints on success, set *ec_out,
    and resume the awaiting coroutine. */
inline void
epoll_connect_op::operator()()
{
    stop_cb.reset();  // detach from the stop_token before resuming

    // Completion runs from the scheduler: reset the inline budget so
    // the resumed coroutine's next I/O may again complete inline.
    socket_impl_->svc_.scheduler().reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        // getsockname failure leaves local_ep default-constructed;
        // the remote endpoint is the one we targeted.
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_ep = from_sockaddr(local_storage);
        static_cast<epoll_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
286
/** Construct a socket impl bound to its owning service. */
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
    : svc_(svc)
{
}

inline epoll_socket::~epoll_socket() = default;
293
/** Begin an async connect; returns the coroutine to resume (or noop).

    Follows the "try first" pattern from the file header: the
    non-blocking ::connect is attempted inline and only the EINPROGRESS
    case registers with the reactor.
*/
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Connected synchronously: cache both endpoints immediately.
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;
    }

    if (result == 0 || errno != EINPROGRESS)
    {
        // Immediate success or immediate failure: no reactor wait.
        int err = (result < 0) ? errno : 0;
        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume inline instead of bouncing through the queue.
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
        // Inline budget exhausted: package the result and post the op.
        op.reset();
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.fd = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        op.impl_ptr = shared_from_this();  // keep impl alive until op runs
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — register with reactor
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    // Connect readiness is signalled as writability, hence write_ready.
    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
357
/** Begin an async scatter read; returns the coroutine to resume.

    Tries a speculative readv() first; only EAGAIN/EWOULDBLOCK goes
    through epoll registration. Empty buffers complete immediately
    with 0 bytes.
*/
inline std::coroutine_handle<>
epoll_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();

    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Zero-length read: complete right away with 0 bytes.
        // empl: empty_buffer_read marks this as a deliberate zero-byte
        // completion — presumably so it is not mapped to EOF; confirm
        // against is_read_operation()/op.hpp.
        op.empty_buffer_read = true;
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative read
    ssize_t n;
    do
    {
        n = ::readv(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);  // retry on signal interruption

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed immediately (success, EOF, or a real error).
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume inline, translating the result here.
            if (err)
                *ec = make_err(err);
            else if (n == 0)
                *ec = capy::error::eof;  // clean 0-byte read -> EOF
            else
                *ec = {};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Inline budget exhausted: package the result and post the op.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.read_op, desc_state_.read_ready,
        desc_state_.read_cancel_pending);
    return std::noop_coroutine();
}
443
/** Begin an async gather write; returns the coroutine to resume.

    Tries a speculative sendmsg() first (MSG_NOSIGNAL suppresses
    SIGPIPE); only EAGAIN/EWOULDBLOCK goes through epoll registration.
    Empty buffers complete immediately with 0 bytes (the completion
    path maps only read ops to EOF, so no flag is needed here).
*/
inline std::coroutine_handle<>
epoll_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();

    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));

    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        // Zero-length write: complete right away with 0 bytes.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative write
    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n;
    do
    {
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
    }
    while (n < 0 && errno == EINTR);  // retry on signal interruption

    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        // Completed immediately (success or a real error).
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        if (svc_.scheduler().try_consume_inline_budget())
        {
            // Resume inline, translating the result here.
            *ec = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Inline budget exhausted: package the result and post the op.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
527
528 inline std::error_code
529 3 epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530 {
531 int how;
532 3 switch (what)
533 {
534 1 case tcp_socket::shutdown_receive:
535 1 how = SHUT_RD;
536 1 break;
537 1 case tcp_socket::shutdown_send:
538 1 how = SHUT_WR;
539 1 break;
540 1 case tcp_socket::shutdown_both:
541 1 how = SHUT_RDWR;
542 1 break;
543 default:
544 return make_err(EINVAL);
545 }
546 3 if (::shutdown(fd_, how) != 0)
547 return make_err(errno);
548 3 return {};
549 }
550
551 inline std::error_code
552 32 epoll_socket::set_option(
553 int level, int optname,
554 void const* data, std::size_t size) noexcept
555 {
556 32 if (::setsockopt(fd_, level, optname, data,
557 32 static_cast<socklen_t>(size)) != 0)
558 return make_err(errno);
559 32 return {};
560 }
561
562 inline std::error_code
563 31 epoll_socket::get_option(
564 int level, int optname,
565 void* data, std::size_t* size) const noexcept
566 {
567 31 socklen_t len = static_cast<socklen_t>(*size);
568 31 if (::getsockopt(fd_, level, optname, data, &len) != 0)
569 return make_err(errno);
570 31 *size = static_cast<std::size_t>(len);
571 31 return {};
572 }
573
/** Cancel all outstanding operations on this socket.

    Each op is marked cancel-requested, then any op currently parked in
    a descriptor slot is claimed under the lock. Claimed ops are posted
    outside the lock with an owning impl_ptr ref (see file header) so
    their coroutines resume with `canceled`. Ops not parked yet get a
    *_cancel_pending flag so register_op() cancels them on arrival.
*/
inline void
epoll_socket::cancel() noexcept
{
    // If the impl is already being destroyed there is nothing to keep
    // alive and nothing left to cancel.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed = nullptr;
    epoll_op* wr_claimed = nullptr;
    {
        std::lock_guard lock(desc_state_.mutex);
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post claimed ops outside the lock; impl_ptr keeps `this` alive
    // until each op runs. work_finished() balances the work_started()
    // performed when the op was registered.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
623
/** Cancel one specific operation (connect, read, or write).

    Same claim-under-lock protocol as cancel(), restricted to the slot
    matching `op`. If the op is not currently parked, the matching
    *_cancel_pending flag is set so register_op() cancels it on arrival.
*/
inline void
epoll_socket::cancel_single_op(epoll_op& op) noexcept
{
    // Impl already being destroyed: nothing to keep alive or cancel.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Identify which descriptor slot corresponds to this op.
    epoll_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        epoll_op* claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            if (*desc_op_ptr == &op)
                claimed = std::exchange(*desc_op_ptr, nullptr);
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        // Post outside the lock; impl_ptr keeps the impl alive and
        // work_finished() balances register_op()'s work_started().
        if (claimed)
        {
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
663
/** Close the socket: cancel all parked ops, deregister from epoll,
    close the fd, and reset descriptor state and cached endpoints.

    Safe to call repeatedly; a second call sees fd_ == -1 and empty
    slots and does nothing further.
*/
inline void
epoll_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        // Claim every parked op and clear all cached readiness and
        // pending-cancel flags under the lock.
        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed = nullptr;
        epoll_op* wr_claimed = nullptr;
        {
            std::lock_guard lock(desc_state_.mutex);
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post claimed ops outside the lock (see cancel() for the
        // impl_ptr / work_finished() rationale).
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // If the descriptor state is still queued in the scheduler,
        // park a self-reference on it so the impl outlives the queue
        // entry. NOTE(review): presumably released when the entry is
        // drained — confirm in epoll_scheduler.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        // Deregister from epoll before closing the fd.
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
726
/** Construct the service, acquiring (or creating) the execution
    context's epoll scheduler. */
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(
          std::make_unique<epoll_socket_state>(
              ctx.use_service<epoll_scheduler>()))
{
}
733
734 448 inline epoll_socket_service::~epoll_socket_service() {}
735
/** Service shutdown: close every live socket.

    Called by the execution context during teardown, before the
    scheduler's own shutdown.
*/
inline void
epoll_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. If we
    // released our shared_ptrs now, an epoll_op::destroy() could free the
    // last ref to an impl whose embedded descriptor_state is still linked
    // in the queue — use-after-free on the next pop(). Letting ~state_
    // release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
752
753 inline io_object::implementation*
754 13911 epoll_socket_service::construct()
755 {
756 13911 auto impl = std::make_shared<epoll_socket>(*this);
757 13911 auto* raw = impl.get();
758
759 {
760 13911 std::lock_guard lock(state_->mutex_);
761 13911 state_->socket_list_.push_back(raw);
762 13911 state_->socket_ptrs_.emplace(raw, std::move(impl));
763 13911 }
764
765 13911 return raw;
766 13911 }
767
768 inline void
769 13911 epoll_socket_service::destroy(io_object::implementation* impl)
770 {
771 13911 auto* epoll_impl = static_cast<epoll_socket*>(impl);
772 13911 epoll_impl->close_socket();
773 13911 std::lock_guard lock(state_->mutex_);
774 13911 state_->socket_list_.remove(epoll_impl);
775 13911 state_->socket_ptrs_.erase(epoll_impl);
776 13911 }
777
/** Open a non-blocking, close-on-exec socket and register it with the
    reactor in edge-triggered mode.

    Any previously-open fd on the impl is closed first. Fails only if
    ::socket() fails; the IPV6_V6ONLY setsockopt result is ignored
    (best-effort).
*/
inline std::error_code
epoll_socket_service::open_socket(
    tcp_socket::implementation& impl,
    int family, int type, int protocol)
{
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
    epoll_impl->close_socket();  // re-open support: release any old fd

    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
    if (fd < 0)
        return make_err(errno);

    if (family == AF_INET6)
    {
        // Restrict v6 sockets to IPv6 only (no v4-mapped addresses).
        int one = 1;
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
    }

    epoll_impl->fd_ = fd;

    // Register fd with epoll (edge-triggered mode)
    epoll_impl->desc_state_.fd = fd;
    {
        // Clear any stale parked-op pointers before the fd goes live.
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
        epoll_impl->desc_state_.read_op = nullptr;
        epoll_impl->desc_state_.write_op = nullptr;
        epoll_impl->desc_state_.connect_op = nullptr;
    }
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);

    return {};
}
810
811 inline void
812 23160 epoll_socket_service::close(io_object::handle& h)
813 {
814 23160 static_cast<epoll_socket*>(h.get())->close_socket();
815 23160 }
816
817 inline void
818 76299 epoll_socket_service::post(epoll_op* op)
819 {
820 76299 state_->sched_.post(op);
821 76299 }
822
823 inline void
824 4820 epoll_socket_service::work_started() noexcept
825 {
826 4820 state_->sched_.work_started();
827 4820 }
828
829 inline void
830 198 epoll_socket_service::work_finished() noexcept
831 {
832 198 state_->sched_.work_finished();
833 198 }
834
835 } // namespace boost::corosio::detail
836
837 #endif // BOOST_COROSIO_HAS_EPOLL
838
839 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
840