TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
22 : #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
23 : #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24 :
25 : #include <boost/corosio/detail/endpoint_convert.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/make_err.hpp>
28 :
29 : #include <memory>
30 : #include <mutex>
31 : #include <unordered_map>
32 : #include <utility>
33 :
34 : #include <errno.h>
35 : #include <netinet/in.h>
36 : #include <sys/epoll.h>
37 : #include <sys/socket.h>
38 : #include <unistd.h>
39 :
40 : namespace boost::corosio::detail {
41 :
42 : /** State for epoll acceptor service. */
43 : class epoll_acceptor_state
44 : {
45 : public:
46 HIT 224 : explicit epoll_acceptor_state(epoll_scheduler& sched) noexcept
47 224 : : sched_(sched)
48 : {
49 224 : }
50 :
51 : epoll_scheduler& sched_;
52 : std::mutex mutex_;
53 : intrusive_list<epoll_acceptor> acceptor_list_;
54 : std::unordered_map<epoll_acceptor*, std::shared_ptr<epoll_acceptor>>
55 : acceptor_ptrs_;
56 : };
57 :
58 : /** epoll acceptor service implementation.
59 :
60 : Inherits from acceptor_service to enable runtime polymorphism.
61 : Uses key_type = acceptor_service for service lookup.
62 : */
63 : class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
64 : {
65 : public:
66 : explicit epoll_acceptor_service(capy::execution_context& ctx);
67 : ~epoll_acceptor_service() override;
68 :
69 : epoll_acceptor_service(epoll_acceptor_service const&) = delete;
70 : epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;
71 :
72 : void shutdown() override;
73 :
74 : io_object::implementation* construct() override;
75 : void destroy(io_object::implementation*) override;
76 : void close(io_object::handle&) override;
77 : std::error_code open_acceptor_socket(
78 : tcp_acceptor::implementation& impl,
79 : int family, int type, int protocol) override;
80 : std::error_code bind_acceptor(
81 : tcp_acceptor::implementation& impl, endpoint ep) override;
82 : std::error_code listen_acceptor(
83 : tcp_acceptor::implementation& impl, int backlog) override;
84 :
85 4781 : epoll_scheduler& scheduler() const noexcept
86 : {
87 4781 : return state_->sched_;
88 : }
89 : void post(epoll_op* op);
90 : void work_started() noexcept;
91 : void work_finished() noexcept;
92 :
93 : /** Get the socket service for creating peer sockets during accept. */
94 : epoll_socket_service* socket_service() const noexcept;
95 :
96 : private:
97 : capy::execution_context& ctx_;
98 : std::unique_ptr<epoll_acceptor_state> state_;
99 : };
100 :
101 : //--------------------------------------------------------------------------
102 : //
103 : // Implementation
104 : //
105 : //--------------------------------------------------------------------------
106 :
107 : inline void
108 6 : epoll_accept_op::cancel() noexcept
109 : {
110 6 : if (acceptor_impl_)
111 6 : acceptor_impl_->cancel_single_op(*this);
112 : else
113 MIS 0 : request_cancel();
114 HIT 6 : }
115 :
116 : inline void
117 4625 : epoll_accept_op::operator()()
118 : {
119 4625 : stop_cb.reset();
120 :
121 4625 : static_cast<epoll_acceptor*>(acceptor_impl_)
122 4625 : ->service()
123 4625 : .scheduler()
124 4625 : .reset_inline_budget();
125 :
126 4625 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
127 :
128 4625 : if (cancelled.load(std::memory_order_acquire))
129 9 : *ec_out = capy::error::canceled;
130 4616 : else if (errn != 0)
131 MIS 0 : *ec_out = make_err(errn);
132 : else
133 HIT 4616 : *ec_out = {};
134 :
135 : // Set up the peer socket on success
136 4625 : if (success && accepted_fd >= 0 && acceptor_impl_)
137 : {
138 4616 : auto* socket_svc = static_cast<epoll_acceptor*>(acceptor_impl_)
139 4616 : ->service()
140 4616 : .socket_service();
141 4616 : if (socket_svc)
142 : {
143 4616 : auto& impl = static_cast<epoll_socket&>(*socket_svc->construct());
144 4616 : impl.set_socket(accepted_fd);
145 :
146 4616 : impl.desc_state_.fd = accepted_fd;
147 : {
148 4616 : std::lock_guard lock(impl.desc_state_.mutex);
149 4616 : impl.desc_state_.read_op = nullptr;
150 4616 : impl.desc_state_.write_op = nullptr;
151 4616 : impl.desc_state_.connect_op = nullptr;
152 4616 : }
153 4616 : socket_svc->scheduler().register_descriptor(
154 : accepted_fd, &impl.desc_state_);
155 :
156 4616 : impl.set_endpoints(
157 4616 : static_cast<epoll_acceptor*>(acceptor_impl_)->local_endpoint(),
158 4616 : from_sockaddr(peer_storage));
159 :
160 4616 : if (impl_out)
161 4616 : *impl_out = &impl;
162 4616 : accepted_fd = -1;
163 : }
164 : else
165 : {
166 : // No socket service — treat as error
167 MIS 0 : *ec_out = make_err(ENOENT);
168 0 : success = false;
169 : }
170 : }
171 :
172 HIT 4625 : if (!success || !acceptor_impl_)
173 : {
174 9 : if (accepted_fd >= 0)
175 : {
176 MIS 0 : ::close(accepted_fd);
177 0 : accepted_fd = -1;
178 : }
179 HIT 9 : if (impl_out)
180 9 : *impl_out = nullptr;
181 : }
182 :
183 : // Move to stack before resuming. See epoll_op::operator()() for rationale.
184 4625 : capy::executor_ref saved_ex(ex);
185 4625 : std::coroutine_handle<> saved_h(h);
186 4625 : auto prevent_premature_destruction = std::move(impl_ptr);
187 4625 : dispatch_coro(saved_ex, saved_h).resume();
188 4625 : }
189 :
190 82 : inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
191 82 : : svc_(svc)
192 : {
193 82 : }
194 :
195 : inline std::coroutine_handle<>
196 4625 : epoll_acceptor::accept(
197 : std::coroutine_handle<> h,
198 : capy::executor_ref ex,
199 : std::stop_token token,
200 : std::error_code* ec,
201 : io_object::implementation** impl_out)
202 : {
203 4625 : auto& op = acc_;
204 4625 : op.reset();
205 4625 : op.h = h;
206 4625 : op.ex = ex;
207 4625 : op.ec_out = ec;
208 4625 : op.impl_out = impl_out;
209 4625 : op.fd = fd_;
210 4625 : op.start(token, this);
211 :
212 4625 : sockaddr_storage peer_storage{};
213 4625 : socklen_t addrlen = sizeof(peer_storage);
214 : int accepted;
215 : do
216 : {
217 4625 : accepted = ::accept4(
218 : fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
219 : SOCK_NONBLOCK | SOCK_CLOEXEC);
220 : }
221 4625 : while (accepted < 0 && errno == EINTR);
222 :
223 4625 : if (accepted >= 0)
224 : {
225 : {
226 2 : std::lock_guard lock(desc_state_.mutex);
227 2 : desc_state_.read_ready = false;
228 2 : }
229 :
230 2 : if (svc_.scheduler().try_consume_inline_budget())
231 : {
232 MIS 0 : auto* socket_svc = svc_.socket_service();
233 0 : if (socket_svc)
234 : {
235 : auto& impl =
236 0 : static_cast<epoll_socket&>(*socket_svc->construct());
237 0 : impl.set_socket(accepted);
238 :
239 0 : impl.desc_state_.fd = accepted;
240 : {
241 0 : std::lock_guard lock(impl.desc_state_.mutex);
242 0 : impl.desc_state_.read_op = nullptr;
243 0 : impl.desc_state_.write_op = nullptr;
244 0 : impl.desc_state_.connect_op = nullptr;
245 0 : }
246 0 : socket_svc->scheduler().register_descriptor(
247 : accepted, &impl.desc_state_);
248 :
249 0 : impl.set_endpoints(
250 : local_endpoint_, from_sockaddr(peer_storage));
251 :
252 0 : *ec = {};
253 0 : if (impl_out)
254 0 : *impl_out = &impl;
255 : }
256 : else
257 : {
258 0 : ::close(accepted);
259 0 : *ec = make_err(ENOENT);
260 0 : if (impl_out)
261 0 : *impl_out = nullptr;
262 : }
263 0 : return dispatch_coro(ex, h);
264 : }
265 :
266 HIT 2 : op.accepted_fd = accepted;
267 2 : op.peer_storage = peer_storage;
268 2 : op.complete(0, 0);
269 2 : op.impl_ptr = shared_from_this();
270 2 : svc_.post(&op);
271 2 : return std::noop_coroutine();
272 : }
273 :
274 4623 : if (errno == EAGAIN || errno == EWOULDBLOCK)
275 : {
276 4623 : op.impl_ptr = shared_from_this();
277 4623 : svc_.work_started();
278 :
279 4623 : std::lock_guard lock(desc_state_.mutex);
280 4623 : bool io_done = false;
281 4623 : if (desc_state_.read_ready)
282 : {
283 MIS 0 : desc_state_.read_ready = false;
284 0 : op.perform_io();
285 0 : io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
286 0 : if (!io_done)
287 0 : op.errn = 0;
288 : }
289 :
290 HIT 4623 : if (io_done || op.cancelled.load(std::memory_order_acquire))
291 : {
292 MIS 0 : svc_.post(&op);
293 0 : svc_.work_finished();
294 : }
295 : else
296 : {
297 HIT 4623 : desc_state_.read_op = &op;
298 : }
299 4623 : return std::noop_coroutine();
300 4623 : }
301 :
302 MIS 0 : op.complete(errno, 0);
303 0 : op.impl_ptr = shared_from_this();
304 0 : svc_.post(&op);
305 : // completion is always posted to scheduler queue, never inline.
306 0 : return std::noop_coroutine();
307 : }
308 :
309 : inline void
310 HIT 2 : epoll_acceptor::cancel() noexcept
311 : {
312 2 : cancel_single_op(acc_);
313 2 : }
314 :
315 : inline void
316 8 : epoll_acceptor::cancel_single_op(epoll_op& op) noexcept
317 : {
318 8 : auto self = weak_from_this().lock();
319 8 : if (!self)
320 MIS 0 : return;
321 :
322 HIT 8 : op.request_cancel();
323 :
324 8 : epoll_op* claimed = nullptr;
325 : {
326 8 : std::lock_guard lock(desc_state_.mutex);
327 8 : if (desc_state_.read_op == &op)
328 7 : claimed = std::exchange(desc_state_.read_op, nullptr);
329 8 : }
330 8 : if (claimed)
331 : {
332 7 : op.impl_ptr = self;
333 7 : svc_.post(&op);
334 7 : svc_.work_finished();
335 : }
336 8 : }
337 :
338 : inline void
339 326 : epoll_acceptor::close_socket() noexcept
340 : {
341 326 : auto self = weak_from_this().lock();
342 326 : if (self)
343 : {
344 326 : acc_.request_cancel();
345 :
346 326 : epoll_op* claimed = nullptr;
347 : {
348 326 : std::lock_guard lock(desc_state_.mutex);
349 326 : claimed = std::exchange(desc_state_.read_op, nullptr);
350 326 : desc_state_.read_ready = false;
351 326 : desc_state_.write_ready = false;
352 326 : }
353 :
354 326 : if (claimed)
355 : {
356 2 : acc_.impl_ptr = self;
357 2 : svc_.post(&acc_);
358 2 : svc_.work_finished();
359 : }
360 :
361 326 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
362 MIS 0 : desc_state_.impl_ref_ = self;
363 : }
364 :
365 HIT 326 : if (fd_ >= 0)
366 : {
367 81 : if (desc_state_.registered_events != 0)
368 77 : svc_.scheduler().deregister_descriptor(fd_);
369 81 : ::close(fd_);
370 81 : fd_ = -1;
371 : }
372 :
373 326 : desc_state_.fd = -1;
374 326 : desc_state_.registered_events = 0;
375 :
376 326 : local_endpoint_ = endpoint{};
377 326 : }
378 :
379 224 : inline epoll_acceptor_service::epoll_acceptor_service(
380 224 : capy::execution_context& ctx)
381 224 : : ctx_(ctx)
382 224 : , state_(
383 : std::make_unique<epoll_acceptor_state>(
384 224 : ctx.use_service<epoll_scheduler>()))
385 : {
386 224 : }
387 :
388 448 : inline epoll_acceptor_service::~epoll_acceptor_service() {}
389 :
390 : inline void
391 224 : epoll_acceptor_service::shutdown()
392 : {
393 224 : std::lock_guard lock(state_->mutex_);
394 :
395 224 : while (auto* impl = state_->acceptor_list_.pop_front())
396 MIS 0 : impl->close_socket();
397 :
398 : // Don't clear acceptor_ptrs_ here — same rationale as
399 : // epoll_socket_service::shutdown(). Let ~state_ release ptrs
400 : // after scheduler shutdown has drained all queued ops.
401 HIT 224 : }
402 :
403 : inline io_object::implementation*
404 82 : epoll_acceptor_service::construct()
405 : {
406 82 : auto impl = std::make_shared<epoll_acceptor>(*this);
407 82 : auto* raw = impl.get();
408 :
409 82 : std::lock_guard lock(state_->mutex_);
410 82 : state_->acceptor_list_.push_back(raw);
411 82 : state_->acceptor_ptrs_.emplace(raw, std::move(impl));
412 :
413 82 : return raw;
414 82 : }
415 :
416 : inline void
417 82 : epoll_acceptor_service::destroy(io_object::implementation* impl)
418 : {
419 82 : auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
420 82 : epoll_impl->close_socket();
421 82 : std::lock_guard lock(state_->mutex_);
422 82 : state_->acceptor_list_.remove(epoll_impl);
423 82 : state_->acceptor_ptrs_.erase(epoll_impl);
424 82 : }
425 :
426 : inline void
427 163 : epoll_acceptor_service::close(io_object::handle& h)
428 : {
429 163 : static_cast<epoll_acceptor*>(h.get())->close_socket();
430 163 : }
431 :
432 : inline std::error_code
433 79 : epoll_acceptor::set_option(
434 : int level, int optname,
435 : void const* data, std::size_t size) noexcept
436 : {
437 79 : if (::setsockopt(fd_, level, optname, data,
438 79 : static_cast<socklen_t>(size)) != 0)
439 MIS 0 : return make_err(errno);
440 HIT 79 : return {};
441 : }
442 :
443 : inline std::error_code
444 MIS 0 : epoll_acceptor::get_option(
445 : int level, int optname,
446 : void* data, std::size_t* size) const noexcept
447 : {
448 0 : socklen_t len = static_cast<socklen_t>(*size);
449 0 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
450 0 : return make_err(errno);
451 0 : *size = static_cast<std::size_t>(len);
452 0 : return {};
453 : }
454 :
455 : inline std::error_code
456 HIT 81 : epoll_acceptor_service::open_acceptor_socket(
457 : tcp_acceptor::implementation& impl,
458 : int family, int type, int protocol)
459 : {
460 81 : auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
461 81 : epoll_impl->close_socket();
462 :
463 81 : int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
464 81 : if (fd < 0)
465 MIS 0 : return make_err(errno);
466 :
467 HIT 81 : if (family == AF_INET6)
468 : {
469 8 : int val = 0; // dual-stack default
470 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
471 : }
472 :
473 81 : epoll_impl->fd_ = fd;
474 :
475 : // Set up descriptor state but do NOT register with epoll yet
476 81 : epoll_impl->desc_state_.fd = fd;
477 : {
478 81 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
479 81 : epoll_impl->desc_state_.read_op = nullptr;
480 81 : }
481 :
482 81 : return {};
483 : }
484 :
485 : inline std::error_code
486 80 : epoll_acceptor_service::bind_acceptor(
487 : tcp_acceptor::implementation& impl, endpoint ep)
488 : {
489 80 : auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
490 80 : int fd = epoll_impl->fd_;
491 :
492 80 : sockaddr_storage storage{};
493 80 : socklen_t addrlen = detail::to_sockaddr(ep, storage);
494 80 : if (::bind(fd, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
495 3 : return make_err(errno);
496 :
497 : // Cache local endpoint (resolves ephemeral port)
498 77 : sockaddr_storage local{};
499 77 : socklen_t local_len = sizeof(local);
500 77 : if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local), &local_len) == 0)
501 77 : epoll_impl->set_local_endpoint(detail::from_sockaddr(local));
502 :
503 77 : return {};
504 : }
505 :
506 : inline std::error_code
507 77 : epoll_acceptor_service::listen_acceptor(
508 : tcp_acceptor::implementation& impl, int backlog)
509 : {
510 77 : auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
511 77 : int fd = epoll_impl->fd_;
512 :
513 77 : if (::listen(fd, backlog) < 0)
514 MIS 0 : return make_err(errno);
515 :
516 : // Register fd with epoll (edge-triggered mode)
517 HIT 77 : scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
518 :
519 77 : return {};
520 : }
521 :
522 : inline void
523 11 : epoll_acceptor_service::post(epoll_op* op)
524 : {
525 11 : state_->sched_.post(op);
526 11 : }
527 :
528 : inline void
529 4623 : epoll_acceptor_service::work_started() noexcept
530 : {
531 4623 : state_->sched_.work_started();
532 4623 : }
533 :
534 : inline void
535 9 : epoll_acceptor_service::work_finished() noexcept
536 : {
537 9 : state_->sched_.work_finished();
538 9 : }
539 :
540 : inline epoll_socket_service*
541 4616 : epoll_acceptor_service::socket_service() const noexcept
542 : {
543 4616 : auto* svc = ctx_.find_service<detail::socket_service>();
544 4616 : return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
545 : }
546 :
547 : } // namespace boost::corosio::detail
548 :
549 : #endif // BOOST_COROSIO_HAS_EPOLL
550 :
551 : #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
|