include/boost/corosio/native/detail/epoll/epoll_acceptor_service.hpp

80.6% Lines (204/253) 95.8% Functions (23/24)
include/boost/corosio/native/detail/epoll/epoll_acceptor_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_acceptor.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_socket_service.hpp>
23 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
24
25 #include <boost/corosio/detail/endpoint_convert.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/make_err.hpp>
28
29 #include <memory>
30 #include <mutex>
31 #include <unordered_map>
32 #include <utility>
33
34 #include <errno.h>
35 #include <netinet/in.h>
36 #include <sys/epoll.h>
37 #include <sys/socket.h>
38 #include <unistd.h>
39
40 namespace boost::corosio::detail {
41
42 /** State for epoll acceptor service. */
43 class epoll_acceptor_state
44 {
45 public:
46 224 explicit epoll_acceptor_state(epoll_scheduler& sched) noexcept
47 224 : sched_(sched)
48 {
49 224 }
50
51 epoll_scheduler& sched_;
52 std::mutex mutex_;
53 intrusive_list<epoll_acceptor> acceptor_list_;
54 std::unordered_map<epoll_acceptor*, std::shared_ptr<epoll_acceptor>>
55 acceptor_ptrs_;
56 };
57
58 /** epoll acceptor service implementation.
59
60 Inherits from acceptor_service to enable runtime polymorphism.
61 Uses key_type = acceptor_service for service lookup.
62 */
63 class BOOST_COROSIO_DECL epoll_acceptor_service final : public acceptor_service
64 {
65 public:
66 explicit epoll_acceptor_service(capy::execution_context& ctx);
67 ~epoll_acceptor_service() override;
68
69 epoll_acceptor_service(epoll_acceptor_service const&) = delete;
70 epoll_acceptor_service& operator=(epoll_acceptor_service const&) = delete;
71
72 void shutdown() override;
73
74 io_object::implementation* construct() override;
75 void destroy(io_object::implementation*) override;
76 void close(io_object::handle&) override;
77 std::error_code open_acceptor_socket(
78 tcp_acceptor::implementation& impl,
79 int family, int type, int protocol) override;
80 std::error_code bind_acceptor(
81 tcp_acceptor::implementation& impl, endpoint ep) override;
82 std::error_code listen_acceptor(
83 tcp_acceptor::implementation& impl, int backlog) override;
84
85 4781 epoll_scheduler& scheduler() const noexcept
86 {
87 4781 return state_->sched_;
88 }
89 void post(epoll_op* op);
90 void work_started() noexcept;
91 void work_finished() noexcept;
92
93 /** Get the socket service for creating peer sockets during accept. */
94 epoll_socket_service* socket_service() const noexcept;
95
96 private:
97 capy::execution_context& ctx_;
98 std::unique_ptr<epoll_acceptor_state> state_;
99 };
100
101 //--------------------------------------------------------------------------
102 //
103 // Implementation
104 //
105 //--------------------------------------------------------------------------
106
107 inline void
108 6 epoll_accept_op::cancel() noexcept
109 {
110 6 if (acceptor_impl_)
111 6 acceptor_impl_->cancel_single_op(*this);
112 else
113 request_cancel();
114 6 }
115
116 inline void
117 4625 epoll_accept_op::operator()()
118 {
119 4625 stop_cb.reset();
120
121 4625 static_cast<epoll_acceptor*>(acceptor_impl_)
122 4625 ->service()
123 4625 .scheduler()
124 4625 .reset_inline_budget();
125
126 4625 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
127
128 4625 if (cancelled.load(std::memory_order_acquire))
129 9 *ec_out = capy::error::canceled;
130 4616 else if (errn != 0)
131 *ec_out = make_err(errn);
132 else
133 4616 *ec_out = {};
134
135 // Set up the peer socket on success
136 4625 if (success && accepted_fd >= 0 && acceptor_impl_)
137 {
138 4616 auto* socket_svc = static_cast<epoll_acceptor*>(acceptor_impl_)
139 4616 ->service()
140 4616 .socket_service();
141 4616 if (socket_svc)
142 {
143 4616 auto& impl = static_cast<epoll_socket&>(*socket_svc->construct());
144 4616 impl.set_socket(accepted_fd);
145
146 4616 impl.desc_state_.fd = accepted_fd;
147 {
148 4616 std::lock_guard lock(impl.desc_state_.mutex);
149 4616 impl.desc_state_.read_op = nullptr;
150 4616 impl.desc_state_.write_op = nullptr;
151 4616 impl.desc_state_.connect_op = nullptr;
152 4616 }
153 4616 socket_svc->scheduler().register_descriptor(
154 accepted_fd, &impl.desc_state_);
155
156 4616 impl.set_endpoints(
157 4616 static_cast<epoll_acceptor*>(acceptor_impl_)->local_endpoint(),
158 4616 from_sockaddr(peer_storage));
159
160 4616 if (impl_out)
161 4616 *impl_out = &impl;
162 4616 accepted_fd = -1;
163 }
164 else
165 {
166 // No socket service — treat as error
167 *ec_out = make_err(ENOENT);
168 success = false;
169 }
170 }
171
172 4625 if (!success || !acceptor_impl_)
173 {
174 9 if (accepted_fd >= 0)
175 {
176 ::close(accepted_fd);
177 accepted_fd = -1;
178 }
179 9 if (impl_out)
180 9 *impl_out = nullptr;
181 }
182
183 // Move to stack before resuming. See epoll_op::operator()() for rationale.
184 4625 capy::executor_ref saved_ex(ex);
185 4625 std::coroutine_handle<> saved_h(h);
186 4625 auto prevent_premature_destruction = std::move(impl_ptr);
187 4625 dispatch_coro(saved_ex, saved_h).resume();
188 4625 }
189
190 82 inline epoll_acceptor::epoll_acceptor(epoll_acceptor_service& svc) noexcept
191 82 : svc_(svc)
192 {
193 82 }
194
195 inline std::coroutine_handle<>
196 4625 epoll_acceptor::accept(
197 std::coroutine_handle<> h,
198 capy::executor_ref ex,
199 std::stop_token token,
200 std::error_code* ec,
201 io_object::implementation** impl_out)
202 {
203 4625 auto& op = acc_;
204 4625 op.reset();
205 4625 op.h = h;
206 4625 op.ex = ex;
207 4625 op.ec_out = ec;
208 4625 op.impl_out = impl_out;
209 4625 op.fd = fd_;
210 4625 op.start(token, this);
211
212 4625 sockaddr_storage peer_storage{};
213 4625 socklen_t addrlen = sizeof(peer_storage);
214 int accepted;
215 do
216 {
217 4625 accepted = ::accept4(
218 fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen,
219 SOCK_NONBLOCK | SOCK_CLOEXEC);
220 }
221 4625 while (accepted < 0 && errno == EINTR);
222
223 4625 if (accepted >= 0)
224 {
225 {
226 2 std::lock_guard lock(desc_state_.mutex);
227 2 desc_state_.read_ready = false;
228 2 }
229
230 2 if (svc_.scheduler().try_consume_inline_budget())
231 {
232 auto* socket_svc = svc_.socket_service();
233 if (socket_svc)
234 {
235 auto& impl =
236 static_cast<epoll_socket&>(*socket_svc->construct());
237 impl.set_socket(accepted);
238
239 impl.desc_state_.fd = accepted;
240 {
241 std::lock_guard lock(impl.desc_state_.mutex);
242 impl.desc_state_.read_op = nullptr;
243 impl.desc_state_.write_op = nullptr;
244 impl.desc_state_.connect_op = nullptr;
245 }
246 socket_svc->scheduler().register_descriptor(
247 accepted, &impl.desc_state_);
248
249 impl.set_endpoints(
250 local_endpoint_, from_sockaddr(peer_storage));
251
252 *ec = {};
253 if (impl_out)
254 *impl_out = &impl;
255 }
256 else
257 {
258 ::close(accepted);
259 *ec = make_err(ENOENT);
260 if (impl_out)
261 *impl_out = nullptr;
262 }
263 return dispatch_coro(ex, h);
264 }
265
266 2 op.accepted_fd = accepted;
267 2 op.peer_storage = peer_storage;
268 2 op.complete(0, 0);
269 2 op.impl_ptr = shared_from_this();
270 2 svc_.post(&op);
271 2 return std::noop_coroutine();
272 }
273
274 4623 if (errno == EAGAIN || errno == EWOULDBLOCK)
275 {
276 4623 op.impl_ptr = shared_from_this();
277 4623 svc_.work_started();
278
279 4623 std::lock_guard lock(desc_state_.mutex);
280 4623 bool io_done = false;
281 4623 if (desc_state_.read_ready)
282 {
283 desc_state_.read_ready = false;
284 op.perform_io();
285 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
286 if (!io_done)
287 op.errn = 0;
288 }
289
290 4623 if (io_done || op.cancelled.load(std::memory_order_acquire))
291 {
292 svc_.post(&op);
293 svc_.work_finished();
294 }
295 else
296 {
297 4623 desc_state_.read_op = &op;
298 }
299 4623 return std::noop_coroutine();
300 4623 }
301
302 op.complete(errno, 0);
303 op.impl_ptr = shared_from_this();
304 svc_.post(&op);
305 // completion is always posted to scheduler queue, never inline.
306 return std::noop_coroutine();
307 }
308
309 inline void
310 2 epoll_acceptor::cancel() noexcept
311 {
312 2 cancel_single_op(acc_);
313 2 }
314
315 inline void
316 8 epoll_acceptor::cancel_single_op(epoll_op& op) noexcept
317 {
318 8 auto self = weak_from_this().lock();
319 8 if (!self)
320 return;
321
322 8 op.request_cancel();
323
324 8 epoll_op* claimed = nullptr;
325 {
326 8 std::lock_guard lock(desc_state_.mutex);
327 8 if (desc_state_.read_op == &op)
328 7 claimed = std::exchange(desc_state_.read_op, nullptr);
329 8 }
330 8 if (claimed)
331 {
332 7 op.impl_ptr = self;
333 7 svc_.post(&op);
334 7 svc_.work_finished();
335 }
336 8 }
337
338 inline void
339 326 epoll_acceptor::close_socket() noexcept
340 {
341 326 auto self = weak_from_this().lock();
342 326 if (self)
343 {
344 326 acc_.request_cancel();
345
346 326 epoll_op* claimed = nullptr;
347 {
348 326 std::lock_guard lock(desc_state_.mutex);
349 326 claimed = std::exchange(desc_state_.read_op, nullptr);
350 326 desc_state_.read_ready = false;
351 326 desc_state_.write_ready = false;
352 326 }
353
354 326 if (claimed)
355 {
356 2 acc_.impl_ptr = self;
357 2 svc_.post(&acc_);
358 2 svc_.work_finished();
359 }
360
361 326 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
362 desc_state_.impl_ref_ = self;
363 }
364
365 326 if (fd_ >= 0)
366 {
367 81 if (desc_state_.registered_events != 0)
368 77 svc_.scheduler().deregister_descriptor(fd_);
369 81 ::close(fd_);
370 81 fd_ = -1;
371 }
372
373 326 desc_state_.fd = -1;
374 326 desc_state_.registered_events = 0;
375
376 326 local_endpoint_ = endpoint{};
377 326 }
378
379 224 inline epoll_acceptor_service::epoll_acceptor_service(
380 224 capy::execution_context& ctx)
381 224 : ctx_(ctx)
382 224 , state_(
383 std::make_unique<epoll_acceptor_state>(
384 224 ctx.use_service<epoll_scheduler>()))
385 {
386 224 }
387
388 448 inline epoll_acceptor_service::~epoll_acceptor_service() {}
389
390 inline void
391 224 epoll_acceptor_service::shutdown()
392 {
393 224 std::lock_guard lock(state_->mutex_);
394
395 224 while (auto* impl = state_->acceptor_list_.pop_front())
396 impl->close_socket();
397
398 // Don't clear acceptor_ptrs_ here — same rationale as
399 // epoll_socket_service::shutdown(). Let ~state_ release ptrs
400 // after scheduler shutdown has drained all queued ops.
401 224 }
402
403 inline io_object::implementation*
404 82 epoll_acceptor_service::construct()
405 {
406 82 auto impl = std::make_shared<epoll_acceptor>(*this);
407 82 auto* raw = impl.get();
408
409 82 std::lock_guard lock(state_->mutex_);
410 82 state_->acceptor_list_.push_back(raw);
411 82 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
412
413 82 return raw;
414 82 }
415
416 inline void
417 82 epoll_acceptor_service::destroy(io_object::implementation* impl)
418 {
419 82 auto* epoll_impl = static_cast<epoll_acceptor*>(impl);
420 82 epoll_impl->close_socket();
421 82 std::lock_guard lock(state_->mutex_);
422 82 state_->acceptor_list_.remove(epoll_impl);
423 82 state_->acceptor_ptrs_.erase(epoll_impl);
424 82 }
425
426 inline void
427 163 epoll_acceptor_service::close(io_object::handle& h)
428 {
429 163 static_cast<epoll_acceptor*>(h.get())->close_socket();
430 163 }
431
432 inline std::error_code
433 79 epoll_acceptor::set_option(
434 int level, int optname,
435 void const* data, std::size_t size) noexcept
436 {
437 79 if (::setsockopt(fd_, level, optname, data,
438 79 static_cast<socklen_t>(size)) != 0)
439 return make_err(errno);
440 79 return {};
441 }
442
443 inline std::error_code
444 epoll_acceptor::get_option(
445 int level, int optname,
446 void* data, std::size_t* size) const noexcept
447 {
448 socklen_t len = static_cast<socklen_t>(*size);
449 if (::getsockopt(fd_, level, optname, data, &len) != 0)
450 return make_err(errno);
451 *size = static_cast<std::size_t>(len);
452 return {};
453 }
454
455 inline std::error_code
456 81 epoll_acceptor_service::open_acceptor_socket(
457 tcp_acceptor::implementation& impl,
458 int family, int type, int protocol)
459 {
460 81 auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
461 81 epoll_impl->close_socket();
462
463 81 int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
464 81 if (fd < 0)
465 return make_err(errno);
466
467 81 if (family == AF_INET6)
468 {
469 8 int val = 0; // dual-stack default
470 8 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
471 }
472
473 81 epoll_impl->fd_ = fd;
474
475 // Set up descriptor state but do NOT register with epoll yet
476 81 epoll_impl->desc_state_.fd = fd;
477 {
478 81 std::lock_guard lock(epoll_impl->desc_state_.mutex);
479 81 epoll_impl->desc_state_.read_op = nullptr;
480 81 }
481
482 81 return {};
483 }
484
485 inline std::error_code
486 80 epoll_acceptor_service::bind_acceptor(
487 tcp_acceptor::implementation& impl, endpoint ep)
488 {
489 80 auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
490 80 int fd = epoll_impl->fd_;
491
492 80 sockaddr_storage storage{};
493 80 socklen_t addrlen = detail::to_sockaddr(ep, storage);
494 80 if (::bind(fd, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
495 3 return make_err(errno);
496
497 // Cache local endpoint (resolves ephemeral port)
498 77 sockaddr_storage local{};
499 77 socklen_t local_len = sizeof(local);
500 77 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local), &local_len) == 0)
501 77 epoll_impl->set_local_endpoint(detail::from_sockaddr(local));
502
503 77 return {};
504 }
505
506 inline std::error_code
507 77 epoll_acceptor_service::listen_acceptor(
508 tcp_acceptor::implementation& impl, int backlog)
509 {
510 77 auto* epoll_impl = static_cast<epoll_acceptor*>(&impl);
511 77 int fd = epoll_impl->fd_;
512
513 77 if (::listen(fd, backlog) < 0)
514 return make_err(errno);
515
516 // Register fd with epoll (edge-triggered mode)
517 77 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
518
519 77 return {};
520 }
521
522 inline void
523 11 epoll_acceptor_service::post(epoll_op* op)
524 {
525 11 state_->sched_.post(op);
526 11 }
527
528 inline void
529 4623 epoll_acceptor_service::work_started() noexcept
530 {
531 4623 state_->sched_.work_started();
532 4623 }
533
534 inline void
535 9 epoll_acceptor_service::work_finished() noexcept
536 {
537 9 state_->sched_.work_finished();
538 9 }
539
540 inline epoll_socket_service*
541 4616 epoll_acceptor_service::socket_service() const noexcept
542 {
543 4616 auto* svc = ctx_.find_service<detail::socket_service>();
544 4616 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
545 }
546
547 } // namespace boost::corosio::detail
548
549 #endif // BOOST_COROSIO_HAS_EPOLL
550
551 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_ACCEPTOR_SERVICE_HPP
552