include/boost/corosio/native/detail/select/select_socket_service.hpp

75.8% Lines (263/347) 93.1% Functions (27/29)
include/boost/corosio/native/detail/select/select_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_socket.hpp>
22 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23
24 #include <boost/corosio/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/dispatch_coro.hpp>
26 #include <boost/corosio/detail/make_err.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <boost/capy/buffers.hpp>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 #include <memory>
40 #include <mutex>
41 #include <unordered_map>
42
43 /*
44 select Socket Implementation
45 ============================
46
47 This mirrors the epoll_sockets design for behavioral consistency.
48 Each I/O operation follows the same pattern:
49 1. Try the syscall immediately (non-blocking socket)
50 2. If it succeeds or fails with a real error, post to completion queue
51 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52
53 Cancellation
54 ------------
55 See op.hpp for the completion/cancellation race handling via the
56 `registered` atomic. cancel() must complete pending operations (post
57 them with cancelled flag) so coroutines waiting on them can resume.
58 close_socket() calls cancel() first to ensure this.
59
60 Impl Lifetime with shared_ptr
61 -----------------------------
62 Socket impls use enable_shared_from_this. The service owns impls via
63 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 removal. When a user calls close(), we call cancel() which posts pending
65 ops to the scheduler.
66
67 CRITICAL: The posted ops must keep the impl alive until they complete.
68 Otherwise the scheduler would process a freed op (use-after-free). The
69 cancel() method captures shared_from_this() into op.impl_ptr before
70 posting. When the op completes, impl_ptr is cleared, allowing the impl
71 to be destroyed if no other references exist.
72
73 Service Ownership
74 -----------------
75 select_socket_service owns all socket impls. destroy() removes the
76 shared_ptr from the map, but the impl may survive if ops still hold
77 impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 in-flight ops will complete and release their refs.
79 */
80
81 namespace boost::corosio::detail {
82
83 /** State for select socket service. */
84 class select_socket_state
85 {
86 public:
87 154 explicit select_socket_state(select_scheduler& sched) noexcept
88 154 : sched_(sched)
89 {
90 154 }
91
92 select_scheduler& sched_;
93 std::mutex mutex_;
94 intrusive_list<select_socket> socket_list_;
95 std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
96 socket_ptrs_;
97 };
98
99 /** select socket service implementation.
100
101 Inherits from socket_service to enable runtime polymorphism.
102 Uses key_type = socket_service for service lookup.
103 */
104 class BOOST_COROSIO_DECL select_socket_service final : public socket_service
105 {
106 public:
107 explicit select_socket_service(capy::execution_context& ctx);
108 ~select_socket_service() override;
109
110 select_socket_service(select_socket_service const&) = delete;
111 select_socket_service& operator=(select_socket_service const&) = delete;
112
113 void shutdown() override;
114
115 io_object::implementation* construct() override;
116 void destroy(io_object::implementation*) override;
117 void close(io_object::handle&) override;
118 std::error_code
119 open_socket(tcp_socket::implementation& impl,
120 int family, int type, int protocol) override;
121
122 9677 select_scheduler& scheduler() const noexcept
123 {
124 9677 return state_->sched_;
125 }
126 void post(select_op* op);
127 void work_started() noexcept;
128 void work_finished() noexcept;
129
130 private:
131 std::unique_ptr<select_socket_state> state_;
132 };
133
134 // Backward compatibility alias
135 using select_sockets = select_socket_service;
136
137 inline void
138 99 select_op::canceller::operator()() const noexcept
139 {
140 99 op->cancel();
141 99 }
142
143 inline void
144 select_connect_op::cancel() noexcept
145 {
146 if (socket_impl_)
147 socket_impl_->cancel_single_op(*this);
148 else
149 request_cancel();
150 }
151
152 inline void
153 99 select_read_op::cancel() noexcept
154 {
155 99 if (socket_impl_)
156 99 socket_impl_->cancel_single_op(*this);
157 else
158 request_cancel();
159 99 }
160
161 inline void
162 select_write_op::cancel() noexcept
163 {
164 if (socket_impl_)
165 socket_impl_->cancel_single_op(*this);
166 else
167 request_cancel();
168 }
169
170 inline void
171 3074 select_connect_op::operator()()
172 {
173 3074 stop_cb.reset();
174
175 3074 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
176
177 // Cache endpoints on successful connect
178 3074 if (success && socket_impl_)
179 {
180 3071 endpoint local_ep;
181 3071 sockaddr_storage local_storage{};
182 3071 socklen_t local_len = sizeof(local_storage);
183 3071 if (::getsockname(
184 fd, reinterpret_cast<sockaddr*>(&local_storage),
185 3071 &local_len) == 0)
186 3071 local_ep = from_sockaddr(local_storage);
187 3071 static_cast<select_socket*>(socket_impl_)
188 3071 ->set_endpoints(local_ep, target_endpoint);
189 }
190
191 3074 if (ec_out)
192 {
193 3074 if (cancelled.load(std::memory_order_acquire))
194 *ec_out = capy::error::canceled;
195 3074 else if (errn != 0)
196 3 *ec_out = make_err(errn);
197 else
198 3071 *ec_out = {};
199 }
200
201 3074 if (bytes_out)
202 *bytes_out = bytes_transferred;
203
204 // Move to stack before destroying the frame
205 3074 capy::executor_ref saved_ex(ex);
206 3074 std::coroutine_handle<> saved_h(h);
207 3074 impl_ptr.reset();
208 3074 dispatch_coro(saved_ex, saved_h).resume();
209 3074 }
210
211 9240 inline select_socket::select_socket(select_socket_service& svc) noexcept
212 9240 : svc_(svc)
213 {
214 9240 }
215
216 inline std::coroutine_handle<>
217 3074 select_socket::connect(
218 std::coroutine_handle<> h,
219 capy::executor_ref ex,
220 endpoint ep,
221 std::stop_token token,
222 std::error_code* ec)
223 {
224 3074 auto& op = conn_;
225 3074 op.reset();
226 3074 op.h = h;
227 3074 op.ex = ex;
228 3074 op.ec_out = ec;
229 3074 op.fd = fd_;
230 3074 op.target_endpoint = ep; // Store target for endpoint caching
231 3074 op.start(token, this);
232
233 3074 sockaddr_storage storage{};
234 socklen_t addrlen =
235 3074 detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
236 int result =
237 3074 ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);
238
239 3074 if (result == 0)
240 {
241 // Sync success — cache endpoints immediately
242 sockaddr_storage local_storage{};
243 socklen_t local_len = sizeof(local_storage);
244 if (::getsockname(
245 fd_, reinterpret_cast<sockaddr*>(&local_storage),
246 &local_len) == 0)
247 local_endpoint_ = detail::from_sockaddr(local_storage);
248 remote_endpoint_ = ep;
249
250 op.complete(0, 0);
251 op.impl_ptr = shared_from_this();
252 svc_.post(&op);
253 // completion is always posted to scheduler queue, never inline.
254 return std::noop_coroutine();
255 }
256
257 3074 if (errno == EINPROGRESS)
258 {
259 3074 svc_.work_started();
260 3074 op.impl_ptr = shared_from_this();
261
262 // Set registering BEFORE register_fd to close the race window where
263 // reactor sees an event before we set registered. The reactor treats
264 // registering the same as registered when claiming the op.
265 3074 op.registered.store(
266 select_registration_state::registering, std::memory_order_release);
267 3074 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
268
269 // Transition to registered. If this fails, reactor or cancel already
270 // claimed the op (state is now unregistered), so we're done. However,
271 // we must still deregister the fd because cancel's deregister_fd may
272 // have run before our register_fd, leaving the fd orphaned.
273 3074 auto expected = select_registration_state::registering;
274 3074 if (!op.registered.compare_exchange_strong(
275 expected, select_registration_state::registered,
276 std::memory_order_acq_rel))
277 {
278 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
279 // completion is always posted to scheduler queue, never inline.
280 return std::noop_coroutine();
281 }
282
283 // If cancelled was set before we registered, handle it now.
284 3074 if (op.cancelled.load(std::memory_order_acquire))
285 {
286 auto prev = op.registered.exchange(
287 select_registration_state::unregistered,
288 std::memory_order_acq_rel);
289 if (prev != select_registration_state::unregistered)
290 {
291 svc_.scheduler().deregister_fd(
292 fd_, select_scheduler::event_write);
293 op.impl_ptr = shared_from_this();
294 svc_.post(&op);
295 svc_.work_finished();
296 }
297 }
298 // completion is always posted to scheduler queue, never inline.
299 3074 return std::noop_coroutine();
300 }
301
302 op.complete(errno, 0);
303 op.impl_ptr = shared_from_this();
304 svc_.post(&op);
305 // completion is always posted to scheduler queue, never inline.
306 return std::noop_coroutine();
307 }
308
309 inline std::coroutine_handle<>
310 114920 select_socket::read_some(
311 std::coroutine_handle<> h,
312 capy::executor_ref ex,
313 io_buffer_param param,
314 std::stop_token token,
315 std::error_code* ec,
316 std::size_t* bytes_out)
317 {
318 114920 auto& op = rd_;
319 114920 op.reset();
320 114920 op.h = h;
321 114920 op.ex = ex;
322 114920 op.ec_out = ec;
323 114920 op.bytes_out = bytes_out;
324 114920 op.fd = fd_;
325 114920 op.start(token, this);
326
327 114920 capy::mutable_buffer bufs[select_read_op::max_buffers];
328 114920 op.iovec_count =
329 114920 static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
330
331 114920 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
332 {
333 1 op.empty_buffer_read = true;
334 1 op.complete(0, 0);
335 1 op.impl_ptr = shared_from_this();
336 1 svc_.post(&op);
337 1 return std::noop_coroutine();
338 }
339
340 229838 for (int i = 0; i < op.iovec_count; ++i)
341 {
342 114919 op.iovecs[i].iov_base = bufs[i].data();
343 114919 op.iovecs[i].iov_len = bufs[i].size();
344 }
345
346 114919 ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
347
348 114919 if (n > 0)
349 {
350 114631 op.complete(0, static_cast<std::size_t>(n));
351 114631 op.impl_ptr = shared_from_this();
352 114631 svc_.post(&op);
353 114631 return std::noop_coroutine();
354 }
355
356 288 if (n == 0)
357 {
358 5 op.complete(0, 0);
359 5 op.impl_ptr = shared_from_this();
360 5 svc_.post(&op);
361 5 return std::noop_coroutine();
362 }
363
364 283 if (errno == EAGAIN || errno == EWOULDBLOCK)
365 {
366 283 svc_.work_started();
367 283 op.impl_ptr = shared_from_this();
368
369 // Set registering BEFORE register_fd to close the race window where
370 // reactor sees an event before we set registered.
371 283 op.registered.store(
372 select_registration_state::registering, std::memory_order_release);
373 283 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
374
375 // Transition to registered. If this fails, reactor or cancel already
376 // claimed the op (state is now unregistered), so we're done. However,
377 // we must still deregister the fd because cancel's deregister_fd may
378 // have run before our register_fd, leaving the fd orphaned.
379 283 auto expected = select_registration_state::registering;
380 283 if (!op.registered.compare_exchange_strong(
381 expected, select_registration_state::registered,
382 std::memory_order_acq_rel))
383 {
384 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
385 return std::noop_coroutine();
386 }
387
388 // If cancelled was set before we registered, handle it now.
389 283 if (op.cancelled.load(std::memory_order_acquire))
390 {
391 auto prev = op.registered.exchange(
392 select_registration_state::unregistered,
393 std::memory_order_acq_rel);
394 if (prev != select_registration_state::unregistered)
395 {
396 svc_.scheduler().deregister_fd(
397 fd_, select_scheduler::event_read);
398 op.impl_ptr = shared_from_this();
399 svc_.post(&op);
400 svc_.work_finished();
401 }
402 }
403 283 return std::noop_coroutine();
404 }
405
406 op.complete(errno, 0);
407 op.impl_ptr = shared_from_this();
408 svc_.post(&op);
409 return std::noop_coroutine();
410 }
411
412 inline std::coroutine_handle<>
413 114756 select_socket::write_some(
414 std::coroutine_handle<> h,
415 capy::executor_ref ex,
416 io_buffer_param param,
417 std::stop_token token,
418 std::error_code* ec,
419 std::size_t* bytes_out)
420 {
421 114756 auto& op = wr_;
422 114756 op.reset();
423 114756 op.h = h;
424 114756 op.ex = ex;
425 114756 op.ec_out = ec;
426 114756 op.bytes_out = bytes_out;
427 114756 op.fd = fd_;
428 114756 op.start(token, this);
429
430 114756 capy::mutable_buffer bufs[select_write_op::max_buffers];
431 114756 op.iovec_count =
432 114756 static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
433
434 114756 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
435 {
436 1 op.complete(0, 0);
437 1 op.impl_ptr = shared_from_this();
438 1 svc_.post(&op);
439 1 return std::noop_coroutine();
440 }
441
442 229510 for (int i = 0; i < op.iovec_count; ++i)
443 {
444 114755 op.iovecs[i].iov_base = bufs[i].data();
445 114755 op.iovecs[i].iov_len = bufs[i].size();
446 }
447
448 114755 msghdr msg{};
449 114755 msg.msg_iov = op.iovecs;
450 114755 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
451
452 114755 ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
453
454 114755 if (n > 0)
455 {
456 114754 op.complete(0, static_cast<std::size_t>(n));
457 114754 op.impl_ptr = shared_from_this();
458 114754 svc_.post(&op);
459 114754 return std::noop_coroutine();
460 }
461
462 1 if (errno == EAGAIN || errno == EWOULDBLOCK)
463 {
464 svc_.work_started();
465 op.impl_ptr = shared_from_this();
466
467 // Set registering BEFORE register_fd to close the race window where
468 // reactor sees an event before we set registered.
469 op.registered.store(
470 select_registration_state::registering, std::memory_order_release);
471 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
472
473 // Transition to registered. If this fails, reactor or cancel already
474 // claimed the op (state is now unregistered), so we're done. However,
475 // we must still deregister the fd because cancel's deregister_fd may
476 // have run before our register_fd, leaving the fd orphaned.
477 auto expected = select_registration_state::registering;
478 if (!op.registered.compare_exchange_strong(
479 expected, select_registration_state::registered,
480 std::memory_order_acq_rel))
481 {
482 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
483 return std::noop_coroutine();
484 }
485
486 // If cancelled was set before we registered, handle it now.
487 if (op.cancelled.load(std::memory_order_acquire))
488 {
489 auto prev = op.registered.exchange(
490 select_registration_state::unregistered,
491 std::memory_order_acq_rel);
492 if (prev != select_registration_state::unregistered)
493 {
494 svc_.scheduler().deregister_fd(
495 fd_, select_scheduler::event_write);
496 op.impl_ptr = shared_from_this();
497 svc_.post(&op);
498 svc_.work_finished();
499 }
500 }
501 return std::noop_coroutine();
502 }
503
504 1 op.complete(errno ? errno : EIO, 0);
505 1 op.impl_ptr = shared_from_this();
506 1 svc_.post(&op);
507 1 return std::noop_coroutine();
508 }
509
510 inline std::error_code
511 3 select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
512 {
513 int how;
514 3 switch (what)
515 {
516 1 case tcp_socket::shutdown_receive:
517 1 how = SHUT_RD;
518 1 break;
519 1 case tcp_socket::shutdown_send:
520 1 how = SHUT_WR;
521 1 break;
522 1 case tcp_socket::shutdown_both:
523 1 how = SHUT_RDWR;
524 1 break;
525 default:
526 return make_err(EINVAL);
527 }
528 3 if (::shutdown(fd_, how) != 0)
529 return make_err(errno);
530 3 return {};
531 }
532
533 inline std::error_code
534 28 select_socket::set_option(
535 int level, int optname,
536 void const* data, std::size_t size) noexcept
537 {
538 28 if (::setsockopt(fd_, level, optname, data,
539 28 static_cast<socklen_t>(size)) != 0)
540 return make_err(errno);
541 28 return {};
542 }
543
544 inline std::error_code
545 31 select_socket::get_option(
546 int level, int optname,
547 void* data, std::size_t* size) const noexcept
548 {
549 31 socklen_t len = static_cast<socklen_t>(*size);
550 31 if (::getsockopt(fd_, level, optname, data, &len) != 0)
551 return make_err(errno);
552 31 *size = static_cast<std::size_t>(len);
553 31 return {};
554 }
555
556 inline void
557 177 select_socket::cancel() noexcept
558 {
559 177 auto self = weak_from_this().lock();
560 177 if (!self)
561 return;
562
563 531 auto cancel_op = [this, &self](select_op& op, int events) {
564 531 auto prev = op.registered.exchange(
565 select_registration_state::unregistered, std::memory_order_acq_rel);
566 531 op.request_cancel();
567 531 if (prev != select_registration_state::unregistered)
568 {
569 92 svc_.scheduler().deregister_fd(fd_, events);
570 92 op.impl_ptr = self;
571 92 svc_.post(&op);
572 92 svc_.work_finished();
573 }
574 708 };
575
576 177 cancel_op(conn_, select_scheduler::event_write);
577 177 cancel_op(rd_, select_scheduler::event_read);
578 177 cancel_op(wr_, select_scheduler::event_write);
579 177 }
580
581 inline void
582 99 select_socket::cancel_single_op(select_op& op) noexcept
583 {
584 99 auto self = weak_from_this().lock();
585 99 if (!self)
586 return;
587
588 // Called from stop_token callback to cancel a specific pending operation.
589 99 auto prev = op.registered.exchange(
590 select_registration_state::unregistered, std::memory_order_acq_rel);
591 99 op.request_cancel();
592
593 99 if (prev != select_registration_state::unregistered)
594 {
595 // Determine which event type to deregister
596 67 int events = 0;
597 67 if (&op == &conn_ || &op == &wr_)
598 events = select_scheduler::event_write;
599 67 else if (&op == &rd_)
600 67 events = select_scheduler::event_read;
601
602 67 svc_.scheduler().deregister_fd(fd_, events);
603
604 67 op.impl_ptr = self;
605 67 svc_.post(&op);
606 67 svc_.work_finished();
607 }
608 99 }
609
610 inline void
611 27729 select_socket::close_socket() noexcept
612 {
613 27729 auto self = weak_from_this().lock();
614 27729 if (self)
615 {
616 83187 auto cancel_op = [this, &self](select_op& op, int events) {
617 83187 auto prev = op.registered.exchange(
618 select_registration_state::unregistered,
619 std::memory_order_acq_rel);
620 83187 op.request_cancel();
621 83187 if (prev != select_registration_state::unregistered)
622 {
623 1 svc_.scheduler().deregister_fd(fd_, events);
624 1 op.impl_ptr = self;
625 1 svc_.post(&op);
626 1 svc_.work_finished();
627 }
628 110916 };
629
630 27729 cancel_op(conn_, select_scheduler::event_write);
631 27729 cancel_op(rd_, select_scheduler::event_read);
632 27729 cancel_op(wr_, select_scheduler::event_write);
633 }
634
635 27729 if (fd_ >= 0)
636 {
637 6160 svc_.scheduler().deregister_fd(
638 fd_, select_scheduler::event_read | select_scheduler::event_write);
639 6160 ::close(fd_);
640 6160 fd_ = -1;
641 }
642
643 27729 local_endpoint_ = endpoint{};
644 27729 remote_endpoint_ = endpoint{};
645 27729 }
646
647 154 inline select_socket_service::select_socket_service(
648 154 capy::execution_context& ctx)
649 154 : state_(
650 std::make_unique<select_socket_state>(
651 154 ctx.use_service<select_scheduler>()))
652 {
653 154 }
654
655 308 inline select_socket_service::~select_socket_service() {}
656
657 inline void
658 154 select_socket_service::shutdown()
659 {
660 154 std::lock_guard lock(state_->mutex_);
661
662 154 while (auto* impl = state_->socket_list_.pop_front())
663 impl->close_socket();
664
665 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
666 // drains completed_ops_, calling destroy() on each queued op. Letting
667 // ~state_ release the ptrs (during service destruction, after scheduler
668 // shutdown) keeps every impl alive until all ops have been drained.
669 154 }
670
671 inline io_object::implementation*
672 9240 select_socket_service::construct()
673 {
674 9240 auto impl = std::make_shared<select_socket>(*this);
675 9240 auto* raw = impl.get();
676
677 {
678 9240 std::lock_guard lock(state_->mutex_);
679 9240 state_->socket_list_.push_back(raw);
680 9240 state_->socket_ptrs_.emplace(raw, std::move(impl));
681 9240 }
682
683 9240 return raw;
684 9240 }
685
686 inline void
687 9240 select_socket_service::destroy(io_object::implementation* impl)
688 {
689 9240 auto* select_impl = static_cast<select_socket*>(impl);
690 9240 select_impl->close_socket();
691 9240 std::lock_guard lock(state_->mutex_);
692 9240 state_->socket_list_.remove(select_impl);
693 9240 state_->socket_ptrs_.erase(select_impl);
694 9240 }
695
696 inline std::error_code
697 3089 select_socket_service::open_socket(
698 tcp_socket::implementation& impl,
699 int family, int type, int protocol)
700 {
701 3089 auto* select_impl = static_cast<select_socket*>(&impl);
702 3089 select_impl->close_socket();
703
704 3089 int fd = ::socket(family, type, protocol);
705 3089 if (fd < 0)
706 return make_err(errno);
707
708 3089 if (family == AF_INET6)
709 {
710 5 int one = 1;
711 5 ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
712 }
713
714 // Set non-blocking and close-on-exec
715 3089 int flags = ::fcntl(fd, F_GETFL, 0);
716 3089 if (flags == -1)
717 {
718 int errn = errno;
719 ::close(fd);
720 return make_err(errn);
721 }
722 3089 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
723 {
724 int errn = errno;
725 ::close(fd);
726 return make_err(errn);
727 }
728 3089 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
729 {
730 int errn = errno;
731 ::close(fd);
732 return make_err(errn);
733 }
734
735 // Check fd is within select() limits
736 3089 if (fd >= FD_SETSIZE)
737 {
738 ::close(fd);
739 return make_err(EMFILE); // Too many open files
740 }
741
742 3089 select_impl->fd_ = fd;
743 3089 return {};
744 }
745
746 inline void
747 15400 select_socket_service::close(io_object::handle& h)
748 {
749 15400 static_cast<select_socket*>(h.get())->close_socket();
750 15400 }
751
752 inline void
753 229553 select_socket_service::post(select_op* op)
754 {
755 229553 state_->sched_.post(op);
756 229553 }
757
758 inline void
759 3357 select_socket_service::work_started() noexcept
760 {
761 3357 state_->sched_.work_started();
762 3357 }
763
764 inline void
765 160 select_socket_service::work_finished() noexcept
766 {
767 160 state_->sched_.work_finished();
768 160 }
769
770 } // namespace boost::corosio::detail
771
772 #endif // BOOST_COROSIO_HAS_SELECT
773
774 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
775