src/corosio/src/tcp_server.cpp

64.6% Lines (42/65) 84.6% Functions (11/13)
src/corosio/src/tcp_server.cpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Steve Gerbino
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/cppalliance/corosio
9 //
10
11 #include <boost/corosio/tcp_server.hpp>
12 #include <boost/corosio/detail/except.hpp>
13 #include <condition_variable>
14 #include <mutex>
15 #include <utility>
16
17 namespace boost::corosio {
18
19 struct tcp_server::impl
20 {
21 std::mutex join_mutex;
22 std::condition_variable join_cv;
23 capy::execution_context& ctx;
24 std::vector<tcp_acceptor> ports;
25 std::stop_source stop;
26
27 9 explicit impl(capy::execution_context& c) noexcept : ctx(c) {}
28 };
29
30 tcp_server::impl*
31 9 tcp_server::make_impl(capy::execution_context& ctx)
32 {
33 9 return new impl(ctx);
34 }
35
36 9 tcp_server::~tcp_server()
37 {
38 9 delete impl_;
39 9 }
40
41 tcp_server::tcp_server(tcp_server&& o) noexcept
42 : impl_(std::exchange(o.impl_, nullptr))
43 , ex_(o.ex_)
44 , waiters_(std::exchange(o.waiters_, nullptr))
45 , idle_head_(std::exchange(o.idle_head_, nullptr))
46 , active_head_(std::exchange(o.active_head_, nullptr))
47 , active_tail_(std::exchange(o.active_tail_, nullptr))
48 , active_accepts_(std::exchange(o.active_accepts_, 0))
49 , storage_(std::move(o.storage_))
50 , running_(std::exchange(o.running_, false))
51 {
52 }
53
54 tcp_server&
55 tcp_server::operator=(tcp_server&& o) noexcept
56 {
57 delete impl_;
58 impl_ = std::exchange(o.impl_, nullptr);
59 ex_ = o.ex_;
60 waiters_ = std::exchange(o.waiters_, nullptr);
61 idle_head_ = std::exchange(o.idle_head_, nullptr);
62 active_head_ = std::exchange(o.active_head_, nullptr);
63 active_tail_ = std::exchange(o.active_tail_, nullptr);
64 active_accepts_ = std::exchange(o.active_accepts_, 0);
65 storage_ = std::move(o.storage_);
66 running_ = std::exchange(o.running_, false);
67 return *this;
68 }
69
70 // Accept loop: wait for idle worker, accept connection, dispatch
71 capy::task<void>
72 8 tcp_server::do_accept(tcp_acceptor& acc)
73 {
74 // Analyzer can't trace value through coroutine await_transform
75 // NOLINTNEXTLINE(clang-analyzer-core.uninitialized.UndefReturn)
76 auto env = co_await capy::this_coro::environment;
77 while (!env->stop_token.stop_requested())
78 {
79 // Wait for an idle worker before blocking on accept
80 auto& w = co_await pop();
81 auto [ec] = co_await acc.accept(w.socket());
82 if (ec)
83 {
84 co_await push(w);
85 continue;
86 }
87 w.run(launcher{*this, w});
88 }
89 16 }
90
91 std::error_code
92 9 tcp_server::bind(endpoint ep)
93 {
94 try
95 {
96 9 impl_->ports.emplace_back(impl_->ctx, ep);
97 8 return {};
98 }
99 1 catch (std::system_error const& e)
100 {
101 1 return e.code();
102 1 }
103 }
104
105 void
106 10 tcp_server::start()
107 {
108 // Idempotent - only start if not already running
109 10 if (running_)
110 1 return;
111
112 // Previous session must be fully stopped before restart
113 9 if (active_accepts_ != 0)
114 1 detail::throw_logic_error(
115 "tcp_server::start: previous session not joined");
116
117 8 running_ = true;
118
119 8 impl_->stop = {}; // Fresh stop source
120 8 auto st = impl_->stop.get_token();
121
122 8 active_accepts_ = impl_->ports.size();
123
124 // Launch with completion handler that decrements counter
125 16 for (auto& t : impl_->ports)
126 16 capy::run_async(ex_, st, [this]() {
127 8 std::lock_guard lock(impl_->join_mutex);
128 8 if (--active_accepts_ == 0)
129 8 impl_->join_cv.notify_all();
130 16 })(do_accept(t));
131 8 }
132
133 void
134 10 tcp_server::stop()
135 {
136 // Idempotent - only stop if running
137 10 if (!running_)
138 2 return;
139 8 running_ = false;
140
141 // Stop accept loops
142 8 impl_->stop.request_stop();
143
144 // Launch cancellation coroutine on server executor
145 8 capy::run_async(ex_, std::stop_token{})(do_stop());
146 }
147
148 void
149 4 tcp_server::join()
150 {
151 4 std::unique_lock lock(impl_->join_mutex);
152 8 impl_->join_cv.wait(lock, [this] { return active_accepts_ == 0; });
153 4 }
154
155 capy::task<>
156 8 tcp_server::do_stop()
157 {
158 // Running on server executor - safe to iterate active list
159 // Just cancel, don't modify list - workers return themselves when done
160 for (auto* w = active_head_; w; w = w->next_)
161 w->stop_.request_stop();
162 co_return;
163 16 }
164
165 } // namespace boost::corosio
166