TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/acceptor_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_acceptor.hpp>
22 : #include <boost/corosio/native/detail/select/select_socket_service.hpp>
23 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
24 :
25 : #include <boost/corosio/detail/endpoint_convert.hpp>
26 : #include <boost/corosio/detail/dispatch_coro.hpp>
27 : #include <boost/corosio/detail/make_err.hpp>
28 :
29 : #include <errno.h>
30 : #include <fcntl.h>
31 : #include <netinet/in.h>
32 : #include <sys/socket.h>
33 : #include <unistd.h>
34 :
35 : #include <memory>
36 : #include <mutex>
37 : #include <unordered_map>
38 :
39 : namespace boost::corosio::detail {
40 :
41 : /** State for select acceptor service. */
42 : class select_acceptor_state
43 : {
44 : public:
45 HIT 154 : explicit select_acceptor_state(select_scheduler& sched) noexcept
46 154 : : sched_(sched)
47 : {
48 154 : }
49 :
50 : select_scheduler& sched_;
51 : std::mutex mutex_;
52 : intrusive_list<select_acceptor> acceptor_list_;
53 : std::unordered_map<select_acceptor*, std::shared_ptr<select_acceptor>>
54 : acceptor_ptrs_;
55 : };
56 :
57 : /** select acceptor service implementation.
58 :
59 : Inherits from acceptor_service to enable runtime polymorphism.
60 : Uses key_type = acceptor_service for service lookup.
61 : */
62 : class BOOST_COROSIO_DECL select_acceptor_service final : public acceptor_service
63 : {
64 : public:
65 : explicit select_acceptor_service(capy::execution_context& ctx);
66 : ~select_acceptor_service() override;
67 :
68 : select_acceptor_service(select_acceptor_service const&) = delete;
69 : select_acceptor_service& operator=(select_acceptor_service const&) = delete;
70 :
71 : void shutdown() override;
72 :
73 : io_object::implementation* construct() override;
74 : void destroy(io_object::implementation*) override;
75 : void close(io_object::handle&) override;
76 : std::error_code open_acceptor_socket(
77 : tcp_acceptor::implementation& impl,
78 : int family, int type, int protocol) override;
79 : std::error_code bind_acceptor(
80 : tcp_acceptor::implementation& impl, endpoint ep) override;
81 : std::error_code listen_acceptor(
82 : tcp_acceptor::implementation& impl, int backlog) override;
83 :
84 3134 : select_scheduler& scheduler() const noexcept
85 : {
86 3134 : return state_->sched_;
87 : }
88 : void post(select_op* op);
89 : void work_started() noexcept;
90 : void work_finished() noexcept;
91 :
92 : /** Get the socket service for creating peer sockets during accept. */
93 : select_socket_service* socket_service() const noexcept;
94 :
95 : private:
96 : capy::execution_context& ctx_;
97 : std::unique_ptr<select_acceptor_state> state_;
98 : };
99 :
100 : inline void
101 MIS 0 : select_accept_op::cancel() noexcept
102 : {
103 0 : if (acceptor_impl_)
104 0 : acceptor_impl_->cancel_single_op(*this);
105 : else
106 0 : request_cancel();
107 0 : }
108 :
109 : inline void
110 HIT 3074 : select_accept_op::operator()()
111 : {
112 3074 : stop_cb.reset();
113 :
114 3074 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
115 :
116 3074 : if (ec_out)
117 : {
118 3074 : if (cancelled.load(std::memory_order_acquire))
119 3 : *ec_out = capy::error::canceled;
120 3071 : else if (errn != 0)
121 MIS 0 : *ec_out = make_err(errn);
122 : else
123 HIT 3071 : *ec_out = {};
124 : }
125 :
126 3074 : if (success && accepted_fd >= 0)
127 : {
128 3071 : if (acceptor_impl_)
129 : {
130 3071 : auto* socket_svc = static_cast<select_acceptor*>(acceptor_impl_)
131 3071 : ->service()
132 3071 : .socket_service();
133 3071 : if (socket_svc)
134 : {
135 : auto& impl =
136 3071 : static_cast<select_socket&>(*socket_svc->construct());
137 3071 : impl.set_socket(accepted_fd);
138 :
139 3071 : sockaddr_storage local_storage{};
140 3071 : socklen_t local_len = sizeof(local_storage);
141 3071 : sockaddr_storage remote_storage{};
142 3071 : socklen_t remote_len = sizeof(remote_storage);
143 :
144 3071 : endpoint local_ep, remote_ep;
145 3071 : if (::getsockname(
146 : accepted_fd,
147 : reinterpret_cast<sockaddr*>(&local_storage),
148 3071 : &local_len) == 0)
149 3071 : local_ep = from_sockaddr(local_storage);
150 3071 : if (::getpeername(
151 : accepted_fd,
152 : reinterpret_cast<sockaddr*>(&remote_storage),
153 3071 : &remote_len) == 0)
154 3071 : remote_ep = from_sockaddr(remote_storage);
155 :
156 3071 : impl.set_endpoints(local_ep, remote_ep);
157 :
158 3071 : if (impl_out)
159 3071 : *impl_out = &impl;
160 :
161 3071 : accepted_fd = -1;
162 : }
163 : else
164 : {
165 MIS 0 : if (ec_out && !*ec_out)
166 0 : *ec_out = make_err(ENOENT);
167 0 : ::close(accepted_fd);
168 0 : accepted_fd = -1;
169 0 : if (impl_out)
170 0 : *impl_out = nullptr;
171 : }
172 : }
173 : else
174 : {
175 0 : ::close(accepted_fd);
176 0 : accepted_fd = -1;
177 0 : if (impl_out)
178 0 : *impl_out = nullptr;
179 : }
180 HIT 3071 : }
181 : else
182 : {
183 3 : if (accepted_fd >= 0)
184 : {
185 MIS 0 : ::close(accepted_fd);
186 0 : accepted_fd = -1;
187 : }
188 :
189 HIT 3 : if (peer_impl)
190 : {
191 : auto* socket_svc_cleanup =
192 MIS 0 : static_cast<select_acceptor*>(acceptor_impl_)
193 0 : ->service()
194 0 : .socket_service();
195 0 : if (socket_svc_cleanup)
196 0 : socket_svc_cleanup->destroy(peer_impl);
197 0 : peer_impl = nullptr;
198 : }
199 :
200 HIT 3 : if (impl_out)
201 3 : *impl_out = nullptr;
202 : }
203 :
204 : // Move to stack before destroying the frame
205 3074 : capy::executor_ref saved_ex(ex);
206 3074 : std::coroutine_handle<> saved_h(h);
207 3074 : impl_ptr.reset();
208 3074 : dispatch_coro(saved_ex, saved_h).resume();
209 3074 : }
210 :
211 61 : inline select_acceptor::select_acceptor(select_acceptor_service& svc) noexcept
212 61 : : svc_(svc)
213 : {
214 61 : }
215 :
216 : inline std::coroutine_handle<>
217 3074 : select_acceptor::accept(
218 : std::coroutine_handle<> h,
219 : capy::executor_ref ex,
220 : std::stop_token token,
221 : std::error_code* ec,
222 : io_object::implementation** impl_out)
223 : {
224 3074 : auto& op = acc_;
225 3074 : op.reset();
226 3074 : op.h = h;
227 3074 : op.ex = ex;
228 3074 : op.ec_out = ec;
229 3074 : op.impl_out = impl_out;
230 3074 : op.fd = fd_;
231 3074 : op.start(token, this);
232 :
233 3074 : sockaddr_storage peer_storage{};
234 3074 : socklen_t addrlen = sizeof(peer_storage);
235 : int accepted =
236 3074 : ::accept(fd_, reinterpret_cast<sockaddr*>(&peer_storage), &addrlen);
237 :
238 3074 : if (accepted >= 0)
239 : {
240 : // Reject fds that exceed select()'s FD_SETSIZE limit.
241 2 : if (accepted >= FD_SETSIZE)
242 : {
243 MIS 0 : ::close(accepted);
244 0 : op.accepted_fd = -1;
245 0 : op.complete(EINVAL, 0);
246 0 : op.impl_ptr = shared_from_this();
247 0 : svc_.post(&op);
248 0 : return std::noop_coroutine();
249 : }
250 :
251 : // Set non-blocking and close-on-exec flags.
252 HIT 2 : int flags = ::fcntl(accepted, F_GETFL, 0);
253 2 : if (flags == -1)
254 : {
255 MIS 0 : int err = errno;
256 0 : ::close(accepted);
257 0 : op.accepted_fd = -1;
258 0 : op.complete(err, 0);
259 0 : op.impl_ptr = shared_from_this();
260 0 : svc_.post(&op);
261 0 : return std::noop_coroutine();
262 : }
263 :
264 HIT 2 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
265 : {
266 MIS 0 : int err = errno;
267 0 : ::close(accepted);
268 0 : op.accepted_fd = -1;
269 0 : op.complete(err, 0);
270 0 : op.impl_ptr = shared_from_this();
271 0 : svc_.post(&op);
272 0 : return std::noop_coroutine();
273 : }
274 :
275 HIT 2 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
276 : {
277 MIS 0 : int err = errno;
278 0 : ::close(accepted);
279 0 : op.accepted_fd = -1;
280 0 : op.complete(err, 0);
281 0 : op.impl_ptr = shared_from_this();
282 0 : svc_.post(&op);
283 0 : return std::noop_coroutine();
284 : }
285 :
286 HIT 2 : op.accepted_fd = accepted;
287 2 : op.complete(0, 0);
288 2 : op.impl_ptr = shared_from_this();
289 2 : svc_.post(&op);
290 2 : return std::noop_coroutine();
291 : }
292 :
293 3072 : if (errno == EAGAIN || errno == EWOULDBLOCK)
294 : {
295 3072 : svc_.work_started();
296 3072 : op.impl_ptr = shared_from_this();
297 :
298 : // Set registering BEFORE register_fd to close the race window where
299 : // reactor sees an event before we set registered.
300 3072 : op.registered.store(
301 : select_registration_state::registering, std::memory_order_release);
302 3072 : svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
303 :
304 : // Transition to registered. If this fails, reactor or cancel already
305 : // claimed the op (state is now unregistered), so we're done. However,
306 : // we must still deregister the fd because cancel's deregister_fd may
307 : // have run before our register_fd, leaving the fd orphaned.
308 3072 : auto expected = select_registration_state::registering;
309 3072 : if (!op.registered.compare_exchange_strong(
310 : expected, select_registration_state::registered,
311 : std::memory_order_acq_rel))
312 : {
313 MIS 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
314 0 : return std::noop_coroutine();
315 : }
316 :
317 : // If cancelled was set before we registered, handle it now.
318 HIT 3072 : if (op.cancelled.load(std::memory_order_acquire))
319 : {
320 MIS 0 : auto prev = op.registered.exchange(
321 : select_registration_state::unregistered,
322 : std::memory_order_acq_rel);
323 0 : if (prev != select_registration_state::unregistered)
324 : {
325 0 : svc_.scheduler().deregister_fd(
326 : fd_, select_scheduler::event_read);
327 0 : op.impl_ptr = shared_from_this();
328 0 : svc_.post(&op);
329 0 : svc_.work_finished();
330 : }
331 : }
332 HIT 3072 : return std::noop_coroutine();
333 : }
334 :
335 MIS 0 : op.complete(errno, 0);
336 0 : op.impl_ptr = shared_from_this();
337 0 : svc_.post(&op);
338 0 : return std::noop_coroutine();
339 : }
340 :
341 : inline void
342 HIT 2 : select_acceptor::cancel() noexcept
343 : {
344 2 : auto self = weak_from_this().lock();
345 2 : if (!self)
346 MIS 0 : return;
347 :
348 HIT 2 : auto prev = acc_.registered.exchange(
349 : select_registration_state::unregistered, std::memory_order_acq_rel);
350 2 : acc_.request_cancel();
351 :
352 2 : if (prev != select_registration_state::unregistered)
353 : {
354 1 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
355 1 : acc_.impl_ptr = self;
356 1 : svc_.post(&acc_);
357 1 : svc_.work_finished();
358 : }
359 2 : }
360 :
361 : inline void
362 MIS 0 : select_acceptor::cancel_single_op(select_op& op) noexcept
363 : {
364 0 : auto self = weak_from_this().lock();
365 0 : if (!self)
366 0 : return;
367 :
368 0 : auto prev = op.registered.exchange(
369 : select_registration_state::unregistered, std::memory_order_acq_rel);
370 0 : op.request_cancel();
371 :
372 0 : if (prev != select_registration_state::unregistered)
373 : {
374 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
375 :
376 0 : op.impl_ptr = self;
377 0 : svc_.post(&op);
378 0 : svc_.work_finished();
379 : }
380 0 : }
381 :
382 : inline void
383 HIT 240 : select_acceptor::close_socket() noexcept
384 : {
385 240 : auto self = weak_from_this().lock();
386 240 : if (self)
387 : {
388 240 : auto prev = acc_.registered.exchange(
389 : select_registration_state::unregistered, std::memory_order_acq_rel);
390 240 : acc_.request_cancel();
391 :
392 240 : if (prev != select_registration_state::unregistered)
393 : {
394 2 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
395 2 : acc_.impl_ptr = self;
396 2 : svc_.post(&acc_);
397 2 : svc_.work_finished();
398 : }
399 : }
400 :
401 240 : if (fd_ >= 0)
402 : {
403 59 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
404 59 : ::close(fd_);
405 59 : fd_ = -1;
406 : }
407 :
408 240 : local_endpoint_ = endpoint{};
409 240 : }
410 :
411 154 : inline select_acceptor_service::select_acceptor_service(
412 154 : capy::execution_context& ctx)
413 154 : : ctx_(ctx)
414 154 : , state_(
415 : std::make_unique<select_acceptor_state>(
416 154 : ctx.use_service<select_scheduler>()))
417 : {
418 154 : }
419 :
420 308 : inline select_acceptor_service::~select_acceptor_service() {}
421 :
422 : inline void
423 154 : select_acceptor_service::shutdown()
424 : {
425 154 : std::lock_guard lock(state_->mutex_);
426 :
427 154 : while (auto* impl = state_->acceptor_list_.pop_front())
428 MIS 0 : impl->close_socket();
429 :
430 : // Don't clear acceptor_ptrs_ here — same rationale as
431 : // select_socket_service::shutdown(). Let ~state_ release ptrs
432 : // after scheduler shutdown has drained all queued ops.
433 HIT 154 : }
434 :
435 : inline io_object::implementation*
436 61 : select_acceptor_service::construct()
437 : {
438 61 : auto impl = std::make_shared<select_acceptor>(*this);
439 61 : auto* raw = impl.get();
440 :
441 61 : std::lock_guard lock(state_->mutex_);
442 61 : state_->acceptor_list_.push_back(raw);
443 61 : state_->acceptor_ptrs_.emplace(raw, std::move(impl));
444 :
445 61 : return raw;
446 61 : }
447 :
448 : inline void
449 61 : select_acceptor_service::destroy(io_object::implementation* impl)
450 : {
451 61 : auto* select_impl = static_cast<select_acceptor*>(impl);
452 61 : select_impl->close_socket();
453 61 : std::lock_guard lock(state_->mutex_);
454 61 : state_->acceptor_list_.remove(select_impl);
455 61 : state_->acceptor_ptrs_.erase(select_impl);
456 61 : }
457 :
458 : inline void
459 120 : select_acceptor_service::close(io_object::handle& h)
460 : {
461 120 : static_cast<select_acceptor*>(h.get())->close_socket();
462 120 : }
463 :
464 : inline std::error_code
465 58 : select_acceptor::set_option(
466 : int level, int optname,
467 : void const* data, std::size_t size) noexcept
468 : {
469 58 : if (::setsockopt(fd_, level, optname, data,
470 58 : static_cast<socklen_t>(size)) != 0)
471 MIS 0 : return make_err(errno);
472 HIT 58 : return {};
473 : }
474 :
475 : inline std::error_code
476 MIS 0 : select_acceptor::get_option(
477 : int level, int optname,
478 : void* data, std::size_t* size) const noexcept
479 : {
480 0 : socklen_t len = static_cast<socklen_t>(*size);
481 0 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
482 0 : return make_err(errno);
483 0 : *size = static_cast<std::size_t>(len);
484 0 : return {};
485 : }
486 :
487 : inline std::error_code
488 HIT 59 : select_acceptor_service::open_acceptor_socket(
489 : tcp_acceptor::implementation& impl,
490 : int family, int type, int protocol)
491 : {
492 59 : auto* select_impl = static_cast<select_acceptor*>(&impl);
493 59 : select_impl->close_socket();
494 :
495 59 : int fd = ::socket(family, type, protocol);
496 59 : if (fd < 0)
497 MIS 0 : return make_err(errno);
498 :
499 : // Set non-blocking and close-on-exec
500 HIT 59 : int flags = ::fcntl(fd, F_GETFL, 0);
501 59 : if (flags == -1)
502 : {
503 MIS 0 : int errn = errno;
504 0 : ::close(fd);
505 0 : return make_err(errn);
506 : }
507 HIT 59 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
508 : {
509 MIS 0 : int errn = errno;
510 0 : ::close(fd);
511 0 : return make_err(errn);
512 : }
513 HIT 59 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
514 : {
515 MIS 0 : int errn = errno;
516 0 : ::close(fd);
517 0 : return make_err(errn);
518 : }
519 :
520 HIT 59 : if (fd >= FD_SETSIZE)
521 : {
522 MIS 0 : ::close(fd);
523 0 : return make_err(EMFILE);
524 : }
525 :
526 HIT 59 : if (family == AF_INET6)
527 : {
528 8 : int val = 0; // dual-stack default
529 8 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
530 : }
531 :
532 59 : select_impl->fd_ = fd;
533 59 : return {};
534 : }
535 :
536 : inline std::error_code
537 58 : select_acceptor_service::bind_acceptor(
538 : tcp_acceptor::implementation& impl, endpoint ep)
539 : {
540 58 : auto* select_impl = static_cast<select_acceptor*>(&impl);
541 58 : int fd = select_impl->fd_;
542 :
543 58 : sockaddr_storage storage{};
544 58 : socklen_t addrlen = detail::to_sockaddr(ep, storage);
545 58 : if (::bind(fd, reinterpret_cast<sockaddr*>(&storage), addrlen) < 0)
546 1 : return make_err(errno);
547 :
548 : // Cache local endpoint (resolves ephemeral port)
549 57 : sockaddr_storage local{};
550 57 : socklen_t local_len = sizeof(local);
551 57 : if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local), &local_len) == 0)
552 57 : select_impl->set_local_endpoint(detail::from_sockaddr(local));
553 :
554 57 : return {};
555 : }
556 :
557 : inline std::error_code
558 57 : select_acceptor_service::listen_acceptor(
559 : tcp_acceptor::implementation& impl, int backlog)
560 : {
561 57 : auto* select_impl = static_cast<select_acceptor*>(&impl);
562 57 : int fd = select_impl->fd_;
563 :
564 57 : if (::listen(fd, backlog) < 0)
565 MIS 0 : return make_err(errno);
566 :
567 HIT 57 : return {};
568 : }
569 :
570 : inline void
571 5 : select_acceptor_service::post(select_op* op)
572 : {
573 5 : state_->sched_.post(op);
574 5 : }
575 :
576 : inline void
577 3072 : select_acceptor_service::work_started() noexcept
578 : {
579 3072 : state_->sched_.work_started();
580 3072 : }
581 :
582 : inline void
583 3 : select_acceptor_service::work_finished() noexcept
584 : {
585 3 : state_->sched_.work_finished();
586 3 : }
587 :
588 : inline select_socket_service*
589 3071 : select_acceptor_service::socket_service() const noexcept
590 : {
591 3071 : auto* svc = ctx_.find_service<detail::socket_service>();
592 3071 : return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
593 : }
594 :
595 : } // namespace boost::corosio::detail
596 :
597 : #endif // BOOST_COROSIO_HAS_SELECT
598 :
599 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_ACCEPTOR_SERVICE_HPP
|