89.13% Lines (41/46) 100.00% Functions (7/7)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Steve Gerbino 2   // Copyright (c) 2026 Steve Gerbino
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/corosio 7   // Official repository: https://github.com/cppalliance/corosio
8   // 8   //
9   9  
10   #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 10   #ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11   #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 11   #define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12   12  
13   #include <boost/corosio/detail/config.hpp> 13   #include <boost/corosio/detail/config.hpp>
14   #include <boost/corosio/detail/intrusive.hpp> 14   #include <boost/corosio/detail/intrusive.hpp>
15   #include <boost/capy/ex/execution_context.hpp> 15   #include <boost/capy/ex/execution_context.hpp>
16   16  
17   #include <condition_variable> 17   #include <condition_variable>
18   #include <mutex> 18   #include <mutex>
19   #include <stdexcept> 19   #include <stdexcept>
20   #include <thread> 20   #include <thread>
21   #include <vector> 21   #include <vector>
22   22  
23   namespace boost::corosio::detail { 23   namespace boost::corosio::detail {
24   24  
25   /** Base class for thread pool work items. 25   /** Base class for thread pool work items.
26   26  
27   Derive from this to create work that can be posted to a 27   Derive from this to create work that can be posted to a
28   @ref thread_pool. Uses static function pointer dispatch, 28   @ref thread_pool. Uses static function pointer dispatch,
29   consistent with the IOCP `op` pattern. 29   consistent with the IOCP `op` pattern.
30   30  
31   @par Example 31   @par Example
32   @code 32   @code
33   struct my_work : pool_work_item 33   struct my_work : pool_work_item
34   { 34   {
35   int* result; 35   int* result;
36   static void execute( pool_work_item* w ) noexcept 36   static void execute( pool_work_item* w ) noexcept
37   { 37   {
38   auto* self = static_cast<my_work*>( w ); 38   auto* self = static_cast<my_work*>( w );
39   *self->result = 42; 39   *self->result = 42;
40   } 40   }
41   }; 41   };
42   42  
43   my_work w; 43   my_work w;
44   w.func_ = &my_work::execute; 44   w.func_ = &my_work::execute;
45   w.result = &r; 45   w.result = &r;
46   pool.post( &w ); 46   pool.post( &w );
47   @endcode 47   @endcode
48   */ 48   */
49   struct pool_work_item : intrusive_queue<pool_work_item>::node 49   struct pool_work_item : intrusive_queue<pool_work_item>::node
50   { 50   {
51   /// Static dispatch function signature. 51   /// Static dispatch function signature.
52   using func_type = void (*)(pool_work_item*) noexcept; 52   using func_type = void (*)(pool_work_item*) noexcept;
53   53  
54   /// Completion handler invoked by the worker thread. 54   /// Completion handler invoked by the worker thread.
55   func_type func_ = nullptr; 55   func_type func_ = nullptr;
56   }; 56   };
57   57  
58   /** Shared thread pool for dispatching blocking operations. 58   /** Shared thread pool for dispatching blocking operations.
59   59  
60   Provides a fixed pool of reusable worker threads for operations 60   Provides a fixed pool of reusable worker threads for operations
61   that cannot be integrated with async I/O (e.g. blocking DNS 61   that cannot be integrated with async I/O (e.g. blocking DNS
62   calls). Registered as an `execution_context::service` so it 62   calls). Registered as an `execution_context::service` so it
63   is a singleton per io_context. 63   is a singleton per io_context.
64   64  
65   Threads are created eagerly in the constructor. The default 65   Threads are created eagerly in the constructor. The default
66   thread count is 1. 66   thread count is 1.
67   67  
68   @par Thread Safety 68   @par Thread Safety
69   All public member functions are thread-safe. 69   All public member functions are thread-safe.
70   70  
71   @par Shutdown 71   @par Shutdown
72   Sets a shutdown flag, notifies all threads, and joins them. 72   Sets a shutdown flag, notifies all threads, and joins them.
73   In-flight blocking calls complete naturally before the thread 73   In-flight blocking calls complete naturally before the thread
74   exits. 74   exits.
75   */ 75   */
76   class thread_pool final : public capy::execution_context::service 76   class thread_pool final : public capy::execution_context::service
77   { 77   {
78   std::mutex mutex_; 78   std::mutex mutex_;
79   std::condition_variable cv_; 79   std::condition_variable cv_;
80   intrusive_queue<pool_work_item> work_queue_; 80   intrusive_queue<pool_work_item> work_queue_;
81   std::vector<std::thread> threads_; 81   std::vector<std::thread> threads_;
82   bool shutdown_ = false; 82   bool shutdown_ = false;
83   83  
84   void worker_loop(); 84   void worker_loop();
85   85  
86   public: 86   public:
87   using key_type = thread_pool; 87   using key_type = thread_pool;
88   88  
89   /** Construct the thread pool service. 89   /** Construct the thread pool service.
90   90  
91   Eagerly creates all worker threads. 91   Eagerly creates all worker threads.
92   92  
93   @par Exception Safety 93   @par Exception Safety
94   Strong guarantee. If thread creation fails, all 94   Strong guarantee. If thread creation fails, all
95   already-created threads are shut down and joined 95   already-created threads are shut down and joined
96   before the exception propagates. 96   before the exception propagates.
97   97  
98   @param ctx Reference to the owning execution_context. 98   @param ctx Reference to the owning execution_context.
99   @param num_threads Number of worker threads. Must be 99   @param num_threads Number of worker threads. Must be
100   at least 1. 100   at least 1.
101   101  
102   @throws std::logic_error If `num_threads` is 0. 102   @throws std::logic_error If `num_threads` is 0.
103   */ 103   */
HITCBC 104   607 explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1) 104   607 explicit thread_pool(capy::execution_context& ctx, unsigned num_threads = 1)
HITCBC 105   607 { 105   607 {
106   (void)ctx; 106   (void)ctx;
HITCBC 107   607 if (!num_threads) 107   607 if (!num_threads)
HITCBC 108   1 throw std::logic_error("thread_pool requires at least 1 thread"); 108   1 throw std::logic_error("thread_pool requires at least 1 thread");
HITCBC 109   606 threads_.reserve(num_threads); 109   606 threads_.reserve(num_threads);
110   try 110   try
111   { 111   {
HITCBC 112   1215 for (unsigned i = 0; i < num_threads; ++i) 112   1215 for (unsigned i = 0; i < num_threads; ++i)
HITCBC 113   1218 threads_.emplace_back([this] { worker_loop(); }); 113   1218 threads_.emplace_back([this] { worker_loop(); });
114   } 114   }
MISUBC 115   catch (...) 115   catch (...)
116   { 116   {
MISUBC 117   shutdown(); 117   shutdown();
MISUBC 118   throw; 118   throw;
MISUBC 119   } 119   }
HITCBC 120   609 } 120   609 }
121   121  
HITCBC 122   1211 ~thread_pool() override = default; 122   1211 ~thread_pool() override = default;
123   123  
124   thread_pool(thread_pool const&) = delete; 124   thread_pool(thread_pool const&) = delete;
125   thread_pool& operator=(thread_pool const&) = delete; 125   thread_pool& operator=(thread_pool const&) = delete;
126   126  
127   /** Enqueue a work item for execution on the thread pool. 127   /** Enqueue a work item for execution on the thread pool.
128   128  
129   Zero-allocation: the caller owns the work item's storage. 129   Zero-allocation: the caller owns the work item's storage.
130   130  
131   @param w The work item to execute. Must remain valid until 131   @param w The work item to execute. Must remain valid until
132   its `func_` has been called. 132   its `func_` has been called.
133   133  
134   @return `true` if the item was enqueued, `false` if the 134   @return `true` if the item was enqueued, `false` if the
135   pool has already shut down. 135   pool has already shut down.
136   */ 136   */
137   bool post(pool_work_item* w) noexcept; 137   bool post(pool_work_item* w) noexcept;
138   138  
139   /** Shut down the thread pool. 139   /** Shut down the thread pool.
140   140  
141   Signals all threads to exit after draining any 141   Signals all threads to exit after draining any
142   remaining queued work, then joins them. 142   remaining queued work, then joins them.
143   */ 143   */
144   void shutdown() override; 144   void shutdown() override;
145   }; 145   };
146   146  
147   inline void 147   inline void
HITCBC 148   609 thread_pool::worker_loop() 148   609 thread_pool::worker_loop()
149   { 149   {
150   for (;;) 150   for (;;)
151   { 151   {
152   pool_work_item* w; 152   pool_work_item* w;
153   { 153   {
HITCBC 154   788 std::unique_lock<std::mutex> lock(mutex_); 154   788 std::unique_lock<std::mutex> lock(mutex_);
HITCBC 155   788 cv_.wait( 155   788 cv_.wait(
HITCBC 156   1280 lock, [this] { return shutdown_ || !work_queue_.empty(); }); 156   1314 lock, [this] { return shutdown_ || !work_queue_.empty(); });
157   157  
HITCBC 158   788 w = work_queue_.pop(); 158   788 w = work_queue_.pop();
HITCBC 159   788 if (!w) 159   788 if (!w)
160   { 160   {
HITCBC 161   609 if (shutdown_) 161   609 if (shutdown_)
HITCBC 162   1218 return; 162   1218 return;
MISUBC 163   continue; 163   continue;
164   } 164   }
HITCBC 165   788 } 165   788 }
HITCBC 166   179 w->func_(w); 166   179 w->func_(w);
HITCBC 167   179 } 167   179 }
168   } 168   }
169   169  
170   inline bool 170   inline bool
HITCBC 171   180 thread_pool::post(pool_work_item* w) noexcept 171   180 thread_pool::post(pool_work_item* w) noexcept
172   { 172   {
173   { 173   {
HITCBC 174   180 std::lock_guard<std::mutex> lock(mutex_); 174   180 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 175   180 if (shutdown_) 175   180 if (shutdown_)
HITCBC 176   1 return false; 176   1 return false;
HITCBC 177   179 work_queue_.push(w); 177   179 work_queue_.push(w);
HITCBC 178   180 } 178   180 }
HITCBC 179   179 cv_.notify_one(); 179   179 cv_.notify_one();
HITCBC 180   179 return true; 180   179 return true;
181   } 181   }
182   182  
183   inline void 183   inline void
HITCBC 184   610 thread_pool::shutdown() 184   610 thread_pool::shutdown()
185   { 185   {
186   { 186   {
HITCBC 187   610 std::lock_guard<std::mutex> lock(mutex_); 187   610 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 188   610 shutdown_ = true; 188   610 shutdown_ = true;
HITCBC 189   610 } 189   610 }
HITCBC 190   610 cv_.notify_all(); 190   610 cv_.notify_all();
191   191  
HITCBC 192   1219 for (auto& t : threads_) 192   1219 for (auto& t : threads_)
193   { 193   {
HITCBC 194   609 if (t.joinable()) 194   609 if (t.joinable())
HITCBC 195   609 t.join(); 195   609 t.join();
196   } 196   }
HITCBC 197   610 threads_.clear(); 197   610 threads_.clear();
198   198  
199   { 199   {
HITCBC 200   610 std::lock_guard<std::mutex> lock(mutex_); 200   610 std::lock_guard<std::mutex> lock(mutex_);
HITCBC 201   610 while (work_queue_.pop()) 201   610 while (work_queue_.pop())
202   ; 202   ;
HITCBC 203   610 } 203   610 }
HITCBC 204   610 } 204   610 }
205   205  
206   } // namespace boost::corosio::detail 206   } // namespace boost::corosio::detail
207   207  
208   #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP 208   #endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP