TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 : #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/capy/ex/execution_context.hpp>
19 : #include <boost/corosio/detail/socket_service.hpp>
20 :
21 : #include <boost/corosio/native/detail/select/select_socket.hpp>
22 : #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23 :
24 : #include <boost/corosio/detail/endpoint_convert.hpp>
25 : #include <boost/corosio/detail/dispatch_coro.hpp>
26 : #include <boost/corosio/detail/make_err.hpp>
27 :
28 : #include <boost/corosio/detail/except.hpp>
29 :
30 : #include <boost/capy/buffers.hpp>
31 :
32 : #include <errno.h>
33 : #include <fcntl.h>
34 : #include <netinet/in.h>
35 : #include <netinet/tcp.h>
36 : #include <sys/socket.h>
37 : #include <unistd.h>
38 :
39 : #include <memory>
40 : #include <mutex>
41 : #include <unordered_map>
42 :
43 : /*
44 : select Socket Implementation
45 : ============================
46 :
47 : This mirrors the epoll_sockets design for behavioral consistency.
48 : Each I/O operation follows the same pattern:
49 : 1. Try the syscall immediately (non-blocking socket)
50 : 2. If it succeeds or fails with a real error, post to completion queue
51 : 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52 :
53 : Cancellation
54 : ------------
55 : See op.hpp for the completion/cancellation race handling via the
56 : `registered` atomic. cancel() must complete pending operations (post
57 : them with cancelled flag) so coroutines waiting on them can resume.
58 : close_socket() calls cancel() first to ensure this.
59 :
60 : Impl Lifetime with shared_ptr
61 : -----------------------------
62 : Socket impls use enable_shared_from_this. The service owns impls via
63 : shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 : removal. When a user calls close(), we call cancel() which posts pending
65 : ops to the scheduler.
66 :
67 : CRITICAL: The posted ops must keep the impl alive until they complete.
68 : Otherwise the scheduler would process a freed op (use-after-free). The
69 : cancel() method captures shared_from_this() into op.impl_ptr before
70 : posting. When the op completes, impl_ptr is cleared, allowing the impl
71 : to be destroyed if no other references exist.
72 :
73 : Service Ownership
74 : -----------------
75 : select_socket_service owns all socket impls. destroy() removes the
76 : shared_ptr from the map, but the impl may survive if ops still hold
77 : impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 : in-flight ops will complete and release their refs.
79 : */
80 :
81 : namespace boost::corosio::detail {
82 :
/** Shared state for the select socket service.

    Bundles the scheduler reference with the registry of socket impls.
    mutex_ guards socket_list_ and socket_ptrs_; see the file comment
    above for the shared_ptr lifetime rules.
*/
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    select_scheduler& sched_;                        // reactor, owned by the context
    std::mutex mutex_;                               // guards the two containers below
    intrusive_list<select_socket> socket_list_;      // live impls, for shutdown()
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;                                // ownership, keyed by raw ptr
};
98 :
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.

    Owns every select_socket impl it constructs (see the file comment
    for the lifetime rules around in-flight ops).
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    select_socket_service(select_socket_service const&) = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    void shutdown() override;

    io_object::implementation* construct() override;
    void destroy(io_object::implementation*) override;
    void close(io_object::handle&) override;
    std::error_code
    open_socket(tcp_socket::implementation& impl,
        int family, int type, int protocol) override;

    /// The reactor this service registers fds with.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    void post(select_op* op);        // enqueue a completed op on the scheduler
    void work_started() noexcept;    // balance with work_finished()
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
133 :
// Backward compatibility alias for the pre-rename class name.
using select_sockets = select_socket_service;
136 :
/// Invoked by the stop_token callback machinery; forwards to the
/// concrete op's cancel() virtual.
inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
142 :
143 : inline void
144 MIS 0 : select_connect_op::cancel() noexcept
145 : {
146 0 : if (socket_impl_)
147 0 : socket_impl_->cancel_single_op(*this);
148 : else
149 0 : request_cancel();
150 0 : }
151 :
152 : inline void
153 HIT 99 : select_read_op::cancel() noexcept
154 : {
155 99 : if (socket_impl_)
156 99 : socket_impl_->cancel_single_op(*this);
157 : else
158 MIS 0 : request_cancel();
159 HIT 99 : }
160 :
161 : inline void
162 MIS 0 : select_write_op::cancel() noexcept
163 : {
164 0 : if (socket_impl_)
165 0 : socket_impl_->cancel_single_op(*this);
166 : else
167 0 : request_cancel();
168 0 : }
169 :
/** Completion handler for a connect operation.

    Runs when the scheduler dequeues the posted op. Publishes the
    result to the awaiting coroutine's out-params, caches the endpoint
    pair on success, releases the impl reference, then resumes the
    coroutine via its executor.
*/
inline void
select_connect_op::operator()()
{
    // Drop the stop callback first so a late stop request cannot race
    // with the resume below.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
            fd, reinterpret_cast<sockaddr*>(&local_storage),
            &local_len) == 0)
            local_ep = from_sockaddr(local_storage);
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    if (ec_out)
    {
        // Cancellation takes precedence over any errno also recorded.
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame: impl_ptr.reset() may
    // release the last reference to the socket impl that owns this op.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}
210 :
/// Construct a socket impl bound to its owning service. The fd is
/// acquired later via open_socket().
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
215 :
/** Begin an asynchronous connect to `ep`.

    Attempts the non-blocking ::connect() immediately. On synchronous
    success or hard failure the op is posted straight to the scheduler;
    on EINPROGRESS the fd is registered for writability and completion
    arrives via the reactor. Completion is never delivered inline, so
    this always returns noop_coroutine().

    @param h  Coroutine to resume on completion.
    @param ex Executor the coroutine is resumed through.
    @param ep Target endpoint (also cached as the remote endpoint).
    @param token Stop token used for cancellation.
    @param ec Receives the completion error code.
*/
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Sync success — cache endpoints immediately
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
            fd_, reinterpret_cast<sockaddr*>(&local_storage),
            &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        op.impl_ptr = shared_from_this();   // keep impl alive until op runs
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
            expected, select_registration_state::registered,
            std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Hard failure: record errno and post the completion.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
308 :
/** Begin an asynchronous scatter read.

    Tries ::readv() immediately on the non-blocking fd. Immediate
    results (data, EOF, empty buffer, hard error) are posted straight
    to the scheduler; EAGAIN/EWOULDBLOCK registers the fd for
    readability with the reactor. Completion is never delivered inline.

    @param h  Coroutine to resume on completion.
    @param ex Executor the coroutine is resumed through.
    @param param Buffer sequence to read into.
    @param token Stop token used for cancellation.
    @param ec Receives the completion error code.
    @param bytes_out Receives the number of bytes read.
*/
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Empty buffer: complete immediately with zero bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // n == 0 means the peer closed the connection (EOF).
    if (n == 0)
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
            expected, select_registration_state::registered,
            std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard failure: record errno and post the completion.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
411 :
/** Begin an asynchronous gather write.

    Tries ::sendmsg() immediately on the non-blocking fd (MSG_NOSIGNAL
    suppresses SIGPIPE). Immediate results are posted straight to the
    scheduler; EAGAIN/EWOULDBLOCK registers the fd for writability with
    the reactor. Completion is never delivered inline.

    @param h  Coroutine to resume on completion.
    @param ex Executor the coroutine is resumed through.
    @param param Buffer sequence to write from.
    @param token Stop token used for cancellation.
    @param ec Receives the completion error code.
    @param bytes_out Receives the number of bytes written.
*/
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Empty buffer: complete immediately with zero bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
            expected, select_registration_state::registered,
            std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard failure. sendmsg returning 0 is not expected for stream
    // sockets; fall back to EIO if errno was not set.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
509 :
510 : inline std::error_code
511 3 : select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
512 : {
513 : int how;
514 3 : switch (what)
515 : {
516 1 : case tcp_socket::shutdown_receive:
517 1 : how = SHUT_RD;
518 1 : break;
519 1 : case tcp_socket::shutdown_send:
520 1 : how = SHUT_WR;
521 1 : break;
522 1 : case tcp_socket::shutdown_both:
523 1 : how = SHUT_RDWR;
524 1 : break;
525 MIS 0 : default:
526 0 : return make_err(EINVAL);
527 : }
528 HIT 3 : if (::shutdown(fd_, how) != 0)
529 MIS 0 : return make_err(errno);
530 HIT 3 : return {};
531 : }
532 :
533 : inline std::error_code
534 28 : select_socket::set_option(
535 : int level, int optname,
536 : void const* data, std::size_t size) noexcept
537 : {
538 28 : if (::setsockopt(fd_, level, optname, data,
539 28 : static_cast<socklen_t>(size)) != 0)
540 MIS 0 : return make_err(errno);
541 HIT 28 : return {};
542 : }
543 :
544 : inline std::error_code
545 31 : select_socket::get_option(
546 : int level, int optname,
547 : void* data, std::size_t* size) const noexcept
548 : {
549 31 : socklen_t len = static_cast<socklen_t>(*size);
550 31 : if (::getsockopt(fd_, level, optname, data, &len) != 0)
551 MIS 0 : return make_err(errno);
552 HIT 31 : *size = static_cast<std::size_t>(len);
553 31 : return {};
554 : }
555 :
/** Cancel all pending operations on this socket.

    Atomically claims each op's registration (exchange to
    unregistered). Only the claimer posts the op, so completion and
    cancellation cannot both fire. `self` keeps the impl alive while we
    touch the ops; each posted op carries its own ref via impl_ptr.
*/
inline void
select_socket::cancel() noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;   // impl already being destroyed; nothing to cancel

    auto cancel_op = [this, &self](select_op& op, int events) {
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        // Non-unregistered prev means we claimed the op: deregister the
        // fd and post it so the awaiting coroutine resumes as cancelled.
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
580 :
/** Cancel one specific pending operation.

    Same claim-by-exchange protocol as cancel(), but scoped to a single
    op. Identifies the event set to deregister by comparing the op's
    address against this socket's three embedded ops.
*/
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    auto self = weak_from_this().lock();
    if (!self)
        return;   // impl already being destroyed

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep the impl alive until the posted op runs.
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
609 :
/** Close the native socket, cancelling any pending operations first.

    Pending ops are claimed and posted (same protocol as cancel()) so
    waiting coroutines resume before the fd is closed. Safe to call
    repeatedly; a closed socket has fd_ == -1. Also resets the cached
    endpoints.
*/
inline void
select_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        auto cancel_op = [this, &self](select_op& op, int events) {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Belt-and-braces: drop any remaining reactor registration
        // before handing the fd back to the OS.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
646 :
/// Construct the service, acquiring (or creating) the context's
/// select_scheduler and wrapping it in the shared state object.
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
        std::make_unique<select_socket_state>(
            ctx.use_service<select_scheduler>()))
{
}
654 :
655 308 : inline select_socket_service::~select_socket_service() {}
656 :
/** Service shutdown: close every live socket.

    Pops each impl off the intrusive list and closes it, which cancels
    and posts any in-flight ops.
*/
inline void
select_socket_service::shutdown()
{
    std::lock_guard lock(state_->mutex_);

    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
670 :
671 : inline io_object::implementation*
672 9240 : select_socket_service::construct()
673 : {
674 9240 : auto impl = std::make_shared<select_socket>(*this);
675 9240 : auto* raw = impl.get();
676 :
677 : {
678 9240 : std::lock_guard lock(state_->mutex_);
679 9240 : state_->socket_list_.push_back(raw);
680 9240 : state_->socket_ptrs_.emplace(raw, std::move(impl));
681 9240 : }
682 :
683 9240 : return raw;
684 9240 : }
685 :
686 : inline void
687 9240 : select_socket_service::destroy(io_object::implementation* impl)
688 : {
689 9240 : auto* select_impl = static_cast<select_socket*>(impl);
690 9240 : select_impl->close_socket();
691 9240 : std::lock_guard lock(state_->mutex_);
692 9240 : state_->socket_list_.remove(select_impl);
693 9240 : state_->socket_ptrs_.erase(select_impl);
694 9240 : }
695 :
696 : inline std::error_code
697 3089 : select_socket_service::open_socket(
698 : tcp_socket::implementation& impl,
699 : int family, int type, int protocol)
700 : {
701 3089 : auto* select_impl = static_cast<select_socket*>(&impl);
702 3089 : select_impl->close_socket();
703 :
704 3089 : int fd = ::socket(family, type, protocol);
705 3089 : if (fd < 0)
706 MIS 0 : return make_err(errno);
707 :
708 HIT 3089 : if (family == AF_INET6)
709 : {
710 5 : int one = 1;
711 5 : ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
712 : }
713 :
714 : // Set non-blocking and close-on-exec
715 3089 : int flags = ::fcntl(fd, F_GETFL, 0);
716 3089 : if (flags == -1)
717 : {
718 MIS 0 : int errn = errno;
719 0 : ::close(fd);
720 0 : return make_err(errn);
721 : }
722 HIT 3089 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
723 : {
724 MIS 0 : int errn = errno;
725 0 : ::close(fd);
726 0 : return make_err(errn);
727 : }
728 HIT 3089 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
729 : {
730 MIS 0 : int errn = errno;
731 0 : ::close(fd);
732 0 : return make_err(errn);
733 : }
734 :
735 : // Check fd is within select() limits
736 HIT 3089 : if (fd >= FD_SETSIZE)
737 : {
738 MIS 0 : ::close(fd);
739 0 : return make_err(EMFILE); // Too many open files
740 : }
741 :
742 HIT 3089 : select_impl->fd_ = fd;
743 3089 : return {};
744 : }
745 :
/// Close the native socket behind an io_object handle; cancels any
/// pending ops via close_socket().
inline void
select_socket_service::close(io_object::handle& h)
{
    static_cast<select_socket*>(h.get())->close_socket();
}
751 :
/// Enqueue a completed op on the scheduler's completion queue.
inline void
select_socket_service::post(select_op* op)
{
    state_->sched_.post(op);
}
757 :
/// Note outstanding work on the scheduler (keeps run() from exiting).
inline void
select_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}
763 :
/// Balance a prior work_started() once an operation is resolved.
inline void
select_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
769 :
770 : } // namespace boost::corosio::detail
771 :
772 : #endif // BOOST_COROSIO_HAS_SELECT
773 :
774 : #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
|