TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/socket_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23 :
24 : #include <boost/corosio/detail/endpoint_convert.hpp>
25 : #include <boost/corosio/detail/make_err.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/except.hpp>
28 : #include <boost/capy/buffers.hpp>
29 :
30 : #include <coroutine>
31 : #include <mutex>
32 : #include <unordered_map>
33 : #include <utility>
34 :
35 : #include <errno.h>
36 : #include <netinet/in.h>
37 : #include <netinet/tcp.h>
38 : #include <sys/epoll.h>
39 : #include <sys/socket.h>
40 : #include <unistd.h>
41 :
42 : /*
43 : epoll Socket Implementation
44 : ===========================
45 :
46 : Each I/O operation follows the same pattern:
47 : 1. Try the syscall immediately (non-blocking socket)
48 : 2. If it succeeds or fails with a real error, post to completion queue
49 : 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50 :
51 : This "try first" approach avoids unnecessary epoll round-trips for
52 : operations that can complete immediately (common for small reads/writes
53 : on fast local connections).
54 :
55 : One-Shot Registration
56 : ---------------------
57 : We use one-shot epoll registration: each operation registers, waits for
58 : one event, then unregisters. This simplifies the state machine since we
59 : don't need to track whether an fd is currently registered or handle
60 : re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 : simplicity is worth it.
62 :
63 : Cancellation
64 : ------------
65 : See op.hpp for the completion/cancellation race handling via the
66 : `registered` atomic. cancel() must complete pending operations (post
67 : them with cancelled flag) so coroutines waiting on them can resume.
68 : close_socket() calls cancel() first to ensure this.
69 :
70 : Impl Lifetime with shared_ptr
71 : -----------------------------
72 : Socket impls use enable_shared_from_this. The service owns impls via
73 : shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 : removal. When a user calls close(), we call cancel() which posts pending
75 : ops to the scheduler.
76 :
77 : CRITICAL: The posted ops must keep the impl alive until they complete.
78 : Otherwise the scheduler would process a freed op (use-after-free). The
79 : cancel() method captures shared_from_this() into op.impl_ptr before
80 : posting. When the op completes, impl_ptr is cleared, allowing the impl
81 : to be destroyed if no other references exist.
82 :
83 : Service Ownership
84 : -----------------
85 : epoll_socket_service owns all socket impls. destroy_impl() removes the
86 : shared_ptr from the map, but the impl may survive if ops still hold
87 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
88 : in-flight ops will complete and release their refs.
89 : */
90 :
91 : namespace boost::corosio::detail {
92 :
93 : /** State for epoll socket service. */
94 : class epoll_socket_state
95 : {
96 : public:
97 HIT 224 : explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98 : {
99 224 : }
100 :
101 : epoll_scheduler& sched_;
102 : std::mutex mutex_;
103 : intrusive_list<epoll_socket> socket_list_;
104 : std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105 : socket_ptrs_;
106 : };
107 :
108 : /** epoll socket service implementation.
109 :
110 : Inherits from socket_service to enable runtime polymorphism.
111 : Uses key_type = socket_service for service lookup.
112 : */
113 : class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
114 : {
115 : public:
116 : explicit epoll_socket_service(capy::execution_context& ctx);
117 : ~epoll_socket_service() override;
118 :
119 : epoll_socket_service(epoll_socket_service const&) = delete;
120 : epoll_socket_service& operator=(epoll_socket_service const&) = delete;
121 :
122 : void shutdown() override;
123 :
124 : io_object::implementation* construct() override;
125 : void destroy(io_object::implementation*) override;
126 : void close(io_object::handle&) override;
127 : std::error_code
128 : open_socket(tcp_socket::implementation& impl,
129 : int family, int type, int protocol) override;
130 :
131 480285 : epoll_scheduler& scheduler() const noexcept
132 : {
133 480285 : return state_->sched_;
134 : }
135 : void post(epoll_op* op);
136 : void work_started() noexcept;
137 : void work_finished() noexcept;
138 :
139 : private:
140 : std::unique_ptr<epoll_socket_state> state_;
141 : };
142 :
143 : //--------------------------------------------------------------------------
144 : //
145 : // Implementation
146 : //
147 : //--------------------------------------------------------------------------
148 :
149 : // Register an op with the reactor, handling cached edge events.
150 : // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
151 : inline void
152 4820 : epoll_socket::register_op(
153 : epoll_op& op,
154 : epoll_op*& desc_slot,
155 : bool& ready_flag,
156 : bool& cancel_flag) noexcept
157 : {
158 4820 : svc_.work_started();
159 :
160 4820 : std::lock_guard lock(desc_state_.mutex);
161 4820 : bool io_done = false;
162 4820 : if (ready_flag)
163 : {
164 142 : ready_flag = false;
165 142 : op.perform_io();
166 142 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
167 142 : if (!io_done)
168 142 : op.errn = 0;
169 : }
170 :
171 4820 : if (cancel_flag)
172 : {
173 95 : cancel_flag = false;
174 95 : op.cancelled.store(true, std::memory_order_relaxed);
175 : }
176 :
177 4820 : if (io_done || op.cancelled.load(std::memory_order_acquire))
178 : {
179 95 : svc_.post(&op);
180 95 : svc_.work_finished();
181 : }
182 : else
183 : {
184 4725 : desc_slot = &op;
185 : }
186 4820 : }
187 :
188 : inline void
189 105 : epoll_op::canceller::operator()() const noexcept
190 : {
191 105 : op->cancel();
192 105 : }
193 :
194 : inline void
195 MIS 0 : epoll_connect_op::cancel() noexcept
196 : {
197 0 : if (socket_impl_)
198 0 : socket_impl_->cancel_single_op(*this);
199 : else
200 0 : request_cancel();
201 0 : }
202 :
203 : inline void
204 HIT 99 : epoll_read_op::cancel() noexcept
205 : {
206 99 : if (socket_impl_)
207 99 : socket_impl_->cancel_single_op(*this);
208 : else
209 MIS 0 : request_cancel();
210 HIT 99 : }
211 :
212 : inline void
213 MIS 0 : epoll_write_op::cancel() noexcept
214 : {
215 0 : if (socket_impl_)
216 0 : socket_impl_->cancel_single_op(*this);
217 : else
218 0 : request_cancel();
219 0 : }
220 :
221 : inline void
222 HIT 76303 : epoll_op::operator()()
223 : {
224 76303 : stop_cb.reset();
225 :
226 76303 : socket_impl_->svc_.scheduler().reset_inline_budget();
227 :
228 76303 : if (cancelled.load(std::memory_order_acquire))
229 206 : *ec_out = capy::error::canceled;
230 76097 : else if (errn != 0)
231 MIS 0 : *ec_out = make_err(errn);
232 HIT 76097 : else if (is_read_operation() && bytes_transferred == 0)
233 MIS 0 : *ec_out = capy::error::eof;
234 : else
235 HIT 76097 : *ec_out = {};
236 :
237 76303 : *bytes_out = bytes_transferred;
238 :
239 : // Move to stack before resuming coroutine. The coroutine might close
240 : // the socket, releasing the last wrapper ref. If impl_ptr were the
241 : // last ref and we destroyed it while still in operator(), we'd have
242 : // use-after-free. Moving to local ensures destruction happens at
243 : // function exit, after all member accesses are complete.
244 76303 : capy::executor_ref saved_ex(ex);
245 76303 : std::coroutine_handle<> saved_h(h);
246 76303 : auto prevent_premature_destruction = std::move(impl_ptr);
247 76303 : dispatch_coro(saved_ex, saved_h).resume();
248 76303 : }
249 :
250 : inline void
251 4618 : epoll_connect_op::operator()()
252 : {
253 4618 : stop_cb.reset();
254 :
255 4618 : socket_impl_->svc_.scheduler().reset_inline_budget();
256 :
257 4618 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
258 :
259 : // Cache endpoints on successful connect
260 4618 : if (success && socket_impl_)
261 : {
262 4616 : endpoint local_ep;
263 4616 : sockaddr_storage local_storage{};
264 4616 : socklen_t local_len = sizeof(local_storage);
265 4616 : if (::getsockname(
266 : fd, reinterpret_cast<sockaddr*>(&local_storage),
267 4616 : &local_len) == 0)
268 4616 : local_ep = from_sockaddr(local_storage);
269 4616 : static_cast<epoll_socket*>(socket_impl_)
270 4616 : ->set_endpoints(local_ep, target_endpoint);
271 : }
272 :
273 4618 : if (cancelled.load(std::memory_order_acquire))
274 MIS 0 : *ec_out = capy::error::canceled;
275 HIT 4618 : else if (errn != 0)
276 2 : *ec_out = make_err(errn);
277 : else
278 4616 : *ec_out = {};
279 :
280 : // Move to stack before resuming. See epoll_op::operator()() for rationale.
281 4618 : capy::executor_ref saved_ex(ex);
282 4618 : std::coroutine_handle<> saved_h(h);
283 4618 : auto prevent_premature_destruction = std::move(impl_ptr);
284 4618 : dispatch_coro(saved_ex, saved_h).resume();
285 4618 : }
286 :
287 13911 : inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
288 13911 : : svc_(svc)
289 : {
290 13911 : }
291 :
292 13911 : inline epoll_socket::~epoll_socket() = default;
293 :
294 : inline std::coroutine_handle<>
295 4618 : epoll_socket::connect(
296 : std::coroutine_handle<> h,
297 : capy::executor_ref ex,
298 : endpoint ep,
299 : std::stop_token token,
300 : std::error_code* ec)
301 : {
302 4618 : auto& op = conn_;
303 :
304 4618 : sockaddr_storage storage{};
305 : socklen_t addrlen =
306 4618 : detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
307 : int result =
308 4618 : ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
309 :
310 4618 : if (result == 0)
311 : {
312 MIS 0 : sockaddr_storage local_storage{};
313 0 : socklen_t local_len = sizeof(local_storage);
314 0 : if (::getsockname(
315 : fd_, reinterpret_cast<sockaddr*>(&local_storage),
316 0 : &local_len) == 0)
317 0 : local_endpoint_ = detail::from_sockaddr(local_storage);
318 0 : remote_endpoint_ = ep;
319 : }
320 :
321 HIT 4618 : if (result == 0 || errno != EINPROGRESS)
322 : {
323 MIS 0 : int err = (result < 0) ? errno : 0;
324 0 : if (svc_.scheduler().try_consume_inline_budget())
325 : {
326 0 : *ec = err ? make_err(err) : std::error_code{};
327 0 : return dispatch_coro(ex, h);
328 : }
329 0 : op.reset();
330 0 : op.h = h;
331 0 : op.ex = ex;
332 0 : op.ec_out = ec;
333 0 : op.fd = fd_;
334 0 : op.target_endpoint = ep;
335 0 : op.start(token, this);
336 0 : op.impl_ptr = shared_from_this();
337 0 : op.complete(err, 0);
338 0 : svc_.post(&op);
339 0 : return std::noop_coroutine();
340 : }
341 :
342 : // EINPROGRESS — register with reactor
343 HIT 4618 : op.reset();
344 4618 : op.h = h;
345 4618 : op.ex = ex;
346 4618 : op.ec_out = ec;
347 4618 : op.fd = fd_;
348 4618 : op.target_endpoint = ep;
349 4618 : op.start(token, this);
350 4618 : op.impl_ptr = shared_from_this();
351 :
352 4618 : register_op(
353 4618 : op, desc_state_.connect_op, desc_state_.write_ready,
354 4618 : desc_state_.connect_cancel_pending);
355 4618 : return std::noop_coroutine();
356 : }
357 :
358 : inline std::coroutine_handle<>
359 190635 : epoll_socket::read_some(
360 : std::coroutine_handle<> h,
361 : capy::executor_ref ex,
362 : io_buffer_param param,
363 : std::stop_token token,
364 : std::error_code* ec,
365 : std::size_t* bytes_out)
366 : {
367 190635 : auto& op = rd_;
368 190635 : op.reset();
369 :
370 190635 : capy::mutable_buffer bufs[epoll_read_op::max_buffers];
371 190635 : op.iovec_count =
372 190635 : static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
373 :
374 190635 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
375 : {
376 1 : op.empty_buffer_read = true;
377 1 : op.h = h;
378 1 : op.ex = ex;
379 1 : op.ec_out = ec;
380 1 : op.bytes_out = bytes_out;
381 1 : op.start(token, this);
382 1 : op.impl_ptr = shared_from_this();
383 1 : op.complete(0, 0);
384 1 : svc_.post(&op);
385 1 : return std::noop_coroutine();
386 : }
387 :
388 381268 : for (int i = 0; i < op.iovec_count; ++i)
389 : {
390 190634 : op.iovecs[i].iov_base = bufs[i].data();
391 190634 : op.iovecs[i].iov_len = bufs[i].size();
392 : }
393 :
394 : // Speculative read
395 : ssize_t n;
396 : do
397 : {
398 190634 : n = ::readv(fd_, op.iovecs, op.iovec_count);
399 : }
400 190634 : while (n < 0 && errno == EINTR);
401 :
402 190634 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
403 : {
404 190432 : int err = (n < 0) ? errno : 0;
405 190432 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
406 :
407 190432 : if (svc_.scheduler().try_consume_inline_budget())
408 : {
409 152392 : if (err)
410 MIS 0 : *ec = make_err(err);
411 HIT 152392 : else if (n == 0)
412 5 : *ec = capy::error::eof;
413 : else
414 152387 : *ec = {};
415 152392 : *bytes_out = bytes;
416 152392 : return dispatch_coro(ex, h);
417 : }
418 38040 : op.h = h;
419 38040 : op.ex = ex;
420 38040 : op.ec_out = ec;
421 38040 : op.bytes_out = bytes_out;
422 38040 : op.start(token, this);
423 38040 : op.impl_ptr = shared_from_this();
424 38040 : op.complete(err, bytes);
425 38040 : svc_.post(&op);
426 38040 : return std::noop_coroutine();
427 : }
428 :
429 : // EAGAIN — register with reactor
430 202 : op.h = h;
431 202 : op.ex = ex;
432 202 : op.ec_out = ec;
433 202 : op.bytes_out = bytes_out;
434 202 : op.fd = fd_;
435 202 : op.start(token, this);
436 202 : op.impl_ptr = shared_from_this();
437 :
438 202 : register_op(
439 202 : op, desc_state_.read_op, desc_state_.read_ready,
440 202 : desc_state_.read_cancel_pending);
441 202 : return std::noop_coroutine();
442 : }
443 :
444 : inline std::coroutine_handle<>
445 190435 : epoll_socket::write_some(
446 : std::coroutine_handle<> h,
447 : capy::executor_ref ex,
448 : io_buffer_param param,
449 : std::stop_token token,
450 : std::error_code* ec,
451 : std::size_t* bytes_out)
452 : {
453 190435 : auto& op = wr_;
454 190435 : op.reset();
455 :
456 190435 : capy::mutable_buffer bufs[epoll_write_op::max_buffers];
457 190435 : op.iovec_count =
458 190435 : static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
459 :
460 190435 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
461 : {
462 1 : op.h = h;
463 1 : op.ex = ex;
464 1 : op.ec_out = ec;
465 1 : op.bytes_out = bytes_out;
466 1 : op.start(token, this);
467 1 : op.impl_ptr = shared_from_this();
468 1 : op.complete(0, 0);
469 1 : svc_.post(&op);
470 1 : return std::noop_coroutine();
471 : }
472 :
473 380868 : for (int i = 0; i < op.iovec_count; ++i)
474 : {
475 190434 : op.iovecs[i].iov_base = bufs[i].data();
476 190434 : op.iovecs[i].iov_len = bufs[i].size();
477 : }
478 :
479 : // Speculative write
480 190434 : msghdr msg{};
481 190434 : msg.msg_iov = op.iovecs;
482 190434 : msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
483 :
484 : ssize_t n;
485 : do
486 : {
487 190434 : n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
488 : }
489 190434 : while (n < 0 && errno == EINTR);
490 :
491 190434 : if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
492 : {
493 190434 : int err = (n < 0) ? errno : 0;
494 190434 : auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);
495 :
496 190434 : if (svc_.scheduler().try_consume_inline_budget())
497 : {
498 152375 : *ec = err ? make_err(err) : std::error_code{};
499 152375 : *bytes_out = bytes;
500 152375 : return dispatch_coro(ex, h);
501 : }
502 38059 : op.h = h;
503 38059 : op.ex = ex;
504 38059 : op.ec_out = ec;
505 38059 : op.bytes_out = bytes_out;
506 38059 : op.start(token, this);
507 38059 : op.impl_ptr = shared_from_this();
508 38059 : op.complete(err, bytes);
509 38059 : svc_.post(&op);
510 38059 : return std::noop_coroutine();
511 : }
512 :
513 : // EAGAIN — register with reactor
514 MIS 0 : op.h = h;
515 0 : op.ex = ex;
516 0 : op.ec_out = ec;
517 0 : op.bytes_out = bytes_out;
518 0 : op.fd = fd_;
519 0 : op.start(token, this);
520 0 : op.impl_ptr = shared_from_this();
521 :
522 0 : register_op(
523 0 : op, desc_state_.write_op, desc_state_.write_ready,
524 0 : desc_state_.write_cancel_pending);
525 0 : return std::noop_coroutine();
526 : }
527 :
528 : inline std::error_code
529 HIT 3 : epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530 : {
531 : int how;
532 3 : switch (what)
533 : {
534 1 : case tcp_socket::shutdown_receive:
535 1 : how = SHUT_RD;
536 1 : break;
537 1 : case tcp_socket::shutdown_send:
538 1 : how = SHUT_WR;
539 1 : break;
540 1 : case tcp_socket::shutdown_both:
541 1 : how = SHUT_RDWR;
542 1 : break;
543 MIS 0 : default:
544 0 : return make_err(EINVAL);
545 : }
546 HIT 3 : if (::shutdown(fd_, how) != 0)
547 MIS 0 : return make_err(errno);
548 HIT 3 : return {};
549 : }
550 :
551 : inline std::error_code
552 32 : epoll_socket::set_option(
553 : int level, int optname,
554 : void const* data, std::size_t size) noexcept
555 : {
556 32 : if (::setsockopt(fd_, level, optname, data,
557 32 : static_cast<socklen_t>(size)) != 0)
558 MIS 0 : return make_err(errno);
559 HIT 32 : return {};
560 : }
561 :
562 : inline std::error_code
563 31 : epoll_socket::get_option(
564 : int level, int optname,
565 : void* data, std::size_t* size) const noexcept
566 : {
567 31 : socklen_t len = static_cast<socklen_t>(*size);
568 31 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
569 MIS 0 : return make_err(errno);
570 HIT 31 : *size = static_cast<std::size_t>(len);
571 31 : return {};
572 : }
573 :
574 : inline void
575 187 : epoll_socket::cancel() noexcept
576 : {
577 187 : auto self = weak_from_this().lock();
578 187 : if (!self)
579 MIS 0 : return;
580 :
581 HIT 187 : conn_.request_cancel();
582 187 : rd_.request_cancel();
583 187 : wr_.request_cancel();
584 :
585 187 : epoll_op* conn_claimed = nullptr;
586 187 : epoll_op* rd_claimed = nullptr;
587 187 : epoll_op* wr_claimed = nullptr;
588 : {
589 187 : std::lock_guard lock(desc_state_.mutex);
590 187 : if (desc_state_.connect_op == &conn_)
591 MIS 0 : conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
592 : else
593 HIT 187 : desc_state_.connect_cancel_pending = true;
594 187 : if (desc_state_.read_op == &rd_)
595 3 : rd_claimed = std::exchange(desc_state_.read_op, nullptr);
596 : else
597 184 : desc_state_.read_cancel_pending = true;
598 187 : if (desc_state_.write_op == &wr_)
599 MIS 0 : wr_claimed = std::exchange(desc_state_.write_op, nullptr);
600 : else
601 HIT 187 : desc_state_.write_cancel_pending = true;
602 187 : }
603 :
604 187 : if (conn_claimed)
605 : {
606 MIS 0 : conn_.impl_ptr = self;
607 0 : svc_.post(&conn_);
608 0 : svc_.work_finished();
609 : }
610 HIT 187 : if (rd_claimed)
611 : {
612 3 : rd_.impl_ptr = self;
613 3 : svc_.post(&rd_);
614 3 : svc_.work_finished();
615 : }
616 187 : if (wr_claimed)
617 : {
618 MIS 0 : wr_.impl_ptr = self;
619 0 : svc_.post(&wr_);
620 0 : svc_.work_finished();
621 : }
622 HIT 187 : }
623 :
624 : inline void
625 99 : epoll_socket::cancel_single_op(epoll_op& op) noexcept
626 : {
627 99 : auto self = weak_from_this().lock();
628 99 : if (!self)
629 MIS 0 : return;
630 :
631 HIT 99 : op.request_cancel();
632 :
633 99 : epoll_op** desc_op_ptr = nullptr;
634 99 : if (&op == &conn_)
635 MIS 0 : desc_op_ptr = &desc_state_.connect_op;
636 HIT 99 : else if (&op == &rd_)
637 99 : desc_op_ptr = &desc_state_.read_op;
638 MIS 0 : else if (&op == &wr_)
639 0 : desc_op_ptr = &desc_state_.write_op;
640 :
641 HIT 99 : if (desc_op_ptr)
642 : {
643 99 : epoll_op* claimed = nullptr;
644 : {
645 99 : std::lock_guard lock(desc_state_.mutex);
646 99 : if (*desc_op_ptr == &op)
647 99 : claimed = std::exchange(*desc_op_ptr, nullptr);
648 MIS 0 : else if (&op == &conn_)
649 0 : desc_state_.connect_cancel_pending = true;
650 0 : else if (&op == &rd_)
651 0 : desc_state_.read_cancel_pending = true;
652 0 : else if (&op == &wr_)
653 0 : desc_state_.write_cancel_pending = true;
654 HIT 99 : }
655 99 : if (claimed)
656 : {
657 99 : op.impl_ptr = self;
658 99 : svc_.post(&op);
659 99 : svc_.work_finished();
660 : }
661 : }
662 99 : }
663 :
664 : inline void
665 41704 : epoll_socket::close_socket() noexcept
666 : {
667 41704 : auto self = weak_from_this().lock();
668 41704 : if (self)
669 : {
670 41704 : conn_.request_cancel();
671 41704 : rd_.request_cancel();
672 41704 : wr_.request_cancel();
673 :
674 41704 : epoll_op* conn_claimed = nullptr;
675 41704 : epoll_op* rd_claimed = nullptr;
676 41704 : epoll_op* wr_claimed = nullptr;
677 : {
678 41704 : std::lock_guard lock(desc_state_.mutex);
679 41704 : conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
680 41704 : rd_claimed = std::exchange(desc_state_.read_op, nullptr);
681 41704 : wr_claimed = std::exchange(desc_state_.write_op, nullptr);
682 41704 : desc_state_.read_ready = false;
683 41704 : desc_state_.write_ready = false;
684 41704 : desc_state_.read_cancel_pending = false;
685 41704 : desc_state_.write_cancel_pending = false;
686 41704 : desc_state_.connect_cancel_pending = false;
687 41704 : }
688 :
689 41704 : if (conn_claimed)
690 : {
691 MIS 0 : conn_.impl_ptr = self;
692 0 : svc_.post(&conn_);
693 0 : svc_.work_finished();
694 : }
695 HIT 41704 : if (rd_claimed)
696 : {
697 1 : rd_.impl_ptr = self;
698 1 : svc_.post(&rd_);
699 1 : svc_.work_finished();
700 : }
701 41704 : if (wr_claimed)
702 : {
703 MIS 0 : wr_.impl_ptr = self;
704 0 : svc_.post(&wr_);
705 0 : svc_.work_finished();
706 : }
707 :
708 HIT 41704 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
709 97 : desc_state_.impl_ref_ = self;
710 : }
711 :
712 41704 : if (fd_ >= 0)
713 : {
714 9249 : if (desc_state_.registered_events != 0)
715 9249 : svc_.scheduler().deregister_descriptor(fd_);
716 9249 : ::close(fd_);
717 9249 : fd_ = -1;
718 : }
719 :
720 41704 : desc_state_.fd = -1;
721 41704 : desc_state_.registered_events = 0;
722 :
723 41704 : local_endpoint_ = endpoint{};
724 41704 : remote_endpoint_ = endpoint{};
725 41704 : }
726 :
727 224 : inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
728 224 : : state_(
729 : std::make_unique<epoll_socket_state>(
730 224 : ctx.use_service<epoll_scheduler>()))
731 : {
732 224 : }
733 :
734 448 : inline epoll_socket_service::~epoll_socket_service() {}
735 :
736 : inline void
737 224 : epoll_socket_service::shutdown()
738 : {
739 224 : std::lock_guard lock(state_->mutex_);
740 :
741 224 : while (auto* impl = state_->socket_list_.pop_front())
742 MIS 0 : impl->close_socket();
743 :
744 : // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
745 : // drains completed_ops_, calling destroy() on each queued op. If we
746 : // released our shared_ptrs now, an epoll_op::destroy() could free the
747 : // last ref to an impl whose embedded descriptor_state is still linked
748 : // in the queue — use-after-free on the next pop(). Letting ~state_
749 : // release the ptrs (during service destruction, after scheduler
750 : // shutdown) keeps every impl alive until all ops have been drained.
751 HIT 224 : }
752 :
753 : inline io_object::implementation*
754 13911 : epoll_socket_service::construct()
755 : {
756 13911 : auto impl = std::make_shared<epoll_socket>(*this);
757 13911 : auto* raw = impl.get();
758 :
759 : {
760 13911 : std::lock_guard lock(state_->mutex_);
761 13911 : state_->socket_list_.push_back(raw);
762 13911 : state_->socket_ptrs_.emplace(raw, std::move(impl));
763 13911 : }
764 :
765 13911 : return raw;
766 13911 : }
767 :
768 : inline void
769 13911 : epoll_socket_service::destroy(io_object::implementation* impl)
770 : {
771 13911 : auto* epoll_impl = static_cast<epoll_socket*>(impl);
772 13911 : epoll_impl->close_socket();
773 13911 : std::lock_guard lock(state_->mutex_);
774 13911 : state_->socket_list_.remove(epoll_impl);
775 13911 : state_->socket_ptrs_.erase(epoll_impl);
776 13911 : }
777 :
778 : inline std::error_code
779 4633 : epoll_socket_service::open_socket(
780 : tcp_socket::implementation& impl,
781 : int family, int type, int protocol)
782 : {
783 4633 : auto* epoll_impl = static_cast<epoll_socket*>(&impl);
784 4633 : epoll_impl->close_socket();
785 :
786 4633 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
787 4633 : if (fd < 0)
788 MIS 0 : return make_err(errno);
789 :
790 HIT 4633 : if (family == AF_INET6)
791 : {
792 5 : int one = 1;
793 5 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
794 : }
795 :
796 4633 : epoll_impl->fd_ = fd;
797 :
798 : // Register fd with epoll (edge-triggered mode)
799 4633 : epoll_impl->desc_state_.fd = fd;
800 : {
801 4633 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
802 4633 : epoll_impl->desc_state_.read_op = nullptr;
803 4633 : epoll_impl->desc_state_.write_op = nullptr;
804 4633 : epoll_impl->desc_state_.connect_op = nullptr;
805 4633 : }
806 4633 : scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
807 :
808 4633 : return {};
809 : }
810 :
811 : inline void
812 23160 : epoll_socket_service::close(io_object::handle& h)
813 : {
814 23160 : static_cast<epoll_socket*>(h.get())->close_socket();
815 23160 : }
816 :
817 : inline void
818 76299 : epoll_socket_service::post(epoll_op* op)
819 : {
820 76299 : state_->sched_.post(op);
821 76299 : }
822 :
823 : inline void
824 4820 : epoll_socket_service::work_started() noexcept
825 : {
826 4820 : state_->sched_.work_started();
827 4820 : }
828 :
829 : inline void
830 198 : epoll_socket_service::work_finished() noexcept
831 : {
832 198 : state_->sched_.work_finished();
833 198 : }
834 :
835 : } // namespace boost::corosio::detail
836 :
837 : #endif // BOOST_COROSIO_HAS_EPOLL
838 :
839 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
|