78.62% Lines (114/145) 91.67% Functions (33/36)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/corosio 7   // Official repository: https://github.com/cppalliance/corosio
8   // 8   //
9   9  
10   #ifndef BOOST_COROSIO_TCP_SERVER_HPP 10   #ifndef BOOST_COROSIO_TCP_SERVER_HPP
11   #define BOOST_COROSIO_TCP_SERVER_HPP 11   #define BOOST_COROSIO_TCP_SERVER_HPP
12   12  
13   #include <boost/corosio/detail/config.hpp> 13   #include <boost/corosio/detail/config.hpp>
14   #include <boost/corosio/detail/except.hpp> 14   #include <boost/corosio/detail/except.hpp>
15   #include <boost/corosio/tcp_acceptor.hpp> 15   #include <boost/corosio/tcp_acceptor.hpp>
16   #include <boost/corosio/tcp_socket.hpp> 16   #include <boost/corosio/tcp_socket.hpp>
17   #include <boost/corosio/io_context.hpp> 17   #include <boost/corosio/io_context.hpp>
18   #include <boost/corosio/endpoint.hpp> 18   #include <boost/corosio/endpoint.hpp>
19   #include <boost/capy/task.hpp> 19   #include <boost/capy/task.hpp>
20   #include <boost/capy/concept/execution_context.hpp> 20   #include <boost/capy/concept/execution_context.hpp>
21   #include <boost/capy/concept/io_awaitable.hpp> 21   #include <boost/capy/concept/io_awaitable.hpp>
22   #include <boost/capy/concept/executor.hpp> 22   #include <boost/capy/concept/executor.hpp>
23   #include <boost/capy/ex/any_executor.hpp> 23   #include <boost/capy/ex/any_executor.hpp>
24   #include <boost/capy/ex/frame_allocator.hpp> 24   #include <boost/capy/ex/frame_allocator.hpp>
25   #include <boost/capy/ex/io_env.hpp> 25   #include <boost/capy/ex/io_env.hpp>
26   #include <boost/capy/ex/run_async.hpp> 26   #include <boost/capy/ex/run_async.hpp>
27   27  
28   #include <coroutine> 28   #include <coroutine>
29   #include <memory> 29   #include <memory>
30   #include <ranges> 30   #include <ranges>
31   #include <vector> 31   #include <vector>
32   32  
33   namespace boost::corosio { 33   namespace boost::corosio {
34   34  
35   #ifdef _MSC_VER 35   #ifdef _MSC_VER
36   #pragma warning(push) 36   #pragma warning(push)
37   #pragma warning(disable : 4251) // class needs to have dll-interface 37   #pragma warning(disable : 4251) // class needs to have dll-interface
38   #endif 38   #endif
39   39  
40   /** TCP server with pooled workers. 40   /** TCP server with pooled workers.
41   41  
42   This class manages a pool of reusable worker objects that handle 42   This class manages a pool of reusable worker objects that handle
43   incoming connections. When a connection arrives, an idle worker 43   incoming connections. When a connection arrives, an idle worker
44   is dispatched to handle it. After the connection completes, the 44   is dispatched to handle it. After the connection completes, the
45   worker returns to the pool for reuse, avoiding allocation overhead 45   worker returns to the pool for reuse, avoiding allocation overhead
46   per connection. 46   per connection.
47   47  
48   Workers are set via @ref set_workers as a forward range of 48   Workers are set via @ref set_workers as a forward range of
49   pointer-like objects (e.g., `unique_ptr<worker_base>`). The server 49   pointer-like objects (e.g., `unique_ptr<worker_base>`). The server
50   takes ownership of the container via type erasure. 50   takes ownership of the container via type erasure.
51   51  
52   @par Thread Safety 52   @par Thread Safety
53   Distinct objects: Safe. 53   Distinct objects: Safe.
54   Shared objects: Unsafe. 54   Shared objects: Unsafe.
55   55  
56   @par Lifecycle 56   @par Lifecycle
57   The server operates in three states: 57   The server operates in three states:
58   58  
59   - **Stopped**: Initial state, or after @ref join completes. 59   - **Stopped**: Initial state, or after @ref join completes.
60   - **Running**: After @ref start, actively accepting connections. 60   - **Running**: After @ref start, actively accepting connections.
61   - **Stopping**: After @ref stop, draining active work. 61   - **Stopping**: After @ref stop, draining active work.
62   62  
63   State transitions: 63   State transitions:
64   @code 64   @code
65   [Stopped] --start()--> [Running] --stop()--> [Stopping] --join()--> [Stopped] 65   [Stopped] --start()--> [Running] --stop()--> [Stopping] --join()--> [Stopped]
66   @endcode 66   @endcode
67   67  
68   @par Running the Server 68   @par Running the Server
69   @code 69   @code
70   io_context ioc; 70   io_context ioc;
71   tcp_server srv(ioc, ioc.get_executor()); 71   tcp_server srv(ioc, ioc.get_executor());
72   srv.set_workers(make_workers(ioc, 100)); 72   srv.set_workers(make_workers(ioc, 100));
73   srv.bind(endpoint{address_v4::any(), 8080}); 73   srv.bind(endpoint{address_v4::any(), 8080});
74   srv.start(); 74   srv.start();
75   ioc.run(); // Blocks until all work completes 75   ioc.run(); // Blocks until all work completes
76   @endcode 76   @endcode
77   77  
78   @par Graceful Shutdown 78   @par Graceful Shutdown
79   To shut down gracefully, call @ref stop then drain the io_context: 79   To shut down gracefully, call @ref stop then drain the io_context:
80   @code 80   @code
81   // From a signal handler or timer callback: 81   // From a signal handler or timer callback:
82   srv.stop(); 82   srv.stop();
83   83  
84   // ioc.run() returns after pending work drains. 84   // ioc.run() returns after pending work drains.
85   // Then from the thread that called ioc.run(): 85   // Then from the thread that called ioc.run():
86   srv.join(); // Wait for accept loops to finish 86   srv.join(); // Wait for accept loops to finish
87   @endcode 87   @endcode
88   88  
89   @par Restart After Stop 89   @par Restart After Stop
90   The server can be restarted after a complete shutdown cycle. 90   The server can be restarted after a complete shutdown cycle.
91   You must drain the io_context and call @ref join before restarting: 91   You must drain the io_context and call @ref join before restarting:
92   @code 92   @code
93   srv.start(); 93   srv.start();
94   ioc.run_for( 10s ); // Run for a while 94   ioc.run_for( 10s ); // Run for a while
95   srv.stop(); // Signal shutdown 95   srv.stop(); // Signal shutdown
96   ioc.run(); // REQUIRED: drain pending completions 96   ioc.run(); // REQUIRED: drain pending completions
97   srv.join(); // REQUIRED: wait for accept loops 97   srv.join(); // REQUIRED: wait for accept loops
98   98  
99   // Now safe to restart 99   // Now safe to restart
100   srv.start(); 100   srv.start();
101   ioc.run(); 101   ioc.run();
102   @endcode 102   @endcode
103   103  
104   @par WARNING: What NOT to Do 104   @par WARNING: What NOT to Do
105   - Do NOT call @ref join from inside a worker coroutine (deadlock). 105   - Do NOT call @ref join from inside a worker coroutine (deadlock).
106   - Do NOT call @ref join from a thread running `ioc.run()` (deadlock). 106   - Do NOT call @ref join from a thread running `ioc.run()` (deadlock).
107   - Do NOT call @ref start without completing @ref join after @ref stop. 107   - Do NOT call @ref start without completing @ref join after @ref stop.
108   - Do NOT call `ioc.stop()` for graceful shutdown; use @ref stop instead. 108   - Do NOT call `ioc.stop()` for graceful shutdown; use @ref stop instead.
109   109  
110   @par Example 110   @par Example
111   @code 111   @code
112   class my_worker : public tcp_server::worker_base 112   class my_worker : public tcp_server::worker_base
113   { 113   {
114   corosio::tcp_socket sock_; 114   corosio::tcp_socket sock_;
115   capy::any_executor ex_; 115   capy::any_executor ex_;
116   public: 116   public:
117   my_worker(io_context& ctx) 117   my_worker(io_context& ctx)
118   : sock_(ctx) 118   : sock_(ctx)
119   , ex_(ctx.get_executor()) 119   , ex_(ctx.get_executor())
120   { 120   {
121   } 121   }
122   122  
123   corosio::tcp_socket& socket() override { return sock_; } 123   corosio::tcp_socket& socket() override { return sock_; }
124   124  
125   void run(launcher launch) override 125   void run(launcher launch) override
126   { 126   {
127   launch(ex_, [](corosio::tcp_socket* sock) -> capy::task<> 127   launch(ex_, [](corosio::tcp_socket* sock) -> capy::task<>
128   { 128   {
129   // handle connection using sock 129   // handle connection using sock
130   co_return; 130   co_return;
131   }(&sock_)); 131   }(&sock_));
132   } 132   }
133   }; 133   };
134   134  
135   auto make_workers(io_context& ctx, int n) 135   auto make_workers(io_context& ctx, int n)
136   { 136   {
137   std::vector<std::unique_ptr<tcp_server::worker_base>> v; 137   std::vector<std::unique_ptr<tcp_server::worker_base>> v;
138   v.reserve(n); 138   v.reserve(n);
139   for(int i = 0; i < n; ++i) 139   for(int i = 0; i < n; ++i)
140   v.push_back(std::make_unique<my_worker>(ctx)); 140   v.push_back(std::make_unique<my_worker>(ctx));
141   return v; 141   return v;
142   } 142   }
143   143  
144   io_context ioc; 144   io_context ioc;
145   tcp_server srv(ioc, ioc.get_executor()); 145   tcp_server srv(ioc, ioc.get_executor());
146   srv.set_workers(make_workers(ioc, 100)); 146   srv.set_workers(make_workers(ioc, 100));
147   @endcode 147   @endcode
148   148  
149   @see worker_base, set_workers, launcher 149   @see worker_base, set_workers, launcher
150   */ 150   */
151   class BOOST_COROSIO_DECL tcp_server 151   class BOOST_COROSIO_DECL tcp_server
152   { 152   {
153   public: 153   public:
154   class worker_base; ///< Abstract base for connection handlers. 154   class worker_base; ///< Abstract base for connection handlers.
155   class launcher; ///< Move-only handle to launch worker coroutines. 155   class launcher; ///< Move-only handle to launch worker coroutines.
156   156  
157   private: 157   private:
158   struct waiter 158   struct waiter
159   { 159   {
160   waiter* next; 160   waiter* next;
161   std::coroutine_handle<> h; 161   std::coroutine_handle<> h;
162   detail::continuation_op cont_op; 162   detail::continuation_op cont_op;
163   worker_base* w; 163   worker_base* w;
164   }; 164   };
165   165  
166   struct impl; 166   struct impl;
167   167  
168   static impl* make_impl(capy::execution_context& ctx); 168   static impl* make_impl(capy::execution_context& ctx);
169   169  
170   impl* impl_; 170   impl* impl_;
171   capy::any_executor ex_; 171   capy::any_executor ex_;
172   waiter* waiters_ = nullptr; 172   waiter* waiters_ = nullptr;
173   worker_base* idle_head_ = nullptr; // Forward list: available workers 173   worker_base* idle_head_ = nullptr; // Forward list: available workers
174   worker_base* active_head_ = 174   worker_base* active_head_ =
175   nullptr; // Doubly linked: workers handling connections 175   nullptr; // Doubly linked: workers handling connections
176   worker_base* active_tail_ = nullptr; // Tail for O(1) push_back 176   worker_base* active_tail_ = nullptr; // Tail for O(1) push_back
177   std::size_t active_accepts_ = 0; // Number of active do_accept coroutines 177   std::size_t active_accepts_ = 0; // Number of active do_accept coroutines
178   std::shared_ptr<void> storage_; // Owns the worker container (type-erased) 178   std::shared_ptr<void> storage_; // Owns the worker container (type-erased)
179   bool running_ = false; 179   bool running_ = false;
180   180  
181   // Idle list (forward/singly linked) - push front, pop front 181   // Idle list (forward/singly linked) - push front, pop front
HITCBC 182   45 void idle_push(worker_base* w) noexcept 182   45 void idle_push(worker_base* w) noexcept
183   { 183   {
HITCBC 184   45 w->next_ = idle_head_; 184   45 w->next_ = idle_head_;
HITCBC 185   45 idle_head_ = w; 185   45 idle_head_ = w;
HITCBC 186   45 } 186   45 }
187   187  
HITCBC 188   9 worker_base* idle_pop() noexcept 188   9 worker_base* idle_pop() noexcept
189   { 189   {
HITCBC 190   9 auto* w = idle_head_; 190   9 auto* w = idle_head_;
HITCBC 191   9 if (w) 191   9 if (w)
HITCBC 192   9 idle_head_ = w->next_; 192   9 idle_head_ = w->next_;
HITCBC 193   9 return w; 193   9 return w;
194   } 194   }
195   195  
HITCBC 196   9 bool idle_empty() const noexcept 196   9 bool idle_empty() const noexcept
197   { 197   {
HITCBC 198   9 return idle_head_ == nullptr; 198   9 return idle_head_ == nullptr;
199   } 199   }
200   200  
201   // Active list (doubly linked) - push back, remove anywhere 201   // Active list (doubly linked) - push back, remove anywhere
HITCBC 202   3 void active_push(worker_base* w) noexcept 202   3 void active_push(worker_base* w) noexcept
203   { 203   {
HITCBC 204   3 w->next_ = nullptr; 204   3 w->next_ = nullptr;
HITCBC 205   3 w->prev_ = active_tail_; 205   3 w->prev_ = active_tail_;
HITCBC 206   3 if (active_tail_) 206   3 if (active_tail_)
MISUBC 207   active_tail_->next_ = w; 207   active_tail_->next_ = w;
208   else 208   else
HITCBC 209   3 active_head_ = w; 209   3 active_head_ = w;
HITCBC 210   3 active_tail_ = w; 210   3 active_tail_ = w;
HITCBC 211   3 } 211   3 }
212   212  
HITCBC 213   9 void active_remove(worker_base* w) noexcept 213   9 void active_remove(worker_base* w) noexcept
214   { 214   {
215   // Skip if not in active list (e.g., after failed accept) 215   // Skip if not in active list (e.g., after failed accept)
HITCBC 216   9 if (w != active_head_ && w->prev_ == nullptr) 216   9 if (w != active_head_ && w->prev_ == nullptr)
HITCBC 217   6 return; 217   6 return;
HITCBC 218   3 if (w->prev_) 218   3 if (w->prev_)
MISUBC 219   w->prev_->next_ = w->next_; 219   w->prev_->next_ = w->next_;
220   else 220   else
HITCBC 221   3 active_head_ = w->next_; 221   3 active_head_ = w->next_;
HITCBC 222   3 if (w->next_) 222   3 if (w->next_)
MISUBC 223   w->next_->prev_ = w->prev_; 223   w->next_->prev_ = w->prev_;
224   else 224   else
HITCBC 225   3 active_tail_ = w->prev_; 225   3 active_tail_ = w->prev_;
HITCBC 226   3 w->prev_ = nullptr; // Mark as not in active list 226   3 w->prev_ = nullptr; // Mark as not in active list
227   } 227   }
228   228  
229   template<capy::Executor Ex> 229   template<capy::Executor Ex>
230   struct launch_wrapper 230   struct launch_wrapper
231   { 231   {
232   struct promise_type 232   struct promise_type
233   { 233   {
234   Ex ex; // Executor stored directly in frame (outlives child tasks) 234   Ex ex; // Executor stored directly in frame (outlives child tasks)
235   capy::io_env env_; 235   capy::io_env env_;
236   236  
237   // For regular coroutines: first arg is executor, second is stop token 237   // For regular coroutines: first arg is executor, second is stop token
238   template<class E, class S, class... Args> 238   template<class E, class S, class... Args>
239   requires capy::Executor<std::decay_t<E>> 239   requires capy::Executor<std::decay_t<E>>
240   promise_type(E e, S s, Args&&...) 240   promise_type(E e, S s, Args&&...)
241   : ex(std::move(e)) 241   : ex(std::move(e))
242   , env_{ 242   , env_{
243   capy::executor_ref(ex), std::move(s), 243   capy::executor_ref(ex), std::move(s),
244   capy::get_current_frame_allocator()} 244   capy::get_current_frame_allocator()}
245   { 245   {
246   } 246   }
247   247  
248   // For lambda coroutines: first arg is closure, second is executor, third is stop token 248   // For lambda coroutines: first arg is closure, second is executor, third is stop token
249   template<class Closure, class E, class S, class... Args> 249   template<class Closure, class E, class S, class... Args>
250   requires(!capy::Executor<std::decay_t<Closure>> && 250   requires(!capy::Executor<std::decay_t<Closure>> &&
251   capy::Executor<std::decay_t<E>>) 251   capy::Executor<std::decay_t<E>>)
HITCBC 252   3 promise_type(Closure&&, E e, S s, Args&&...) 252   3 promise_type(Closure&&, E e, S s, Args&&...)
HITCBC 253   3 : ex(std::move(e)) 253   3 : ex(std::move(e))
HITCBC 254   3 , env_{ 254   3 , env_{
HITCBC 255   3 capy::executor_ref(ex), std::move(s), 255   3 capy::executor_ref(ex), std::move(s),
HITCBC 256   3 capy::get_current_frame_allocator()} 256   3 capy::get_current_frame_allocator()}
257   { 257   {
HITCBC 258   3 } 258   3 }
259   259  
HITCBC 260   3 launch_wrapper get_return_object() noexcept 260   3 launch_wrapper get_return_object() noexcept
261   { 261   {
262   return { 262   return {
HITCBC 263   3 std::coroutine_handle<promise_type>::from_promise(*this)}; 263   3 std::coroutine_handle<promise_type>::from_promise(*this)};
264   } 264   }
HITCBC 265   3 std::suspend_always initial_suspend() noexcept 265   3 std::suspend_always initial_suspend() noexcept
266   { 266   {
HITCBC 267   3 return {}; 267   3 return {};
268   } 268   }
HITCBC 269   3 std::suspend_never final_suspend() noexcept 269   3 std::suspend_never final_suspend() noexcept
270   { 270   {
HITCBC 271   3 return {}; 271   3 return {};
272   } 272   }
HITCBC 273   3 void return_void() noexcept {} 273   3 void return_void() noexcept {}
MISUBC 274   void unhandled_exception() 274   void unhandled_exception()
275   { 275   {
MISUBC 276   std::terminate(); 276   std::terminate();
277   } 277   }
278   278  
279   // Inject io_env for IoAwaitable 279   // Inject io_env for IoAwaitable
280   template<capy::IoAwaitable Awaitable> 280   template<capy::IoAwaitable Awaitable>
HITCBC 281   6 auto await_transform(Awaitable&& a) 281   6 auto await_transform(Awaitable&& a)
282   { 282   {
283   using AwaitableT = std::decay_t<Awaitable>; 283   using AwaitableT = std::decay_t<Awaitable>;
284   struct adapter 284   struct adapter
285   { 285   {
286   AwaitableT aw; 286   AwaitableT aw;
287   capy::io_env const* env; 287   capy::io_env const* env;
288   288  
HITCBC 289   6 bool await_ready() 289   6 bool await_ready()
290   { 290   {
HITCBC 291   6 return aw.await_ready(); 291   6 return aw.await_ready();
292   } 292   }
HITCBC 293   6 decltype(auto) await_resume() 293   6 decltype(auto) await_resume()
294   { 294   {
HITCBC 295   6 return aw.await_resume(); 295   6 return aw.await_resume();
296   } 296   }
297   297  
HITCBC 298   6 auto await_suspend(std::coroutine_handle<promise_type> h) 298   6 auto await_suspend(std::coroutine_handle<promise_type> h)
299   { 299   {
HITCBC 300   6 return aw.await_suspend(h, env); 300   6 return aw.await_suspend(h, env);
301   } 301   }
302   }; 302   };
HITCBC 303   12 return adapter{std::forward<Awaitable>(a), &env_}; 303   12 return adapter{std::forward<Awaitable>(a), &env_};
HITCBC 304   6 } 304   6 }
305   }; 305   };
306   306  
307   std::coroutine_handle<promise_type> h; 307   std::coroutine_handle<promise_type> h;
308   308  
HITCBC 309   3 launch_wrapper(std::coroutine_handle<promise_type> handle) noexcept 309   3 launch_wrapper(std::coroutine_handle<promise_type> handle) noexcept
HITCBC 310   3 : h(handle) 310   3 : h(handle)
311   { 311   {
HITCBC 312   3 } 312   3 }
313   313  
HITCBC 314   3 ~launch_wrapper() 314   3 ~launch_wrapper()
315   { 315   {
HITCBC 316   3 if (h) 316   3 if (h)
MISUBC 317   h.destroy(); 317   h.destroy();
HITCBC 318   3 } 318   3 }
319   319  
320   launch_wrapper(launch_wrapper&& o) noexcept 320   launch_wrapper(launch_wrapper&& o) noexcept
321   : h(std::exchange(o.h, nullptr)) 321   : h(std::exchange(o.h, nullptr))
322   { 322   {
323   } 323   }
324   324  
325   launch_wrapper(launch_wrapper const&) = delete; 325   launch_wrapper(launch_wrapper const&) = delete;
326   launch_wrapper& operator=(launch_wrapper const&) = delete; 326   launch_wrapper& operator=(launch_wrapper const&) = delete;
327   launch_wrapper& operator=(launch_wrapper&&) = delete; 327   launch_wrapper& operator=(launch_wrapper&&) = delete;
328   }; 328   };
329   329  
330   // Named functor to avoid incomplete lambda type in coroutine promise 330   // Named functor to avoid incomplete lambda type in coroutine promise
331   template<class Executor> 331   template<class Executor>
332   struct launch_coro 332   struct launch_coro
333   { 333   {
HITCBC 334   3 launch_wrapper<Executor> operator()( 334   3 launch_wrapper<Executor> operator()(
335   Executor, 335   Executor,
336   std::stop_token, 336   std::stop_token,
337   tcp_server* self, 337   tcp_server* self,
338   capy::task<void> t, 338   capy::task<void> t,
339   worker_base* wp) 339   worker_base* wp)
340   { 340   {
341   // Executor and stop token stored in promise via constructor 341   // Executor and stop token stored in promise via constructor
342   co_await std::move(t); 342   co_await std::move(t);
343   co_await self->push(*wp); // worker goes back to idle list 343   co_await self->push(*wp); // worker goes back to idle list
HITCBC 344   6 } 344   6 }
345   }; 345   };
346   346  
347   class push_awaitable 347   class push_awaitable
348   { 348   {
349   tcp_server& self_; 349   tcp_server& self_;
350   worker_base& w_; 350   worker_base& w_;
351   detail::continuation_op cont_op_; 351   detail::continuation_op cont_op_;
352   352  
353   public: 353   public:
HITCBC 354   9 push_awaitable(tcp_server& self, worker_base& w) noexcept 354   9 push_awaitable(tcp_server& self, worker_base& w) noexcept
HITCBC 355   9 : self_(self) 355   9 : self_(self)
HITCBC 356   9 , w_(w) 356   9 , w_(w)
357   { 357   {
HITCBC 358   9 } 358   9 }
359   359  
HITCBC 360   9 bool await_ready() const noexcept 360   9 bool await_ready() const noexcept
361   { 361   {
HITCBC 362   9 return false; 362   9 return false;
363   } 363   }
364   364  
365   std::coroutine_handle<> 365   std::coroutine_handle<>
HITCBC 366   9 await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept 366   9 await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
367   { 367   {
368   // Symmetric transfer to server's executor 368   // Symmetric transfer to server's executor
HITCBC 369   9 cont_op_.cont.h = h; 369   9 cont_op_.cont.h = h;
HITCBC 370   9 return self_.ex_.dispatch(cont_op_.cont); 370   9 return self_.ex_.dispatch(cont_op_.cont);
371   } 371   }
372   372  
HITCBC 373   9 void await_resume() noexcept 373   9 void await_resume() noexcept
374   { 374   {
375   // Running on server executor - safe to modify lists 375   // Running on server executor - safe to modify lists
376   // Remove from active (if present), then wake waiter or add to idle 376   // Remove from active (if present), then wake waiter or add to idle
HITCBC 377   9 self_.active_remove(&w_); 377   9 self_.active_remove(&w_);
HITCBC 378   9 if (self_.waiters_) 378   9 if (self_.waiters_)
379   { 379   {
MISUBC 380   auto* wait = self_.waiters_; 380   auto* wait = self_.waiters_;
MISUBC 381   self_.waiters_ = wait->next; 381   self_.waiters_ = wait->next;
MISUBC 382   wait->w = &w_; 382   wait->w = &w_;
MISUBC 383   wait->cont_op.cont.h = wait->h; 383   wait->cont_op.cont.h = wait->h;
MISUBC 384   self_.ex_.post(wait->cont_op.cont); 384   self_.ex_.post(wait->cont_op.cont);
385   } 385   }
386   else 386   else
387   { 387   {
HITCBC 388   9 self_.idle_push(&w_); 388   9 self_.idle_push(&w_);
389   } 389   }
HITCBC 390   9 } 390   9 }
391   }; 391   };
392   392  
393   class pop_awaitable 393   class pop_awaitable
394   { 394   {
395   tcp_server& self_; 395   tcp_server& self_;
396   waiter wait_; 396   waiter wait_;
397   397  
398   public: 398   public:
HITCBC 399   9 pop_awaitable(tcp_server& self) noexcept : self_(self), wait_{} {} 399   9 pop_awaitable(tcp_server& self) noexcept : self_(self), wait_{} {}
400   400  
HITCBC 401   9 bool await_ready() const noexcept 401   9 bool await_ready() const noexcept
402   { 402   {
HITCBC 403   9 return !self_.idle_empty(); 403   9 return !self_.idle_empty();
404   } 404   }
405   405  
406   bool 406   bool
MISUBC 407   await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept 407   await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
408   { 408   {
409   // Running on server executor (do_accept runs there) 409   // Running on server executor (do_accept runs there)
MISUBC 410   wait_.h = h; 410   wait_.h = h;
MISUBC 411   wait_.w = nullptr; 411   wait_.w = nullptr;
MISUBC 412   wait_.next = self_.waiters_; 412   wait_.next = self_.waiters_;
MISUBC 413   self_.waiters_ = &wait_; 413   self_.waiters_ = &wait_;
MISUBC 414   return true; 414   return true;
415   } 415   }
416   416  
HITCBC 417   9 worker_base& await_resume() noexcept 417   9 worker_base& await_resume() noexcept
418   { 418   {
419   // Running on server executor 419   // Running on server executor
HITCBC 420   9 if (wait_.w) 420   9 if (wait_.w)
MISUBC 421   return *wait_.w; // Woken by push_awaitable 421   return *wait_.w; // Woken by push_awaitable
HITCBC 422   9 return *self_.idle_pop(); 422   9 return *self_.idle_pop();
423   } 423   }
424   }; 424   };
425   425  
HITCBC 426   9 push_awaitable push(worker_base& w) 426   9 push_awaitable push(worker_base& w)
427   { 427   {
HITCBC 428   9 return push_awaitable{*this, w}; 428   9 return push_awaitable{*this, w};
429   } 429   }
430   430  
431   // Synchronous version for destructor/guard paths 431   // Synchronous version for destructor/guard paths
432   // Must be called from server executor context 432   // Must be called from server executor context
MISUBC 433   void push_sync(worker_base& w) noexcept 433   void push_sync(worker_base& w) noexcept
434   { 434   {
MISUBC 435   active_remove(&w); 435   active_remove(&w);
MISUBC 436   if (waiters_) 436   if (waiters_)
437   { 437   {
MISUBC 438   auto* wait = waiters_; 438   auto* wait = waiters_;
MISUBC 439   waiters_ = wait->next; 439   waiters_ = wait->next;
MISUBC 440   wait->w = &w; 440   wait->w = &w;
MISUBC 441   wait->cont_op.cont.h = wait->h; 441   wait->cont_op.cont.h = wait->h;
MISUBC 442   ex_.post(wait->cont_op.cont); 442   ex_.post(wait->cont_op.cont);
443   } 443   }
444   else 444   else
445   { 445   {
MISUBC 446   idle_push(&w); 446   idle_push(&w);
447   } 447   }
MISUBC 448   } 448   }
449   449  
HITCBC 450   9 pop_awaitable pop() 450   9 pop_awaitable pop()
451   { 451   {
HITCBC 452   9 return pop_awaitable{*this}; 452   9 return pop_awaitable{*this};
453   } 453   }
454   454  
455   capy::task<void> do_accept(tcp_acceptor& acc); 455   capy::task<void> do_accept(tcp_acceptor& acc);
456   456  
457   public: 457   public:
458   /** Abstract base class for connection handlers. 458   /** Abstract base class for connection handlers.
459   459  
460   Derive from this class to implement custom connection handling. 460   Derive from this class to implement custom connection handling.
461   Each worker owns a socket and is reused across multiple 461   Each worker owns a socket and is reused across multiple
462   connections to avoid per-connection allocation. 462   connections to avoid per-connection allocation.
463   463  
464   @see tcp_server, launcher 464   @see tcp_server, launcher
465   */ 465   */
466   class BOOST_COROSIO_DECL worker_base 466   class BOOST_COROSIO_DECL worker_base
467   { 467   {
468   // Ordered largest to smallest for optimal packing 468   // Ordered largest to smallest for optimal packing
469   std::stop_source stop_; // ~16 bytes 469   std::stop_source stop_; // ~16 bytes
470   worker_base* next_ = nullptr; // 8 bytes - used by idle and active lists 470   worker_base* next_ = nullptr; // 8 bytes - used by idle and active lists
471   worker_base* prev_ = nullptr; // 8 bytes - used only by active list 471   worker_base* prev_ = nullptr; // 8 bytes - used only by active list
472   472  
473   friend class tcp_server; 473   friend class tcp_server;
474   474  
475   public: 475   public:
476   /// Construct a worker. 476   /// Construct a worker.
477   worker_base(); 477   worker_base();
478   478  
479   /// Destroy the worker. 479   /// Destroy the worker.
480   virtual ~worker_base(); 480   virtual ~worker_base();
481   481  
482   /** Handle an accepted connection. 482   /** Handle an accepted connection.
483   483  
484   Called when this worker is dispatched to handle a new 484   Called when this worker is dispatched to handle a new
485   connection. The implementation must invoke the launcher 485   connection. The implementation must invoke the launcher
486   exactly once to start the handling coroutine. 486   exactly once to start the handling coroutine.
487   487  
488   @param launch Handle to launch the connection coroutine. 488   @param launch Handle to launch the connection coroutine.
489   */ 489   */
490   virtual void run(launcher launch) = 0; 490   virtual void run(launcher launch) = 0;
491   491  
492   /// Return the socket used for connections. 492   /// Return the socket used for connections.
493   virtual corosio::tcp_socket& socket() = 0; 493   virtual corosio::tcp_socket& socket() = 0;
494   }; 494   };
495   495  
496   /** Move-only handle to launch a worker coroutine. 496   /** Move-only handle to launch a worker coroutine.
497   497  
498   Passed to @ref worker_base::run to start the connection-handling 498   Passed to @ref worker_base::run to start the connection-handling
499   coroutine. The launcher ensures the worker returns to the idle 499   coroutine. The launcher ensures the worker returns to the idle
500   pool when the coroutine completes or if launching fails. 500   pool when the coroutine completes or if launching fails.
501   501  
502   The launcher must be invoked exactly once via `operator()`. 502   The launcher must be invoked exactly once via `operator()`.
503   If destroyed without invoking, the worker is returned to the 503   If destroyed without invoking, the worker is returned to the
504   idle pool automatically. 504   idle pool automatically.
505   505  
506   @see worker_base::run 506   @see worker_base::run
507   */ 507   */
508   class BOOST_COROSIO_DECL launcher 508   class BOOST_COROSIO_DECL launcher
509   { 509   {
510   tcp_server* srv_; 510   tcp_server* srv_;
511   worker_base* w_; 511   worker_base* w_;
512   512  
513   friend class tcp_server; 513   friend class tcp_server;
514   514  
HITCBC 515   3 launcher(tcp_server& srv, worker_base& w) noexcept : srv_(&srv), w_(&w) 515   3 launcher(tcp_server& srv, worker_base& w) noexcept : srv_(&srv), w_(&w)
516   { 516   {
HITCBC 517   3 } 517   3 }
518   518  
519   public: 519   public:
520   /// Return the worker to the pool if not launched. 520   /// Return the worker to the pool if not launched.
HITCBC 521   3 ~launcher() 521   3 ~launcher()
522   { 522   {
HITCBC 523   3 if (w_) 523   3 if (w_)
MISUBC 524   srv_->push_sync(*w_); 524   srv_->push_sync(*w_);
HITCBC 525   3 } 525   3 }
526   526  
527   launcher(launcher&& o) noexcept 527   launcher(launcher&& o) noexcept
528   : srv_(o.srv_) 528   : srv_(o.srv_)
529   , w_(std::exchange(o.w_, nullptr)) 529   , w_(std::exchange(o.w_, nullptr))
530   { 530   {
531   } 531   }
532   launcher(launcher const&) = delete; 532   launcher(launcher const&) = delete;
533   launcher& operator=(launcher const&) = delete; 533   launcher& operator=(launcher const&) = delete;
534   launcher& operator=(launcher&&) = delete; 534   launcher& operator=(launcher&&) = delete;
535   535  
536   /** Launch the connection-handling coroutine. 536   /** Launch the connection-handling coroutine.
537   537  
538   Starts the given coroutine on the specified executor. When 538   Starts the given coroutine on the specified executor. When
539   the coroutine completes, the worker is automatically returned 539   the coroutine completes, the worker is automatically returned
540   to the idle pool. 540   to the idle pool.
541   541  
542   @param ex The executor to run the coroutine on. 542   @param ex The executor to run the coroutine on.
543   @param task The coroutine to execute. 543   @param task The coroutine to execute.
544   544  
545   @throws std::logic_error If this launcher was already invoked. 545   @throws std::logic_error If this launcher was already invoked.
546   */ 546   */
547   template<class Executor> 547   template<class Executor>
HITCBC 548   3 void operator()(Executor const& ex, capy::task<void> task) 548   3 void operator()(Executor const& ex, capy::task<void> task)
549   { 549   {
HITCBC 550   3 if (!w_) 550   3 if (!w_)
MISUBC 551   detail::throw_logic_error(); // launcher already invoked 551   detail::throw_logic_error(); // launcher already invoked
552   552  
HITCBC 553   3 auto* w = std::exchange(w_, nullptr); 553   3 auto* w = std::exchange(w_, nullptr);
554   554  
555   // Worker is being dispatched - add to active list 555   // Worker is being dispatched - add to active list
HITCBC 556   3 srv_->active_push(w); 556   3 srv_->active_push(w);
557   557  
558   // Return worker to pool if coroutine setup throws 558   // Return worker to pool if coroutine setup throws
559   struct guard_t 559   struct guard_t
560   { 560   {
561   tcp_server* srv; 561   tcp_server* srv;
562   worker_base* w; 562   worker_base* w;
HITCBC 563   3 ~guard_t() 563   3 ~guard_t()
564   { 564   {
HITCBC 565   3 if (w) 565   3 if (w)
MISUBC 566   srv->push_sync(*w); 566   srv->push_sync(*w);
HITCBC 567   3 } 567   3 }
HITCBC 568   3 } guard{srv_, w}; 568   3 } guard{srv_, w};
569   569  
570   // Reset worker's stop source for this connection 570   // Reset worker's stop source for this connection
HITCBC 571   3 w->stop_ = {}; 571   3 w->stop_ = {};
HITCBC 572   3 auto st = w->stop_.get_token(); 572   3 auto st = w->stop_.get_token();
573   573  
HITCBC 574   3 auto wrapper = 574   3 auto wrapper =
HITCBC 575   3 launch_coro<Executor>{}(ex, st, srv_, std::move(task), w); 575   3 launch_coro<Executor>{}(ex, st, srv_, std::move(task), w);
576   576  
577   // Executor and stop token stored in promise via constructor 577   // Executor and stop token stored in promise via constructor
HITCBC 578   3 ex.post(std::exchange(wrapper.h, nullptr)); // Release before post 578   3 ex.post(std::exchange(wrapper.h, nullptr)); // Release before post
HITCBC 579   3 guard.w = nullptr; // Success - dismiss guard 579   3 guard.w = nullptr; // Success - dismiss guard
HITCBC 580   3 } 580   3 }
581   }; 581   };
582   582  
583   /** Construct a TCP server. 583   /** Construct a TCP server.
584   584  
585   @tparam Ctx Execution context type satisfying ExecutionContext. 585   @tparam Ctx Execution context type satisfying ExecutionContext.
586   @tparam Ex Executor type satisfying Executor. 586   @tparam Ex Executor type satisfying Executor.
587   587  
588   @param ctx The execution context for socket operations. 588   @param ctx The execution context for socket operations.
589   @param ex The executor for dispatching coroutines. 589   @param ex The executor for dispatching coroutines.
590   590  
591   @par Example 591   @par Example
592   @code 592   @code
593   tcp_server srv(ctx, ctx.get_executor()); 593   tcp_server srv(ctx, ctx.get_executor());
594   srv.set_workers(make_workers(ctx, 100)); 594   srv.set_workers(make_workers(ctx, 100));
595   srv.bind(endpoint{...}); 595   srv.bind(endpoint{...});
596   srv.start(); 596   srv.start();
597   @endcode 597   @endcode
598   */ 598   */
599   template<capy::ExecutionContext Ctx, capy::Executor Ex> 599   template<capy::ExecutionContext Ctx, capy::Executor Ex>
HITCBC 600   9 tcp_server(Ctx& ctx, Ex ex) : impl_(make_impl(ctx)) 600   9 tcp_server(Ctx& ctx, Ex ex) : impl_(make_impl(ctx))
HITCBC 601   9 , ex_(std::move(ex)) 601   9 , ex_(std::move(ex))
602   { 602   {
HITCBC 603   9 } 603   9 }
604   604  
605   public: 605   public:
606   /// Destroy the server, stopping all accept loops. 606   /// Destroy the server, stopping all accept loops.
607   ~tcp_server(); 607   ~tcp_server();
608   608  
609   tcp_server(tcp_server const&) = delete; 609   tcp_server(tcp_server const&) = delete;
610   tcp_server& operator=(tcp_server const&) = delete; 610   tcp_server& operator=(tcp_server const&) = delete;
611   611  
612   /** Move construct from another server. 612   /** Move construct from another server.
613   613  
614   @param o The source server. After the move, @p o is 614   @param o The source server. After the move, @p o is
615   in a valid but unspecified state. 615   in a valid but unspecified state.
616   */ 616   */
617   tcp_server(tcp_server&& o) noexcept; 617   tcp_server(tcp_server&& o) noexcept;
618   618  
619   /** Move assign from another server. 619   /** Move assign from another server.
620   620  
621   @param o The source server. After the move, @p o is 621   @param o The source server. After the move, @p o is
622   in a valid but unspecified state. 622   in a valid but unspecified state.
623   623  
624   @return `*this`. 624   @return `*this`.
625   */ 625   */
626   tcp_server& operator=(tcp_server&& o) noexcept; 626   tcp_server& operator=(tcp_server&& o) noexcept;
627   627  
628   /** Bind to a local endpoint. 628   /** Bind to a local endpoint.
629   629  
630   Creates an acceptor listening on the specified endpoint. 630   Creates an acceptor listening on the specified endpoint.
631   Multiple endpoints can be bound by calling this method 631   Multiple endpoints can be bound by calling this method
632   multiple times before @ref start. 632   multiple times before @ref start.
633   633  
634   @param ep The local endpoint to bind to. 634   @param ep The local endpoint to bind to.
635   635  
636   @return The error code if binding fails. 636   @return The error code if binding fails.
637   */ 637   */
638   std::error_code bind(endpoint ep); 638   std::error_code bind(endpoint ep);
639   639  
640   /** Set the worker pool. 640   /** Set the worker pool.
641   641  
642   Replaces any existing workers with the given range. Any 642   Replaces any existing workers with the given range. Any
643   previous workers are released and the idle/active lists 643   previous workers are released and the idle/active lists
644   are cleared before populating with new workers. 644   are cleared before populating with new workers.
645   645  
646   @tparam Range Forward range of pointer-like objects to worker_base. 646   @tparam Range Forward range of pointer-like objects to worker_base.
647   647  
648   @param workers Range of workers to manage. Each element must 648   @param workers Range of workers to manage. Each element must
649   support `std::to_address()` yielding `worker_base*`. 649   support `std::to_address()` yielding `worker_base*`.
650   650  
651   @par Example 651   @par Example
652   @code 652   @code
653   std::vector<std::unique_ptr<my_worker>> workers; 653   std::vector<std::unique_ptr<my_worker>> workers;
654   for(int i = 0; i < 100; ++i) 654   for(int i = 0; i < 100; ++i)
655   workers.push_back(std::make_unique<my_worker>(ctx)); 655   workers.push_back(std::make_unique<my_worker>(ctx));
656   srv.set_workers(std::move(workers)); 656   srv.set_workers(std::move(workers));
657   @endcode 657   @endcode
658   */ 658   */
659   template<std::ranges::forward_range Range> 659   template<std::ranges::forward_range Range>
660   requires std::convertible_to< 660   requires std::convertible_to<
661   decltype(std::to_address( 661   decltype(std::to_address(
662   std::declval<std::ranges::range_value_t<Range>&>())), 662   std::declval<std::ranges::range_value_t<Range>&>())),
663   worker_base*> 663   worker_base*>
HITCBC 664   9 void set_workers(Range&& workers) 664   9 void set_workers(Range&& workers)
665   { 665   {
666   // Clear existing state 666   // Clear existing state
HITCBC 667   9 storage_.reset(); 667   9 storage_.reset();
HITCBC 668   9 idle_head_ = nullptr; 668   9 idle_head_ = nullptr;
HITCBC 669   9 active_head_ = nullptr; 669   9 active_head_ = nullptr;
HITCBC 670   9 active_tail_ = nullptr; 670   9 active_tail_ = nullptr;
671   671  
672   // Take ownership and populate idle list 672   // Take ownership and populate idle list
673   using StorageType = std::decay_t<Range>; 673   using StorageType = std::decay_t<Range>;
HITCBC 674   9 auto* p = new StorageType(std::forward<Range>(workers)); 674   9 auto* p = new StorageType(std::forward<Range>(workers));
HITCBC 675   9 storage_ = std::shared_ptr<void>( 675   9 storage_ = std::shared_ptr<void>(
HITCBC 676   9 p, [](void* ptr) { delete static_cast<StorageType*>(ptr); }); 676   9 p, [](void* ptr) { delete static_cast<StorageType*>(ptr); });
HITCBC 677   45 for (auto&& elem : *static_cast<StorageType*>(p)) 677   45 for (auto&& elem : *static_cast<StorageType*>(p))
HITCBC 678   36 idle_push(std::to_address(elem)); 678   36 idle_push(std::to_address(elem));
HITCBC 679   9 } 679   9 }
680   680  
681   /** Start accepting connections. 681   /** Start accepting connections.
682   682  
683   Launches accept loops for all bound endpoints. Incoming 683   Launches accept loops for all bound endpoints. Incoming
684   connections are dispatched to idle workers from the pool. 684   connections are dispatched to idle workers from the pool.
685   685  
686   Calling `start()` on an already-running server has no effect. 686   Calling `start()` on an already-running server has no effect.
687   687  
688   @par Preconditions 688   @par Preconditions
689   - At least one endpoint bound via @ref bind. 689   - At least one endpoint bound via @ref bind.
690   - Workers installed via @ref set_workers. 690   - Workers installed via @ref set_workers.
691   - If restarting, @ref join must have completed first. 691   - If restarting, @ref join must have completed first.
692   692  
693   @par Effects 693   @par Effects
694   Creates one accept coroutine per bound endpoint. Each coroutine 694   Creates one accept coroutine per bound endpoint. Each coroutine
695   runs on the server's executor, waiting for connections and 695   runs on the server's executor, waiting for connections and
696   dispatching them to idle workers. 696   dispatching them to idle workers.
697   697  
698   @par Restart Sequence 698   @par Restart Sequence
699   To restart after stopping, complete the full shutdown cycle: 699   To restart after stopping, complete the full shutdown cycle:
700   @code 700   @code
701   srv.start(); 701   srv.start();
702   ioc.run_for( 1s ); 702   ioc.run_for( 1s );
703   srv.stop(); // 1. Signal shutdown 703   srv.stop(); // 1. Signal shutdown
704   ioc.run(); // 2. Drain remaining completions 704   ioc.run(); // 2. Drain remaining completions
705   srv.join(); // 3. Wait for accept loops 705   srv.join(); // 3. Wait for accept loops
706   706  
707   // Now safe to restart 707   // Now safe to restart
708   srv.start(); 708   srv.start();
709   ioc.run(); 709   ioc.run();
710   @endcode 710   @endcode
711   711  
712   @par Thread Safety 712   @par Thread Safety
713   Not thread safe. 713   Not thread safe.
714   714  
715   @throws std::logic_error If a previous session has not been 715   @throws std::logic_error If a previous session has not been
716   joined (accept loops still active). 716   joined (accept loops still active).
717   */ 717   */
718   void start(); 718   void start();
719   719  
720   /** Return the local endpoint for the i-th bound port. 720   /** Return the local endpoint for the i-th bound port.
721   721  
722   @param index Zero-based index into the list of bound ports. 722   @param index Zero-based index into the list of bound ports.
723   723  
724   @return The local endpoint, or a default-constructed endpoint 724   @return The local endpoint, or a default-constructed endpoint
725   if @p index is out of range or the acceptor is not open. 725   if @p index is out of range or the acceptor is not open.
726   */ 726   */
727   endpoint local_endpoint(std::size_t index = 0) const noexcept; 727   endpoint local_endpoint(std::size_t index = 0) const noexcept;
728   728  
729   /** Stop accepting connections. 729   /** Stop accepting connections.
730   730  
731   Signals all listening ports to stop accepting new connections 731   Signals all listening ports to stop accepting new connections
732   and requests cancellation of active workers via their stop tokens. 732   and requests cancellation of active workers via their stop tokens.
733   733  
734   This function returns immediately; it does not wait for workers 734   This function returns immediately; it does not wait for workers
735   to finish. Pending I/O operations complete asynchronously. 735   to finish. Pending I/O operations complete asynchronously.
736   736  
737   Calling `stop()` on a non-running server has no effect. 737   Calling `stop()` on a non-running server has no effect.
738   738  
739   @par Effects 739   @par Effects
740   - Closes all acceptors (pending accepts complete with error). 740   - Closes all acceptors (pending accepts complete with error).
741   - Requests stop on each active worker's stop token. 741   - Requests stop on each active worker's stop token.
742   - Workers observing their stop token should exit promptly. 742   - Workers observing their stop token should exit promptly.
743   743  
744   @par Postconditions 744   @par Postconditions
745   No new connections will be accepted. Active workers continue 745   No new connections will be accepted. Active workers continue
746   until they observe their stop token or complete naturally. 746   until they observe their stop token or complete naturally.
747   747  
748   @par What Happens Next 748   @par What Happens Next
749   After calling `stop()`: 749   After calling `stop()`:
750   1. Let `ioc.run()` return (drains pending completions). 750   1. Let `ioc.run()` return (drains pending completions).
751   2. Call @ref join to wait for accept loops to finish. 751   2. Call @ref join to wait for accept loops to finish.
752   3. Only then is it safe to restart or destroy the server. 752   3. Only then is it safe to restart or destroy the server.
753   753  
754   @par Thread Safety 754   @par Thread Safety
755   Not thread safe. 755   Not thread safe.
756   756  
757   @see join, start 757   @see join, start
758   */ 758   */
759   void stop(); 759   void stop();
760   760  
761   /** Block until all accept loops complete. 761   /** Block until all accept loops complete.
762   762  
763   Blocks the calling thread until all accept coroutines launched 763   Blocks the calling thread until all accept coroutines launched
764   by @ref start have finished executing. This synchronizes the 764   by @ref start have finished executing. This synchronizes the
765   shutdown sequence, ensuring the server is fully stopped before 765   shutdown sequence, ensuring the server is fully stopped before
766   restarting or destroying it. 766   restarting or destroying it.
767   767  
768   @par Preconditions 768   @par Preconditions
769   @ref stop has been called and `ioc.run()` has returned. 769   @ref stop has been called and `ioc.run()` has returned.
770   770  
771   @par Postconditions 771   @par Postconditions
772   All accept loops have completed. The server is in the stopped 772   All accept loops have completed. The server is in the stopped
773   state and may be restarted via @ref start. 773   state and may be restarted via @ref start.
774   774  
775   @par Example (Correct Usage) 775   @par Example (Correct Usage)
776   @code 776   @code
777   // main thread 777   // main thread
778   srv.start(); 778   srv.start();
779   ioc.run(); // Blocks until work completes 779   ioc.run(); // Blocks until work completes
780   srv.join(); // Safe: called after ioc.run() returns 780   srv.join(); // Safe: called after ioc.run() returns
781   @endcode 781   @endcode
782   782  
783   @par WARNING: Deadlock Scenarios 783   @par WARNING: Deadlock Scenarios
784   Calling `join()` from the wrong context causes deadlock: 784   Calling `join()` from the wrong context causes deadlock:
785   785  
786   @code 786   @code
787   // WRONG: calling join() from inside a worker coroutine 787   // WRONG: calling join() from inside a worker coroutine
788   void run( launcher launch ) override 788   void run( launcher launch ) override
789   { 789   {
790   launch( ex, [this]() -> capy::task<> 790   launch( ex, [this]() -> capy::task<>
791   { 791   {
792   srv_.join(); // DEADLOCK: blocks the executor 792   srv_.join(); // DEADLOCK: blocks the executor
793   co_return; 793   co_return;
794   }()); 794   }());
795   } 795   }
796   796  
797   // WRONG: calling join() while ioc.run() is still active 797   // WRONG: calling join() while ioc.run() is still active
798   std::thread t( [&]{ ioc.run(); } ); 798   std::thread t( [&]{ ioc.run(); } );
799   srv.stop(); 799   srv.stop();
800   srv.join(); // DEADLOCK: ioc.run() still running in thread t 800   srv.join(); // DEADLOCK: ioc.run() still running in thread t
801   @endcode 801   @endcode
802   802  
803   @par Thread Safety 803   @par Thread Safety
804   May be called from any thread, but will deadlock if called 804   May be called from any thread, but will deadlock if called
805   from within the io_context event loop or from a worker coroutine. 805   from within the io_context event loop or from a worker coroutine.
806   806  
807   @see stop, start 807   @see stop, start
808   */ 808   */
809   void join(); 809   void join();
810   810  
811   private: 811   private:
812   capy::task<> do_stop(); 812   capy::task<> do_stop();
813   }; 813   };
814   814  
815   #ifdef _MSC_VER 815   #ifdef _MSC_VER
816   #pragma warning(pop) 816   #pragma warning(pop)
817   #endif 817   #endif
818   818  
819   } // namespace boost::corosio 819   } // namespace boost::corosio
820   820  
821   #endif 821   #endif