include/boost/corosio/native/detail/select/select_acceptor_service.hpp

65.6% Lines (185/282) 87.5% Functions (21/24)
include/boost/corosio/native/detail/select/select_acceptor_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/acceptor_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_acceptor.hpp>
22 #include <boost/corosio/native/detail/select/select_socket_service.hpp>
23 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24
25 #include <boost/corosio/detail/endpoint_convert.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/make_err.hpp>
28
29 #include <errno.h>
30 #include <fcntl.h>
31 #include <netinet/in.h>
32 #include <sys/socket.h>
33 #include <unistd.h>
34
35 #include <memory>
36 #include <mutex>
37 #include <unordered_map>
38
39 namespace boost::corosio::detail {
40
41 /** State for select acceptor service. */
42 class select_acceptor_state
43 {
44 public:
45 154 explicit select_acceptor_state(select_scheduler& sched) noexcept
46 154 : sched_(sched)
47 {
48 154 }
49
50 select_scheduler& sched_;
51 std::mutex mutex_;
52 intrusive_list<select_acceptor> acceptor_list_;
53 std::unordered_map<select_acceptor*, std::shared_ptr<select_acceptor>>
54 acceptor_ptrs_;
55 };
56
57 /** select acceptor service implementation.
58
59 Inherits from acceptor_service to enable runtime polymorphism.
60 Uses key_type = acceptor_service for service lookup.
61 */
62 class BOOST_COROSIO_DECL select_acceptor_service final : public acceptor_service
63 {
64 public:
65 explicit select_acceptor_service(capy::execution_context& ctx);
66 ~select_acceptor_service() override;
67
68 select_acceptor_service(select_acceptor_service const&) = delete;
69 select_acceptor_service& operator=(select_acceptor_service const&) = delete;
70
71 void shutdown() override;
72
73 io_object::implementation* construct() override;
74 void destroy(io_object::implementation*) override;
75 void close(io_object::handle&) override;
76 std::error_code open_acceptor_socket(
77 tcp_acceptor::implementation& impl,
78 int family, int type, int protocol) override;
79 std::error_code bind_acceptor(
80 tcp_acceptor::implementation& impl, endpoint ep) override;
81 std::error_code listen_acceptor(
82 tcp_acceptor::implementation& impl, int backlog) override;
83
84 3134 select_scheduler& scheduler() const noexcept
85 {
86 3134 return state_->sched_;
87 }
88 void post(select_op* op);
89 void work_started() noexcept;
90 void work_finished() noexcept;
91
92 /** Get the socket service for creating peer sockets during accept. */
93 select_socket_service* socket_service() const noexcept;
94
95 private:
96 capy::execution_context& ctx_;
97 std::unique_ptr<select_acceptor_state> state_;
98 };
99
100 inline void
101 select_accept_op::cancel() noexcept
102 {
103 if (acceptor_impl_)
104 acceptor_impl_->cancel_single_op(*this);
105 else
106 request_cancel();
107 }
108
109 inline void
110 3074 select_accept_op::operator()()
111 {
112 3074 stop_cb.reset();
113
114 3074 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
115
116 3074 if (ec_out)
117 {
118 3074 if (cancelled.load(std::memory_order_acquire))
119 3 *ec_out = capy::error::canceled;
120 3071 else if (errn != 0)
121 *ec_out = make_err(errn);
122 else
123 3071 *ec_out = {};
124 }
125
126 3074 if (success && accepted_fd >= 0)
127 {
128 3071 if (acceptor_impl_)
129 {
130 3071 auto* socket_svc = static_cast<select_acceptor*>(acceptor_impl_)
131 3071 ->service()
132 3071 .socket_service();
133 3071 if (socket_svc)
134 {
135 auto& impl =
136 3071 static_cast<select_socket&>(*socket_svc->construct());
137 3071 impl.set_socket(accepted_fd);
138
139 3071 sockaddr_storage local_storage{};
140 3071 socklen_t local_len = sizeof(local_storage);
141 3071 sockaddr_storage remote_storage{};
142 3071 socklen_t remote_len = sizeof(remote_storage);
143
144 3071 endpoint local_ep, remote_ep;
145 3071 if (::getsockname(
146 accepted_fd,
147 reinterpret_cast<sockaddr*>(&local_storage),
148 3071 &local_len) == 0)
149 3071 local_ep = from_sockaddr(local_storage);
150 3071 if (::getpeername(
151 accepted_fd,
152 reinterpret_cast<sockaddr*>(&remote_storage),
153 3071 &remote_len) == 0)
154 3071 remote_ep = from_sockaddr(remote_storage);
155
156 3071 impl.set_endpoints(local_ep, remote_ep);
157
158 3071 if (impl_out)
159 3071 *impl_out = &impl;
160
161 3071 accepted_fd = -1;
162 }
163 else
164 {
165 if (ec_out && !*ec_out)
166 *ec_out = make_err(ENOENT);
167 ::close(accepted_fd);
168 accepted_fd = -1;
169 if (impl_out)
170 *impl_out = nullptr;
171 }
172 }
173 else
174 {
175 ::close(accepted_fd);
176 accepted_fd = -1;
177 if (impl_out)
178 *impl_out = nullptr;
179 }
180 3071 }
181 else
182 {
183 3 if (accepted_fd >= 0)
184 {
185 ::close(accepted_fd);
186 accepted_fd = -1;
187 }
188
189 3 if (peer_impl)
190 {
191 auto* socket_svc_cleanup =
192 static_cast<select_acceptor*>(acceptor_impl_)
193 ->service()
194 .socket_service();
195 if (socket_svc_cleanup)
196 socket_svc_cleanup->destroy(peer_impl);
197 peer_impl = nullptr;
198 }
199
200 3 if (impl_out)
201 3 *impl_out = nullptr;
202 }
203
204 // Move to stack before destroying the frame
205 3074 capy::executor_ref saved_ex(ex);
206 3074 std::coroutine_handle<> saved_h(h);
207 3074 impl_ptr.reset();
208 3074 dispatch_coro(saved_ex, saved_h).resume();
209 3074 }
210
211 61 inline select_acceptor::select_acceptor(select_acceptor_service& svc) noexcept
212 61 : svc_(svc)
213 {
214 61 }
215
216 inline std::coroutine_handle<>
217 3074 select_acceptor::accept(
218 std::coroutine_handle<> h,
219 capy::executor_ref ex,
220 std::stop_token token,
221 std::error_code* ec,
222 io_object::implementation** impl_out)
223 {
224 3074 auto& op = acc_;
225 3074 op.reset();
226 3074 op.h = h;
227 3074 op.ex = ex;
228 3074 op.ec_out = ec;
229 3074 op.impl_out = impl_out;
230 3074 op.fd = fd_;
231 3074 op.start(token, this);
232
233 3074 sockaddr_storage peer_storage{};
234 3074 socklen_t addrlen = sizeof(peer_storage);
235 int accepted =
236 3074 ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
237
238 3074 if (accepted >= 0)
239 {
240 // Reject fds that exceed select()'s FD_SETSIZE limit.
241 2 if (accepted >= FD_SETSIZE)
242 {
243 ::close(accepted);
244 op.accepted_fd = -1;
245 op.complete(EINVAL, 0);
246 op.impl_ptr = shared_from_this();
247 svc_.post(&op);
248 return std::noop_coroutine();
249 }
250
251 // Set non-blocking and close-on-exec flags.
252 2 int flags = ::fcntl(accepted, F_GETFL, 0);
253 2 if (flags == -1)
254 {
255 int err = errno;
256 ::close(accepted);
257 op.accepted_fd = -1;
258 op.complete(err, 0);
259 op.impl_ptr = shared_from_this();
260 svc_.post(&op);
261 return std::noop_coroutine();
262 }
263
264 2 if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
265 {
266 int err = errno;
267 ::close(accepted);
268 op.accepted_fd = -1;
269 op.complete(err, 0);
270 op.impl_ptr = shared_from_this();
271 svc_.post(&op);
272 return std::noop_coroutine();
273 }
274
275 2 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
276 {
277 int err = errno;
278 ::close(accepted);
279 op.accepted_fd = -1;
280 op.complete(err, 0);
281 op.impl_ptr = shared_from_this();
282 svc_.post(&op);
283 return std::noop_coroutine();
284 }
285
286 2 op.accepted_fd = accepted;
287 2 op.complete(0, 0);
288 2 op.impl_ptr = shared_from_this();
289 2 svc_.post(&op);
290 2 return std::noop_coroutine();
291 }
292
293 3072 if (errno == EAGAIN || errno == EWOULDBLOCK)
294 {
295 3072 svc_.work_started();
296 3072 op.impl_ptr = shared_from_this();
297
298 // Set registering BEFORE register_fd to close the race window where
299 // reactor sees an event before we set registered.
300 3072 op.registered.store(
301 select_registration_state::registering, std::memory_order_release);
302 3072 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
303
304 // Transition to registered. If this fails, reactor or cancel already
305 // claimed the op (state is now unregistered), so we're done. However,
306 // we must still deregister the fd because cancel's deregister_fd may
307 // have run before our register_fd, leaving the fd orphaned.
308 3072 auto expected = select_registration_state::registering;
309 3072 if (!op.registered.compare_exchange_strong(
310 expected, select_registration_state::registered,
311 std::memory_order_acq_rel))
312 {
313 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
314 return std::noop_coroutine();
315 }
316
317 // If cancelled was set before we registered, handle it now.
318 3072 if (op.cancelled.load(std::memory_order_acquire))
319 {
320 auto prev = op.registered.exchange(
321 select_registration_state::unregistered,
322 std::memory_order_acq_rel);
323 if (prev != select_registration_state::unregistered)
324 {
325 svc_.scheduler().deregister_fd(
326 fd_, select_scheduler::event_read);
327 op.impl_ptr = shared_from_this();
328 svc_.post(&op);
329 svc_.work_finished();
330 }
331 }
332 3072 return std::noop_coroutine();
333 }
334
335 op.complete(errno, 0);
336 op.impl_ptr = shared_from_this();
337 svc_.post(&op);
338 return std::noop_coroutine();
339 }
340
341 inline void
342 2 select_acceptor::cancel() noexcept
343 {
344 2 auto self = weak_from_this().lock();
345 2 if (!self)
346 return;
347
348 2 auto prev = acc_.registered.exchange(
349 select_registration_state::unregistered, std::memory_order_acq_rel);
350 2 acc_.request_cancel();
351
352 2 if (prev != select_registration_state::unregistered)
353 {
354 1 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
355 1 acc_.impl_ptr = self;
356 1 svc_.post(&acc_);
357 1 svc_.work_finished();
358 }
359 2 }
360
361 inline void
362 select_acceptor::cancel_single_op(select_op& op) noexcept
363 {
364 auto self = weak_from_this().lock();
365 if (!self)
366 return;
367
368 auto prev = op.registered.exchange(
369 select_registration_state::unregistered, std::memory_order_acq_rel);
370 op.request_cancel();
371
372 if (prev != select_registration_state::unregistered)
373 {
374 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
375
376 op.impl_ptr = self;
377 svc_.post(&op);
378 svc_.work_finished();
379 }
380 }
381
382 inline void
383 240 select_acceptor::close_socket() noexcept
384 {
385 240 auto self = weak_from_this().lock();
386 240 if (self)
387 {
388 240 auto prev = acc_.registered.exchange(
389 select_registration_state::unregistered, std::memory_order_acq_rel);
390 240 acc_.request_cancel();
391
392 240 if (prev != select_registration_state::unregistered)
393 {
394 2 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
395 2 acc_.impl_ptr = self;
396 2 svc_.post(&acc_);
397 2 svc_.work_finished();
398 }
399 }
400
401 240 if (fd_ >= 0)
402 {
403 59 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
404 59 ::close(fd_);
405 59 fd_ = -1;
406 }
407
408 240 local_endpoint_ = endpoint{};
409 240 }
410
411 154 inline select_acceptor_service::select_acceptor_service(
412 154 capy::execution_context& ctx)
413 154 : ctx_(ctx)
414 154 , state_(
415 std::make_unique<select_acceptor_state>(
416 154 ctx.use_service<select_scheduler>()))
417 {
418 154 }
419
420 308 inline select_acceptor_service::~select_acceptor_service() {}
421
422 inline void
423 154 select_acceptor_service::shutdown()
424 {
425 154 std::lock_guard lock(state_->mutex_);
426
427 154 while (auto* impl = state_->acceptor_list_.pop_front())
428 impl->close_socket();
429
430 // Don't clear acceptor_ptrs_ here — same rationale as
431 // select_socket_service::shutdown(). Let ~state_ release ptrs
432 // after scheduler shutdown has drained all queued ops.
433 154 }
434
435 inline io_object::implementation*
436 61 select_acceptor_service::construct()
437 {
438 61 auto impl = std::make_shared<select_acceptor>(*this);
439 61 auto* raw = impl.get();
440
441 61 std::lock_guard lock(state_->mutex_);
442 61 state_->acceptor_list_.push_back(raw);
443 61 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
444
445 61 return raw;
446 61 }
447
448 inline void
449 61 select_acceptor_service::destroy(io_object::implementation* impl)
450 {
451 61 auto* select_impl = static_cast<select_acceptor*>(impl);
452 61 select_impl->close_socket();
453 61 std::lock_guard lock(state_->mutex_);
454 61 state_->acceptor_list_.remove(select_impl);
455 61 state_->acceptor_ptrs_.erase(select_impl);
456 61 }
457
458 inline void
459 120 select_acceptor_service::close(io_object::handle& h)
460 {
461 120 static_cast<select_acceptor*>(h.get())->close_socket();
462 120 }
463
464 inline std::error_code
465 58 select_acceptor::set_option(
466 int level, int optname,
467 void const* data, std::size_t size) noexcept
468 {
469 58 if (::setsockopt(fd_, level, optname, data,
470 58 static_cast<socklen_t>(size)) != 0)
471 return make_err(errno);
472 58 return {};
473 }
474
475 inline std::error_code
476 select_acceptor::get_option(
477 int level, int optname,
478 void* data, std::size_t* size) const noexcept
479 {
480 socklen_t len = static_cast<socklen_t>(*size);
481 if (::getsockopt(fd_, level, optname, data, &len) != 0)
482 return make_err(errno);
483 *size = static_cast<std::size_t>(len);
484 return {};
485 }
486
487 inline std::error_code
488 59 select_acceptor_service::open_acceptor_socket(
489 tcp_acceptor::implementation& impl,
490 int family, int type, int protocol)
491 {
492 59 auto* select_impl = static_cast<select_acceptor*>(&impl);
493 59 select_impl->close_socket();
494
495 59 int fd = ::socket(family, type, protocol);
496 59 if (fd < 0)
497 return make_err(errno);
498
499 // Set non-blocking and close-on-exec
500 59 int flags = ::fcntl(fd, F_GETFL, 0);
501 59 if (flags == -1)
502 {
503 int errn = errno;
504 ::close(fd);
505 return make_err(errn);
506 }
507 59 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
508 {
509 int errn = errno;
510 ::close(fd);
511 return make_err(errn);
512 }
513 59 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
514 {
515 int errn = errno;
516 ::close(fd);
517 return make_err(errn);
518 }
519
520 59 if (fd >= FD_SETSIZE)
521 {
522 ::close(fd);
523 return make_err(EMFILE);
524 }
525
526 59 if (family == AF_INET6)
527 {
528 8 int val = 0; // dual-stack default
529 8 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
530 }
531
532 59 select_impl->fd_ = fd;
533 59 return {};
534 }
535
536 inline std::error_code
537 58 select_acceptor_service::bind_acceptor(
538 tcp_acceptor::implementation& impl, endpoint ep)
539 {
540 58 auto* select_impl = static_cast<select_acceptor*>(&impl);
541 58 int fd = select_impl->fd_;
542
543 58 sockaddr_storage storage{};
544 58 socklen_t addrlen = detail::to_sockaddr(ep, storage);
545 58 if (::bind(fd, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
546 1 return make_err(errno);
547
548 // Cache local endpoint (resolves ephemeral port)
549 57 sockaddr_storage local{};
550 57 socklen_t local_len = sizeof(local);
551 57 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local), &local_len) == 0)
552 57 select_impl->set_local_endpoint(detail::from_sockaddr(local));
553
554 57 return {};
555 }
556
557 inline std::error_code
558 57 select_acceptor_service::listen_acceptor(
559 tcp_acceptor::implementation& impl, int backlog)
560 {
561 57 auto* select_impl = static_cast<select_acceptor*>(&impl);
562 57 int fd = select_impl->fd_;
563
564 57 if (::listen(fd, backlog) < 0)
565 return make_err(errno);
566
567 57 return {};
568 }
569
570 inline void
571 5 select_acceptor_service::post(select_op* op)
572 {
573 5 state_->sched_.post(op);
574 5 }
575
576 inline void
577 3072 select_acceptor_service::work_started() noexcept
578 {
579 3072 state_->sched_.work_started();
580 3072 }
581
582 inline void
583 3 select_acceptor_service::work_finished() noexcept
584 {
585 3 state_->sched_.work_finished();
586 3 }
587
588 inline select_socket_service*
589 3071 select_acceptor_service::socket_service() const noexcept
590 {
591 3071 auto* svc = ctx_.find_service<detail::socket_service>();
592 3071 return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
593 }
594
595 } // namespace boost::corosio::detail
596
597 #endif // BOOST_COROSIO_HAS_SELECT
598
599 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
600