91.76% Lines (345/376) 97.78% Functions (44/45)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // Copyright (c) 2026 Steve Gerbino 3   // Copyright (c) 2026 Steve Gerbino
4   // 4   //
5   // Distributed under the Boost Software License, Version 1.0. (See accompanying 5   // Distributed under the Boost Software License, Version 1.0. (See accompanying
6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7   // 7   //
8   // Official repository: https://github.com/cppalliance/corosio 8   // Official repository: https://github.com/cppalliance/corosio
9   // 9   //
10   10  
11   #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP 11   #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12   #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP 12   #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13   13  
14   #include <boost/corosio/timer.hpp> 14   #include <boost/corosio/timer.hpp>
15   #include <boost/corosio/io_context.hpp> 15   #include <boost/corosio/io_context.hpp>
16   #include <boost/corosio/detail/scheduler_op.hpp> 16   #include <boost/corosio/detail/scheduler_op.hpp>
17   #include <boost/corosio/detail/intrusive.hpp> 17   #include <boost/corosio/detail/intrusive.hpp>
18   #include <boost/corosio/detail/thread_local_ptr.hpp> 18   #include <boost/corosio/detail/thread_local_ptr.hpp>
19   #include <boost/capy/error.hpp> 19   #include <boost/capy/error.hpp>
20   #include <boost/capy/ex/execution_context.hpp> 20   #include <boost/capy/ex/execution_context.hpp>
21   #include <boost/capy/ex/executor_ref.hpp> 21   #include <boost/capy/ex/executor_ref.hpp>
22   #include <system_error> 22   #include <system_error>
23   23  
24   #include <atomic> 24   #include <atomic>
25   #include <chrono> 25   #include <chrono>
26   #include <coroutine> 26   #include <coroutine>
27   #include <cstddef> 27   #include <cstddef>
28   #include <limits> 28   #include <limits>
29   #include <mutex> 29   #include <mutex>
30   #include <optional> 30   #include <optional>
31   #include <stop_token> 31   #include <stop_token>
32   #include <utility> 32   #include <utility>
33   #include <vector> 33   #include <vector>
34   34  
35   namespace boost::corosio::detail { 35   namespace boost::corosio::detail {
36   36  
37   struct scheduler; 37   struct scheduler;
38   38  
39   /* 39   /*
40   Timer Service 40   Timer Service
41   ============= 41   =============
42   42  
43   Data Structures 43   Data Structures
44   --------------- 44   ---------------
45   waiter_node holds per-waiter state: coroutine handle, executor, 45   waiter_node holds per-waiter state: coroutine handle, executor,
46   error output, stop_token, embedded completion_op. Each concurrent 46   error output, stop_token, embedded completion_op. Each concurrent
47   co_await t.wait() allocates one waiter_node. 47   co_await t.wait() allocates one waiter_node.
48   48  
49   timer_service::implementation holds per-timer state: expiry, 49   timer_service::implementation holds per-timer state: expiry,
50   heap index, and an intrusive_list of waiter_nodes. Multiple 50   heap index, and an intrusive_list of waiter_nodes. Multiple
51   coroutines can wait on the same timer simultaneously. 51   coroutines can wait on the same timer simultaneously.
52   52  
53   timer_service owns a min-heap of active timers, a free list 53   timer_service owns a min-heap of active timers, a free list
54   of recycled impls, and a free list of recycled waiter_nodes. The 54   of recycled impls, and a free list of recycled waiter_nodes. The
55   heap is ordered by expiry time; the scheduler queries 55   heap is ordered by expiry time; the scheduler queries
56   nearest_expiry() to set the epoll/timerfd timeout. 56   nearest_expiry() to set the epoll/timerfd timeout.
57   57  
58   Optimization Strategy 58   Optimization Strategy
59   --------------------- 59   ---------------------
60   1. Deferred heap insertion — expires_after() stores the expiry 60   1. Deferred heap insertion — expires_after() stores the expiry
61   but does not insert into the heap. Insertion happens in wait(). 61   but does not insert into the heap. Insertion happens in wait().
62   2. Thread-local impl cache — single-slot per-thread cache. 62   2. Thread-local impl cache — single-slot per-thread cache.
63   3. Embedded completion_op — eliminates heap allocation per fire/cancel. 63   3. Embedded completion_op — eliminates heap allocation per fire/cancel.
64   4. Cached nearest expiry — atomic avoids mutex in nearest_expiry(). 64   4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
65   5. might_have_pending_waits_ flag — skips lock when no wait issued. 65   5. might_have_pending_waits_ flag — skips lock when no wait issued.
66   6. Thread-local waiter cache — single-slot per-thread cache. 66   6. Thread-local waiter cache — single-slot per-thread cache.
67   67  
68   Concurrency 68   Concurrency
69   ----------- 69   -----------
70   stop_token callbacks can fire from any thread. The impl_ 70   stop_token callbacks can fire from any thread. The impl_
71   pointer on waiter_node is used as a "still in list" marker. 71   pointer on waiter_node is used as a "still in list" marker.
72   */ 72   */
73   73  
74   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node; 74   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
75   75  
76   inline void timer_service_invalidate_cache() noexcept; 76   inline void timer_service_invalidate_cache() noexcept;
77   77  
78   // timer_service class body — member function definitions are 78   // timer_service class body — member function definitions are
79   // out-of-class (after implementation and waiter_node are complete) 79   // out-of-class (after implementation and waiter_node are complete)
80   class BOOST_COROSIO_DECL timer_service final 80   class BOOST_COROSIO_DECL timer_service final
81   : public capy::execution_context::service 81   : public capy::execution_context::service
82   , public io_object::io_service 82   , public io_object::io_service
83   { 83   {
84   public: 84   public:
85   using clock_type = std::chrono::steady_clock; 85   using clock_type = std::chrono::steady_clock;
86   using time_point = clock_type::time_point; 86   using time_point = clock_type::time_point;
87   87  
88   /// Type-erased callback for earliest-expiry-changed notifications. 88   /// Type-erased callback for earliest-expiry-changed notifications.
89   class callback 89   class callback
90   { 90   {
91   void* ctx_ = nullptr; 91   void* ctx_ = nullptr;
92   void (*fn_)(void*) = nullptr; 92   void (*fn_)(void*) = nullptr;
93   93  
94   public: 94   public:
95   /// Construct an empty callback. 95   /// Construct an empty callback.
HITCBC 96   605 callback() = default; 96   605 callback() = default;
97   97  
98   /// Construct a callback with the given context and function. 98   /// Construct a callback with the given context and function.
HITCBC 99   605 callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {} 99   605 callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
100   100  
101   /// Return true if the callback is non-empty. 101   /// Return true if the callback is non-empty.
102   explicit operator bool() const noexcept 102   explicit operator bool() const noexcept
103   { 103   {
104   return fn_ != nullptr; 104   return fn_ != nullptr;
105   } 105   }
106   106  
107   /// Invoke the callback. 107   /// Invoke the callback.
HITCBC 108   6745 void operator()() const 108   7953 void operator()() const
109   { 109   {
HITCBC 110   6745 if (fn_) 110   7953 if (fn_)
HITCBC 111   6745 fn_(ctx_); 111   7953 fn_(ctx_);
HITCBC 112   6745 } 112   7953 }
113   }; 113   };
114   114  
115   struct implementation; 115   struct implementation;
116   116  
117   private: 117   private:
118   struct heap_entry 118   struct heap_entry
119   { 119   {
120   time_point time_; 120   time_point time_;
121   implementation* timer_; 121   implementation* timer_;
122   }; 122   };
123   123  
124   scheduler* sched_ = nullptr; 124   scheduler* sched_ = nullptr;
125   mutable std::mutex mutex_; 125   mutable std::mutex mutex_;
126   std::vector<heap_entry> heap_; 126   std::vector<heap_entry> heap_;
127   implementation* free_list_ = nullptr; 127   implementation* free_list_ = nullptr;
128   waiter_node* waiter_free_list_ = nullptr; 128   waiter_node* waiter_free_list_ = nullptr;
129   callback on_earliest_changed_; 129   callback on_earliest_changed_;
130   bool shutting_down_ = false; 130   bool shutting_down_ = false;
131   // Avoids mutex in nearest_expiry() and empty() 131   // Avoids mutex in nearest_expiry() and empty()
132   mutable std::atomic<std::int64_t> cached_nearest_ns_{ 132   mutable std::atomic<std::int64_t> cached_nearest_ns_{
133   (std::numeric_limits<std::int64_t>::max)()}; 133   (std::numeric_limits<std::int64_t>::max)()};
134   134  
135   public: 135   public:
136   /// Construct the timer service bound to a scheduler. 136   /// Construct the timer service bound to a scheduler.
HITCBC 137   605 inline timer_service(capy::execution_context&, scheduler& sched) 137   605 inline timer_service(capy::execution_context&, scheduler& sched)
HITCBC 138   605 : sched_(&sched) 138   605 : sched_(&sched)
139   { 139   {
HITCBC 140   605 } 140   605 }
141   141  
142   /// Return the associated scheduler. 142   /// Return the associated scheduler.
HITCBC 143   13572 inline scheduler& get_scheduler() noexcept 143   15994 inline scheduler& get_scheduler() noexcept
144   { 144   {
HITCBC 145   13572 return *sched_; 145   15994 return *sched_;
146   } 146   }
147   147  
148   /// Destroy the timer service. 148   /// Destroy the timer service.
HITCBC 149   1210 ~timer_service() override = default; 149   1210 ~timer_service() override = default;
150   150  
151   timer_service(timer_service const&) = delete; 151   timer_service(timer_service const&) = delete;
152   timer_service& operator=(timer_service const&) = delete; 152   timer_service& operator=(timer_service const&) = delete;
153   153  
154   /// Register a callback invoked when the earliest expiry changes. 154   /// Register a callback invoked when the earliest expiry changes.
HITCBC 155   605 inline void set_on_earliest_changed(callback cb) 155   605 inline void set_on_earliest_changed(callback cb)
156   { 156   {
HITCBC 157   605 on_earliest_changed_ = cb; 157   605 on_earliest_changed_ = cb;
HITCBC 158   605 } 158   605 }
159   159  
160   /// Return true if no timers are in the heap. 160   /// Return true if no timers are in the heap.
161   inline bool empty() const noexcept 161   inline bool empty() const noexcept
162   { 162   {
163   return cached_nearest_ns_.load(std::memory_order_acquire) == 163   return cached_nearest_ns_.load(std::memory_order_acquire) ==
164   (std::numeric_limits<std::int64_t>::max)(); 164   (std::numeric_limits<std::int64_t>::max)();
165   } 165   }
166   166  
167   /// Return the nearest timer expiry without acquiring the mutex. 167   /// Return the nearest timer expiry without acquiring the mutex.
HITCBC 168   126860 inline time_point nearest_expiry() const noexcept 168   134218 inline time_point nearest_expiry() const noexcept
169   { 169   {
HITCBC 170   126860 auto ns = cached_nearest_ns_.load(std::memory_order_acquire); 170   134218 auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
HITCBC 171   126860 return time_point(time_point::duration(ns)); 171   134218 return time_point(time_point::duration(ns));
172   } 172   }
173   173  
174   /// Cancel all pending timers and free cached resources. 174   /// Cancel all pending timers and free cached resources.
175   inline void shutdown() override; 175   inline void shutdown() override;
176   176  
177   /// Construct a new timer implementation. 177   /// Construct a new timer implementation.
178   inline io_object::implementation* construct() override; 178   inline io_object::implementation* construct() override;
179   179  
180   /// Destroy a timer implementation, cancelling pending waiters. 180   /// Destroy a timer implementation, cancelling pending waiters.
181   inline void destroy(io_object::implementation* p) override; 181   inline void destroy(io_object::implementation* p) override;
182   182  
183   /// Cancel and recycle a timer implementation. 183   /// Cancel and recycle a timer implementation.
184   inline void destroy_impl(implementation& impl); 184   inline void destroy_impl(implementation& impl);
185   185  
186   /// Create or recycle a waiter node. 186   /// Create or recycle a waiter node.
187   inline waiter_node* create_waiter(); 187   inline waiter_node* create_waiter();
188   188  
189   /// Return a waiter node to the cache or free list. 189   /// Return a waiter node to the cache or free list.
190   inline void destroy_waiter(waiter_node* w); 190   inline void destroy_waiter(waiter_node* w);
191   191  
192   /// Update the timer expiry, cancelling existing waiters. 192   /// Update the timer expiry, cancelling existing waiters.
193   inline std::size_t update_timer(implementation& impl, time_point new_time); 193   inline std::size_t update_timer(implementation& impl, time_point new_time);
194   194  
195   /// Insert a waiter into the timer's waiter list and the heap. 195   /// Insert a waiter into the timer's waiter list and the heap.
196   inline void insert_waiter(implementation& impl, waiter_node* w); 196   inline void insert_waiter(implementation& impl, waiter_node* w);
197   197  
198   /// Cancel all waiters on a timer. 198   /// Cancel all waiters on a timer.
199   inline std::size_t cancel_timer(implementation& impl); 199   inline std::size_t cancel_timer(implementation& impl);
200   200  
201   /// Cancel a single waiter ( stop_token callback path ). 201   /// Cancel a single waiter ( stop_token callback path ).
202   inline void cancel_waiter(waiter_node* w); 202   inline void cancel_waiter(waiter_node* w);
203   203  
204   /// Cancel one waiter on a timer. 204   /// Cancel one waiter on a timer.
205   inline std::size_t cancel_one_waiter(implementation& impl); 205   inline std::size_t cancel_one_waiter(implementation& impl);
206   206  
207   /// Complete all waiters whose timers have expired. 207   /// Complete all waiters whose timers have expired.
208   inline std::size_t process_expired(); 208   inline std::size_t process_expired();
209   209  
210   private: 210   private:
HITCBC 211   154779 inline void refresh_cached_nearest() noexcept 211   162526 inline void refresh_cached_nearest() noexcept
212   { 212   {
HITCBC 213   154779 auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)() 213   162526 auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
HITCBC 214   154238 : heap_[0].time_.time_since_epoch().count(); 214   161990 : heap_[0].time_.time_since_epoch().count();
HITCBC 215   154779 cached_nearest_ns_.store(ns, std::memory_order_release); 215   162526 cached_nearest_ns_.store(ns, std::memory_order_release);
HITCBC 216   154779 } 216   162526 }
217   217  
218   inline void remove_timer_impl(implementation& impl); 218   inline void remove_timer_impl(implementation& impl);
219   inline void up_heap(std::size_t index); 219   inline void up_heap(std::size_t index);
220   inline void down_heap(std::size_t index); 220   inline void down_heap(std::size_t index);
221   inline void swap_heap(std::size_t i1, std::size_t i2); 221   inline void swap_heap(std::size_t i1, std::size_t i2);
222   }; 222   };
223   223  
224   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node 224   struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
225   : intrusive_list<waiter_node>::node 225   : intrusive_list<waiter_node>::node
226   { 226   {
227   // Embedded completion op — avoids heap allocation per fire/cancel 227   // Embedded completion op — avoids heap allocation per fire/cancel
228   struct completion_op final : scheduler_op 228   struct completion_op final : scheduler_op
229   { 229   {
230   waiter_node* waiter_ = nullptr; 230   waiter_node* waiter_ = nullptr;
231   231  
232   static void do_complete( 232   static void do_complete(
233   void* owner, scheduler_op* base, std::uint32_t, std::uint32_t); 233   void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
234   234  
HITCBC 235   214 completion_op() noexcept : scheduler_op(&do_complete) {} 235   214 completion_op() noexcept : scheduler_op(&do_complete) {}
236   236  
237   void operator()() override; 237   void operator()() override;
238   void destroy() override; 238   void destroy() override;
239   }; 239   };
240   240  
241   // Per-waiter stop_token cancellation 241   // Per-waiter stop_token cancellation
242   struct canceller 242   struct canceller
243   { 243   {
244   waiter_node* waiter_; 244   waiter_node* waiter_;
245   void operator()() const; 245   void operator()() const;
246   }; 246   };
247   247  
248   // nullptr once removed from timer's waiter list (concurrency marker) 248   // nullptr once removed from timer's waiter list (concurrency marker)
249   timer_service::implementation* impl_ = nullptr; 249   timer_service::implementation* impl_ = nullptr;
250   timer_service* svc_ = nullptr; 250   timer_service* svc_ = nullptr;
251   std::coroutine_handle<> h_; 251   std::coroutine_handle<> h_;
252   capy::continuation* cont_ = nullptr; 252   capy::continuation* cont_ = nullptr;
253   capy::executor_ref d_; 253   capy::executor_ref d_;
254   std::error_code* ec_out_ = nullptr; 254   std::error_code* ec_out_ = nullptr;
255   std::stop_token token_; 255   std::stop_token token_;
256   std::optional<std::stop_callback<canceller>> stop_cb_; 256   std::optional<std::stop_callback<canceller>> stop_cb_;
257   completion_op op_; 257   completion_op op_;
258   std::error_code ec_value_; 258   std::error_code ec_value_;
259   waiter_node* next_free_ = nullptr; 259   waiter_node* next_free_ = nullptr;
260   260  
HITCBC 261   214 waiter_node() noexcept 261   214 waiter_node() noexcept
HITCBC 262   214 { 262   214 {
HITCBC 263   214 op_.waiter_ = this; 263   214 op_.waiter_ = this;
HITCBC 264   214 } 264   214 }
265   }; 265   };
266   266  
267   struct timer_service::implementation final : timer::implementation 267   struct timer_service::implementation final : timer::implementation
268   { 268   {
269   using clock_type = std::chrono::steady_clock; 269   using clock_type = std::chrono::steady_clock;
270   using time_point = clock_type::time_point; 270   using time_point = clock_type::time_point;
271   using duration = clock_type::duration; 271   using duration = clock_type::duration;
272   272  
273   timer_service* svc_ = nullptr; 273   timer_service* svc_ = nullptr;
274   intrusive_list<waiter_node> waiters_; 274   intrusive_list<waiter_node> waiters_;
275   275  
276   // Free list linkage (reused when impl is on free_list) 276   // Free list linkage (reused when impl is on free_list)
277   implementation* next_free_ = nullptr; 277   implementation* next_free_ = nullptr;
278   278  
279   inline explicit implementation(timer_service& svc) noexcept; 279   inline explicit implementation(timer_service& svc) noexcept;
280   280  
281   inline std::coroutine_handle<> wait( 281   inline std::coroutine_handle<> wait(
282   std::coroutine_handle<>, 282   std::coroutine_handle<>,
283   capy::executor_ref, 283   capy::executor_ref,
284   std::stop_token, 284   std::stop_token,
285   std::error_code*, 285   std::error_code*,
286   capy::continuation*) override; 286   capy::continuation*) override;
287   }; 287   };
288   288  
289   // Thread-local caches avoid hot-path mutex acquisitions: 289   // Thread-local caches avoid hot-path mutex acquisitions:
290   // 1. Impl cache — single-slot, validated by comparing svc_ 290   // 1. Impl cache — single-slot, validated by comparing svc_
291   // 2. Waiter cache — single-slot, no service affinity 291   // 2. Waiter cache — single-slot, no service affinity
292   // All caches are cleared by timer_service_invalidate_cache() during shutdown. 292   // All caches are cleared by timer_service_invalidate_cache() during shutdown.
293   293  
294   inline thread_local_ptr<timer_service::implementation> tl_cached_impl; 294   inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
295   inline thread_local_ptr<waiter_node> tl_cached_waiter; 295   inline thread_local_ptr<waiter_node> tl_cached_waiter;
296   296  
297   inline timer_service::implementation* 297   inline timer_service::implementation*
HITCBC 298   6991 try_pop_tl_cache(timer_service* svc) noexcept 298   8203 try_pop_tl_cache(timer_service* svc) noexcept
299   { 299   {
HITCBC 300   6991 auto* impl = tl_cached_impl.get(); 300   8203 auto* impl = tl_cached_impl.get();
HITCBC 301   6991 if (impl) 301   8203 if (impl)
302   { 302   {
HITCBC 303   6738 tl_cached_impl.set(nullptr); 303   7950 tl_cached_impl.set(nullptr);
HITCBC 304   6738 if (impl->svc_ == svc) 304   7950 if (impl->svc_ == svc)
HITCBC 305   6738 return impl; 305   7950 return impl;
306   // Stale impl from a destroyed service 306   // Stale impl from a destroyed service
MISUBC 307   delete impl; 307   delete impl;
308   } 308   }
HITCBC 309   253 return nullptr; 309   253 return nullptr;
310   } 310   }
311   311  
312   inline bool 312   inline bool
HITCBC 313   6983 try_push_tl_cache(timer_service::implementation* impl) noexcept 313   8195 try_push_tl_cache(timer_service::implementation* impl) noexcept
314   { 314   {
HITCBC 315   6983 if (!tl_cached_impl.get()) 315   8195 if (!tl_cached_impl.get())
316   { 316   {
HITCBC 317   6903 tl_cached_impl.set(impl); 317   8115 tl_cached_impl.set(impl);
HITCBC 318   6903 return true; 318   8115 return true;
319   } 319   }
HITCBC 320   80 return false; 320   80 return false;
321   } 321   }
322   322  
323   inline waiter_node* 323   inline waiter_node*
HITCBC 324   6790 try_pop_waiter_tl_cache() noexcept 324   8001 try_pop_waiter_tl_cache() noexcept
325   { 325   {
HITCBC 326   6790 auto* w = tl_cached_waiter.get(); 326   8001 auto* w = tl_cached_waiter.get();
HITCBC 327   6790 if (w) 327   8001 if (w)
328   { 328   {
HITCBC 329   6574 tl_cached_waiter.set(nullptr); 329   7785 tl_cached_waiter.set(nullptr);
HITCBC 330   6574 return w; 330   7785 return w;
331   } 331   }
HITCBC 332   216 return nullptr; 332   216 return nullptr;
333   } 333   }
334   334  
335   inline bool 335   inline bool
HITCBC 336   6774 try_push_waiter_tl_cache(waiter_node* w) noexcept 336   7985 try_push_waiter_tl_cache(waiter_node* w) noexcept
337   { 337   {
HITCBC 338   6774 if (!tl_cached_waiter.get()) 338   7985 if (!tl_cached_waiter.get())
339   { 339   {
HITCBC 340   6694 tl_cached_waiter.set(w); 340   7905 tl_cached_waiter.set(w);
HITCBC 341   6694 return true; 341   7905 return true;
342   } 342   }
HITCBC 343   80 return false; 343   80 return false;
344   } 344   }
345   345  
346   inline void 346   inline void
HITCBC 347   605 timer_service_invalidate_cache() noexcept 347   605 timer_service_invalidate_cache() noexcept
348   { 348   {
HITCBC 349   605 delete tl_cached_impl.get(); 349   605 delete tl_cached_impl.get();
HITCBC 350   605 tl_cached_impl.set(nullptr); 350   605 tl_cached_impl.set(nullptr);
351   351  
HITCBC 352   605 delete tl_cached_waiter.get(); 352   605 delete tl_cached_waiter.get();
HITCBC 353   605 tl_cached_waiter.set(nullptr); 353   605 tl_cached_waiter.set(nullptr);
HITCBC 354   605 } 354   605 }
355   355  
356   // timer_service out-of-class member function definitions 356   // timer_service out-of-class member function definitions
357   357  
HITCBC 358   253 inline timer_service::implementation::implementation( 358   253 inline timer_service::implementation::implementation(
HITCBC 359   253 timer_service& svc) noexcept 359   253 timer_service& svc) noexcept
HITCBC 360   253 : svc_(&svc) 360   253 : svc_(&svc)
361   { 361   {
HITCBC 362   253 } 362   253 }
363   363  
364   inline void 364   inline void
HITCBC 365   605 timer_service::shutdown() 365   605 timer_service::shutdown()
366   { 366   {
HITCBC 367   605 timer_service_invalidate_cache(); 367   605 timer_service_invalidate_cache();
HITCBC 368   605 shutting_down_ = true; 368   605 shutting_down_ = true;
369   369  
370   // Snapshot impls and detach them from the heap so that 370   // Snapshot impls and detach them from the heap so that
371   // coroutine-owned timer destructors (triggered by h.destroy() 371   // coroutine-owned timer destructors (triggered by h.destroy()
372   // below) cannot re-enter remove_timer_impl() and mutate the 372   // below) cannot re-enter remove_timer_impl() and mutate the
373   // vector during iteration. 373   // vector during iteration.
HITCBC 374   605 std::vector<implementation*> impls; 374   605 std::vector<implementation*> impls;
HITCBC 375   605 impls.reserve(heap_.size()); 375   605 impls.reserve(heap_.size());
HITCBC 376   613 for (auto& entry : heap_) 376   613 for (auto& entry : heap_)
377   { 377   {
HITCBC 378   8 entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)(); 378   8 entry.timer_->heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 379   8 impls.push_back(entry.timer_); 379   8 impls.push_back(entry.timer_);
380   } 380   }
HITCBC 381   605 heap_.clear(); 381   605 heap_.clear();
HITCBC 382   605 cached_nearest_ns_.store( 382   605 cached_nearest_ns_.store(
383   (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release); 383   (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
384   384  
385   // Cancel waiting timers. Each waiter called work_started() 385   // Cancel waiting timers. Each waiter called work_started()
386   // in implementation::wait(). On IOCP the scheduler shutdown 386   // in implementation::wait(). On IOCP the scheduler shutdown
387   // loop exits when outstanding_work_ reaches zero, so we must 387   // loop exits when outstanding_work_ reaches zero, so we must
388   // call work_finished() here to balance it. On other backends 388   // call work_finished() here to balance it. On other backends
389   // this is harmless. 389   // this is harmless.
HITCBC 390   613 for (auto* impl : impls) 390   613 for (auto* impl : impls)
391   { 391   {
HITCBC 392   16 while (auto* w = impl->waiters_.pop_front()) 392   16 while (auto* w = impl->waiters_.pop_front())
393   { 393   {
HITCBC 394   8 w->stop_cb_.reset(); 394   8 w->stop_cb_.reset();
HITCBC 395   8 auto h = std::exchange(w->h_, {}); 395   8 auto h = std::exchange(w->h_, {});
HITCBC 396   8 sched_->work_finished(); 396   8 sched_->work_finished();
HITCBC 397   8 if (h) 397   8 if (h)
HITCBC 398   8 h.destroy(); 398   8 h.destroy();
HITCBC 399   8 delete w; 399   8 delete w;
HITCBC 400   8 } 400   8 }
HITCBC 401   8 delete impl; 401   8 delete impl;
402   } 402   }
403   403  
404   // Delete free-listed impls 404   // Delete free-listed impls
HITCBC 405   685 while (free_list_) 405   685 while (free_list_)
406   { 406   {
HITCBC 407   80 auto* next = free_list_->next_free_; 407   80 auto* next = free_list_->next_free_;
HITCBC 408   80 delete free_list_; 408   80 delete free_list_;
HITCBC 409   80 free_list_ = next; 409   80 free_list_ = next;
410   } 410   }
411   411  
412   // Delete free-listed waiters 412   // Delete free-listed waiters
HITCBC 413   683 while (waiter_free_list_) 413   683 while (waiter_free_list_)
414   { 414   {
HITCBC 415   78 auto* next = waiter_free_list_->next_free_; 415   78 auto* next = waiter_free_list_->next_free_;
HITCBC 416   78 delete waiter_free_list_; 416   78 delete waiter_free_list_;
HITCBC 417   78 waiter_free_list_ = next; 417   78 waiter_free_list_ = next;
418   } 418   }
HITCBC 419   605 } 419   605 }
420   420  
421   inline io_object::implementation* 421   inline io_object::implementation*
HITCBC 422   6991 timer_service::construct() 422   8203 timer_service::construct()
423   { 423   {
HITCBC 424   6991 implementation* impl = try_pop_tl_cache(this); 424   8203 implementation* impl = try_pop_tl_cache(this);
HITCBC 425   6991 if (impl) 425   8203 if (impl)
426   { 426   {
HITCBC 427   6738 impl->svc_ = this; 427   7950 impl->svc_ = this;
HITCBC 428   6738 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)(); 428   7950 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 429   6738 impl->might_have_pending_waits_ = false; 429   7950 impl->might_have_pending_waits_ = false;
HITCBC 430   6738 return impl; 430   7950 return impl;
431   } 431   }
432   432  
HITCBC 433   253 std::lock_guard lock(mutex_); 433   253 std::lock_guard lock(mutex_);
HITCBC 434   253 if (free_list_) 434   253 if (free_list_)
435   { 435   {
MISUBC 436   impl = free_list_; 436   impl = free_list_;
MISUBC 437   free_list_ = impl->next_free_; 437   free_list_ = impl->next_free_;
MISUBC 438   impl->next_free_ = nullptr; 438   impl->next_free_ = nullptr;
MISUBC 439   impl->svc_ = this; 439   impl->svc_ = this;
MISUBC 440   impl->heap_index_ = (std::numeric_limits<std::size_t>::max)(); 440   impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
MISUBC 441   impl->might_have_pending_waits_ = false; 441   impl->might_have_pending_waits_ = false;
442   } 442   }
443   else 443   else
444   { 444   {
HITCBC 445   253 impl = new implementation(*this); 445   253 impl = new implementation(*this);
446   } 446   }
HITCBC 447   253 return impl; 447   253 return impl;
HITCBC 448   253 } 448   253 }
449   449  
450   inline void 450   inline void
HITCBC 451   6989 timer_service::destroy(io_object::implementation* p) 451   8201 timer_service::destroy(io_object::implementation* p)
452   { 452   {
HITCBC 453   6989 destroy_impl(static_cast<implementation&>(*p)); 453   8201 destroy_impl(static_cast<implementation&>(*p));
HITCBC 454   6989 } 454   8201 }
455   455  
456   inline void 456   inline void
HITCBC 457   6989 timer_service::destroy_impl(implementation& impl) 457   8201 timer_service::destroy_impl(implementation& impl)
458   { 458   {
459   // During shutdown the impl is owned by the shutdown loop. 459   // During shutdown the impl is owned by the shutdown loop.
460   // Re-entering here (from a coroutine-owned timer destructor 460   // Re-entering here (from a coroutine-owned timer destructor
461   // triggered by h.destroy()) must not modify the heap or 461   // triggered by h.destroy()) must not modify the heap or
462   // recycle the impl — shutdown deletes it directly. 462   // recycle the impl — shutdown deletes it directly.
HITCBC 463   6989 if (shutting_down_) 463   8201 if (shutting_down_)
HITCBC 464   6909 return; 464   8121 return;
465   465  
HITCBC 466   6983 cancel_timer(impl); 466   8195 cancel_timer(impl);
467   467  
HITCBC 468   6983 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)()) 468   8195 if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
469   { 469   {
MISUBC 470   std::lock_guard lock(mutex_); 470   std::lock_guard lock(mutex_);
MISUBC 471   remove_timer_impl(impl); 471   remove_timer_impl(impl);
MISUBC 472   refresh_cached_nearest(); 472   refresh_cached_nearest();
MISUBC 473   } 473   }
474   474  
HITCBC 475   6983 if (try_push_tl_cache(&impl)) 475   8195 if (try_push_tl_cache(&impl))
HITCBC 476   6903 return; 476   8115 return;
477   477  
HITCBC 478   80 std::lock_guard lock(mutex_); 478   80 std::lock_guard lock(mutex_);
HITCBC 479   80 impl.next_free_ = free_list_; 479   80 impl.next_free_ = free_list_;
HITCBC 480   80 free_list_ = &impl; 480   80 free_list_ = &impl;
HITCBC 481   80 } 481   80 }
482   482  
483   inline waiter_node* 483   inline waiter_node*
HITCBC 484   6790 timer_service::create_waiter() 484   8001 timer_service::create_waiter()
485   { 485   {
HITCBC 486   6790 if (auto* w = try_pop_waiter_tl_cache()) 486   8001 if (auto* w = try_pop_waiter_tl_cache())
HITCBC 487   6574 return w; 487   7785 return w;
488   488  
HITCBC 489   216 std::lock_guard lock(mutex_); 489   216 std::lock_guard lock(mutex_);
HITCBC 490   216 if (waiter_free_list_) 490   216 if (waiter_free_list_)
491   { 491   {
HITCBC 492   2 auto* w = waiter_free_list_; 492   2 auto* w = waiter_free_list_;
HITCBC 493   2 waiter_free_list_ = w->next_free_; 493   2 waiter_free_list_ = w->next_free_;
HITCBC 494   2 w->next_free_ = nullptr; 494   2 w->next_free_ = nullptr;
HITCBC 495   2 return w; 495   2 return w;
496   } 496   }
497   497  
HITCBC 498   214 return new waiter_node(); 498   214 return new waiter_node();
HITCBC 499   216 } 499   216 }
500   500  
501   inline void 501   inline void
HITCBC 502   6774 timer_service::destroy_waiter(waiter_node* w) 502   7985 timer_service::destroy_waiter(waiter_node* w)
503   { 503   {
HITCBC 504   6774 if (try_push_waiter_tl_cache(w)) 504   7985 if (try_push_waiter_tl_cache(w))
HITCBC 505   6694 return; 505   7905 return;
506   506  
HITCBC 507   80 std::lock_guard lock(mutex_); 507   80 std::lock_guard lock(mutex_);
HITCBC 508   80 w->next_free_ = waiter_free_list_; 508   80 w->next_free_ = waiter_free_list_;
HITCBC 509   80 waiter_free_list_ = w; 509   80 waiter_free_list_ = w;
HITCBC 510   80 } 510   80 }
511   511  
512   inline std::size_t 512   inline std::size_t
HITCBC 513   6 timer_service::update_timer(implementation& impl, time_point new_time) 513   6 timer_service::update_timer(implementation& impl, time_point new_time)
514   { 514   {
515   bool in_heap = 515   bool in_heap =
HITCBC 516   6 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)()); 516   6 (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
HITCBC 517   6 if (!in_heap && impl.waiters_.empty()) 517   6 if (!in_heap && impl.waiters_.empty())
MISUBC 518   return 0; 518   return 0;
519   519  
HITCBC 520   6 bool notify = false; 520   6 bool notify = false;
HITCBC 521   6 intrusive_list<waiter_node> canceled; 521   6 intrusive_list<waiter_node> canceled;
522   522  
523   { 523   {
HITCBC 524   6 std::lock_guard lock(mutex_); 524   6 std::lock_guard lock(mutex_);
525   525  
HITCBC 526   16 while (auto* w = impl.waiters_.pop_front()) 526   16 while (auto* w = impl.waiters_.pop_front())
527   { 527   {
HITCBC 528   10 w->impl_ = nullptr; 528   10 w->impl_ = nullptr;
HITCBC 529   10 canceled.push_back(w); 529   10 canceled.push_back(w);
HITCBC 530   10 } 530   10 }
531   531  
HITCBC 532   6 if (impl.heap_index_ < heap_.size()) 532   6 if (impl.heap_index_ < heap_.size())
533   { 533   {
HITCBC 534   6 time_point old_time = heap_[impl.heap_index_].time_; 534   6 time_point old_time = heap_[impl.heap_index_].time_;
HITCBC 535   6 heap_[impl.heap_index_].time_ = new_time; 535   6 heap_[impl.heap_index_].time_ = new_time;
536   536  
HITCBC 537   6 if (new_time < old_time) 537   6 if (new_time < old_time)
HITCBC 538   6 up_heap(impl.heap_index_); 538   6 up_heap(impl.heap_index_);
539   else 539   else
MISUBC 540   down_heap(impl.heap_index_); 540   down_heap(impl.heap_index_);
541   541  
HITCBC 542   6 notify = (impl.heap_index_ == 0); 542   6 notify = (impl.heap_index_ == 0);
543   } 543   }
544   544  
HITCBC 545   6 refresh_cached_nearest(); 545   6 refresh_cached_nearest();
HITCBC 546   6 } 546   6 }
547   547  
HITCBC 548   6 std::size_t count = 0; 548   6 std::size_t count = 0;
HITCBC 549   16 while (auto* w = canceled.pop_front()) 549   16 while (auto* w = canceled.pop_front())
550   { 550   {
HITCBC 551   10 w->ec_value_ = make_error_code(capy::error::canceled); 551   10 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 552   10 sched_->post(&w->op_); 552   10 sched_->post(&w->op_);
HITCBC 553   10 ++count; 553   10 ++count;
HITCBC 554   10 } 554   10 }
555   555  
HITCBC 556   6 if (notify) 556   6 if (notify)
HITCBC 557   6 on_earliest_changed_(); 557   6 on_earliest_changed_();
558   558  
HITCBC 559   6 return count; 559   6 return count;
560   } 560   }
561   561  
562   inline void 562   inline void
HITCBC 563   6790 timer_service::insert_waiter(implementation& impl, waiter_node* w) 563   8001 timer_service::insert_waiter(implementation& impl, waiter_node* w)
564   { 564   {
HITCBC 565   6790 bool notify = false; 565   8001 bool notify = false;
566   { 566   {
HITCBC 567   6790 std::lock_guard lock(mutex_); 567   8001 std::lock_guard lock(mutex_);
HITCBC 568   6790 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)()) 568   8001 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
569   { 569   {
HITCBC 570   6768 impl.heap_index_ = heap_.size(); 570   7979 impl.heap_index_ = heap_.size();
HITCBC 571   6768 heap_.push_back({impl.expiry_, &impl}); 571   7979 heap_.push_back({impl.expiry_, &impl});
HITCBC 572   6768 up_heap(heap_.size() - 1); 572   7979 up_heap(heap_.size() - 1);
HITCBC 573   6768 notify = (impl.heap_index_ == 0); 573   7979 notify = (impl.heap_index_ == 0);
HITCBC 574   6768 refresh_cached_nearest(); 574   7979 refresh_cached_nearest();
575   } 575   }
HITCBC 576   6790 impl.waiters_.push_back(w); 576   8001 impl.waiters_.push_back(w);
HITCBC 577   6790 } 577   8001 }
HITCBC 578   6790 if (notify) 578   8001 if (notify)
HITCBC 579   6739 on_earliest_changed_(); 579   7947 on_earliest_changed_();
HITCBC 580   6790 } 580   8001 }
581   581  
582   inline std::size_t 582   inline std::size_t
HITCBC 583   6991 timer_service::cancel_timer(implementation& impl) 583   8203 timer_service::cancel_timer(implementation& impl)
584   { 584   {
HITCBC 585   6991 if (!impl.might_have_pending_waits_) 585   8203 if (!impl.might_have_pending_waits_)
HITCBC 586   6967 return 0; 586   8179 return 0;
587   587  
588   // Not in heap and no waiters — just clear the flag 588   // Not in heap and no waiters — just clear the flag
HITCBC 589   24 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() && 589   24 if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
MISUBC 590   impl.waiters_.empty()) 590   impl.waiters_.empty())
591   { 591   {
MISUBC 592   impl.might_have_pending_waits_ = false; 592   impl.might_have_pending_waits_ = false;
MISUBC 593   return 0; 593   return 0;
594   } 594   }
595   595  
HITCBC 596   24 intrusive_list<waiter_node> canceled; 596   24 intrusive_list<waiter_node> canceled;
597   597  
598   { 598   {
HITCBC 599   24 std::lock_guard lock(mutex_); 599   24 std::lock_guard lock(mutex_);
HITCBC 600   24 remove_timer_impl(impl); 600   24 remove_timer_impl(impl);
HITCBC 601   52 while (auto* w = impl.waiters_.pop_front()) 601   52 while (auto* w = impl.waiters_.pop_front())
602   { 602   {
HITCBC 603   28 w->impl_ = nullptr; 603   28 w->impl_ = nullptr;
HITCBC 604   28 canceled.push_back(w); 604   28 canceled.push_back(w);
HITCBC 605   28 } 605   28 }
HITCBC 606   24 refresh_cached_nearest(); 606   24 refresh_cached_nearest();
HITCBC 607   24 } 607   24 }
608   608  
HITCBC 609   24 impl.might_have_pending_waits_ = false; 609   24 impl.might_have_pending_waits_ = false;
610   610  
HITCBC 611   24 std::size_t count = 0; 611   24 std::size_t count = 0;
HITCBC 612   52 while (auto* w = canceled.pop_front()) 612   52 while (auto* w = canceled.pop_front())
613   { 613   {
HITCBC 614   28 w->ec_value_ = make_error_code(capy::error::canceled); 614   28 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 615   28 sched_->post(&w->op_); 615   28 sched_->post(&w->op_);
HITCBC 616   28 ++count; 616   28 ++count;
HITCBC 617   28 } 617   28 }
618   618  
HITCBC 619   24 return count; 619   24 return count;
620   } 620   }
621   621  
622   inline void 622   inline void
HITCBC 623   30 timer_service::cancel_waiter(waiter_node* w) 623   30 timer_service::cancel_waiter(waiter_node* w)
624   { 624   {
625   { 625   {
HITCBC 626   30 std::lock_guard lock(mutex_); 626   30 std::lock_guard lock(mutex_);
627   // Already removed by cancel_timer or process_expired 627   // Already removed by cancel_timer or process_expired
HITCBC 628   30 if (!w->impl_) 628   30 if (!w->impl_)
MISUBC 629   return; 629   return;
HITCBC 630   30 auto* impl = w->impl_; 630   30 auto* impl = w->impl_;
HITCBC 631   30 w->impl_ = nullptr; 631   30 w->impl_ = nullptr;
HITCBC 632   30 impl->waiters_.remove(w); 632   30 impl->waiters_.remove(w);
HITCBC 633   30 if (impl->waiters_.empty()) 633   30 if (impl->waiters_.empty())
634   { 634   {
HITCBC 635   28 remove_timer_impl(*impl); 635   28 remove_timer_impl(*impl);
HITCBC 636   28 impl->might_have_pending_waits_ = false; 636   28 impl->might_have_pending_waits_ = false;
637   } 637   }
HITCBC 638   30 refresh_cached_nearest(); 638   30 refresh_cached_nearest();
HITCBC 639   30 } 639   30 }
640   640  
HITCBC 641   30 w->ec_value_ = make_error_code(capy::error::canceled); 641   30 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 642   30 sched_->post(&w->op_); 642   30 sched_->post(&w->op_);
643   } 643   }
644   644  
645   inline std::size_t 645   inline std::size_t
HITCBC 646   2 timer_service::cancel_one_waiter(implementation& impl) 646   2 timer_service::cancel_one_waiter(implementation& impl)
647   { 647   {
HITCBC 648   2 if (!impl.might_have_pending_waits_) 648   2 if (!impl.might_have_pending_waits_)
MISUBC 649   return 0; 649   return 0;
650   650  
HITCBC 651   2 waiter_node* w = nullptr; 651   2 waiter_node* w = nullptr;
652   652  
653   { 653   {
HITCBC 654   2 std::lock_guard lock(mutex_); 654   2 std::lock_guard lock(mutex_);
HITCBC 655   2 w = impl.waiters_.pop_front(); 655   2 w = impl.waiters_.pop_front();
HITCBC 656   2 if (!w) 656   2 if (!w)
MISUBC 657   return 0; 657   return 0;
HITCBC 658   2 w->impl_ = nullptr; 658   2 w->impl_ = nullptr;
HITCBC 659   2 if (impl.waiters_.empty()) 659   2 if (impl.waiters_.empty())
660   { 660   {
MISUBC 661   remove_timer_impl(impl); 661   remove_timer_impl(impl);
MISUBC 662   impl.might_have_pending_waits_ = false; 662   impl.might_have_pending_waits_ = false;
663   } 663   }
HITCBC 664   2 refresh_cached_nearest(); 664   2 refresh_cached_nearest();
HITCBC 665   2 } 665   2 }
666   666  
HITCBC 667   2 w->ec_value_ = make_error_code(capy::error::canceled); 667   2 w->ec_value_ = make_error_code(capy::error::canceled);
HITCBC 668   2 sched_->post(&w->op_); 668   2 sched_->post(&w->op_);
HITCBC 669   2 return 1; 669   2 return 1;
670   } 670   }
671   671  
672   inline std::size_t 672   inline std::size_t
HITCBC 673   147949 timer_service::process_expired() 673   154485 timer_service::process_expired()
674   { 674   {
HITCBC 675   147949 intrusive_list<waiter_node> expired; 675   154485 intrusive_list<waiter_node> expired;
676   676  
677   { 677   {
HITCBC 678   147949 std::lock_guard lock(mutex_); 678   154485 std::lock_guard lock(mutex_);
HITCBC 679   147949 auto now = clock_type::now(); 679   154485 auto now = clock_type::now();
680   680  
HITCBC 681   154657 while (!heap_.empty() && heap_[0].time_ <= now) 681   162404 while (!heap_.empty() && heap_[0].time_ <= now)
682   { 682   {
HITCBC 683   6708 implementation* t = heap_[0].timer_; 683   7919 implementation* t = heap_[0].timer_;
HITCBC 684   6708 remove_timer_impl(*t); 684   7919 remove_timer_impl(*t);
HITCBC 685   13420 while (auto* w = t->waiters_.pop_front()) 685   15842 while (auto* w = t->waiters_.pop_front())
686   { 686   {
HITCBC 687   6712 w->impl_ = nullptr; 687   7923 w->impl_ = nullptr;
HITCBC 688   6712 w->ec_value_ = {}; 688   7923 w->ec_value_ = {};
HITCBC 689   6712 expired.push_back(w); 689   7923 expired.push_back(w);
HITCBC 690   6712 } 690   7923 }
HITCBC 691   6708 t->might_have_pending_waits_ = false; 691   7919 t->might_have_pending_waits_ = false;
692   } 692   }
693   693  
HITCBC 694   147949 refresh_cached_nearest(); 694   154485 refresh_cached_nearest();
HITCBC 695   147949 } 695   154485 }
696   696  
HITCBC 697   147949 std::size_t count = 0; 697   154485 std::size_t count = 0;
HITCBC 698   154661 while (auto* w = expired.pop_front()) 698   162408 while (auto* w = expired.pop_front())
699   { 699   {
HITCBC 700   6712 sched_->post(&w->op_); 700   7923 sched_->post(&w->op_);
HITCBC 701   6712 ++count; 701   7923 ++count;
HITCBC 702   6712 } 702   7923 }
703   703  
HITCBC 704   147949 return count; 704   154485 return count;
705   } 705   }
706   706  
707   inline void 707   inline void
HITCBC 708   6760 timer_service::remove_timer_impl(implementation& impl) 708   7971 timer_service::remove_timer_impl(implementation& impl)
709   { 709   {
HITCBC 710   6760 std::size_t index = impl.heap_index_; 710   7971 std::size_t index = impl.heap_index_;
HITCBC 711   6760 if (index >= heap_.size()) 711   7971 if (index >= heap_.size())
MISUBC 712   return; // Not in heap 712   return; // Not in heap
713   713  
HITCBC 714   6760 if (index == heap_.size() - 1) 714   7971 if (index == heap_.size() - 1)
715   { 715   {
716   // Last element, just pop 716   // Last element, just pop
HITCBC 717   157 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)(); 717   155 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 718   157 heap_.pop_back(); 718   155 heap_.pop_back();
719   } 719   }
720   else 720   else
721   { 721   {
722   // Swap with last and reheapify 722   // Swap with last and reheapify
HITCBC 723   6603 swap_heap(index, heap_.size() - 1); 723   7816 swap_heap(index, heap_.size() - 1);
HITCBC 724   6603 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)(); 724   7816 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
HITCBC 725   6603 heap_.pop_back(); 725   7816 heap_.pop_back();
726   726  
HITCBC 727   6603 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_) 727   7816 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
MISUBC 728   up_heap(index); 728   up_heap(index);
729   else 729   else
HITCBC 730   6603 down_heap(index); 730   7816 down_heap(index);
731   } 731   }
732   } 732   }
733   733  
734   inline void 734   inline void
HITCBC 735   6774 timer_service::up_heap(std::size_t index) 735   7985 timer_service::up_heap(std::size_t index)
736   { 736   {
HITCBC 737   13358 while (index > 0) 737   15779 while (index > 0)
738   { 738   {
HITCBC 739   6613 std::size_t parent = (index - 1) / 2; 739   7826 std::size_t parent = (index - 1) / 2;
HITCBC 740   6613 if (!(heap_[index].time_ < heap_[parent].time_)) 740   7826 if (!(heap_[index].time_ < heap_[parent].time_))
HITCBC 741   29 break; 741   32 break;
HITCBC 742   6584 swap_heap(index, parent); 742   7794 swap_heap(index, parent);
HITCBC 743   6584 index = parent; 743   7794 index = parent;
744   } 744   }
HITCBC 745   6774 } 745   7985 }
746   746  
747   inline void 747   inline void
HITCBC 748   6603 timer_service::down_heap(std::size_t index) 748   7816 timer_service::down_heap(std::size_t index)
749   { 749   {
HITCBC 750   6603 std::size_t child = index * 2 + 1; 750   7816 std::size_t child = index * 2 + 1;
HITCBC 751   6603 while (child < heap_.size()) 751   7816 while (child < heap_.size())
752   { 752   {
HITCBC 753   6 std::size_t min_child = (child + 1 == heap_.size() || 753   6 std::size_t min_child = (child + 1 == heap_.size() ||
MISUBC 754   heap_[child].time_ < heap_[child + 1].time_) 754   heap_[child].time_ < heap_[child + 1].time_)
HITCBC 755   6 ? child 755   6 ? child
HITCBC 756   6 : child + 1; 756   6 : child + 1;
757   757  
HITCBC 758   6 if (heap_[index].time_ < heap_[min_child].time_) 758   6 if (heap_[index].time_ < heap_[min_child].time_)
HITCBC 759   6 break; 759   6 break;
760   760  
MISUBC 761   swap_heap(index, min_child); 761   swap_heap(index, min_child);
MISUBC 762   index = min_child; 762   index = min_child;
MISUBC 763   child = index * 2 + 1; 763   child = index * 2 + 1;
764   } 764   }
HITCBC 765   6603 } 765   7816 }
766   766  
767   inline void 767   inline void
HITCBC 768   13187 timer_service::swap_heap(std::size_t i1, std::size_t i2) 768   15610 timer_service::swap_heap(std::size_t i1, std::size_t i2)
769   { 769   {
HITCBC 770   13187 heap_entry tmp = heap_[i1]; 770   15610 heap_entry tmp = heap_[i1];
HITCBC 771   13187 heap_[i1] = heap_[i2]; 771   15610 heap_[i1] = heap_[i2];
HITCBC 772   13187 heap_[i2] = tmp; 772   15610 heap_[i2] = tmp;
HITCBC 773   13187 heap_[i1].timer_->heap_index_ = i1; 773   15610 heap_[i1].timer_->heap_index_ = i1;
HITCBC 774   13187 heap_[i2].timer_->heap_index_ = i2; 774   15610 heap_[i2].timer_->heap_index_ = i2;
HITCBC 775   13187 } 775   15610 }
776   776  
777   // waiter_node out-of-class member function definitions 777   // waiter_node out-of-class member function definitions
778   778  
779   inline void 779   inline void
HITCBC 780   30 waiter_node::canceller::operator()() const 780   30 waiter_node::canceller::operator()() const
781   { 781   {
HITCBC 782   30 waiter_->svc_->cancel_waiter(waiter_); 782   30 waiter_->svc_->cancel_waiter(waiter_);
HITCBC 783   30 } 783   30 }
784   784  
785   inline void 785   inline void
MISUBC 786   waiter_node::completion_op::do_complete( 786   waiter_node::completion_op::do_complete(
787   [[maybe_unused]] void* owner, 787   [[maybe_unused]] void* owner,
788   scheduler_op* base, 788   scheduler_op* base,
789   std::uint32_t, 789   std::uint32_t,
790   std::uint32_t) 790   std::uint32_t)
791   { 791   {
792   // owner is always non-null here. The destroy path (owner == nullptr) 792   // owner is always non-null here. The destroy path (owner == nullptr)
793   // is unreachable because completion_op overrides destroy() directly, 793   // is unreachable because completion_op overrides destroy() directly,
794   // bypassing scheduler_op::destroy() which would call func_(nullptr, ...). 794   // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
MISUBC 795   BOOST_COROSIO_ASSERT(owner); 795   BOOST_COROSIO_ASSERT(owner);
MISUBC 796   static_cast<completion_op*>(base)->operator()(); 796   static_cast<completion_op*>(base)->operator()();
MISUBC 797   } 797   }
798   798  
799   inline void 799   inline void
HITCBC 800   6774 waiter_node::completion_op::operator()() 800   7985 waiter_node::completion_op::operator()()
801   { 801   {
HITCBC 802   6774 auto* w = waiter_; 802   7985 auto* w = waiter_;
HITCBC 803   6774 w->stop_cb_.reset(); 803   7985 w->stop_cb_.reset();
HITCBC 804   6774 if (w->ec_out_) 804   7985 if (w->ec_out_)
HITCBC 805   6774 *w->ec_out_ = w->ec_value_; 805   7985 *w->ec_out_ = w->ec_value_;
806   806  
HITCBC 807   6774 auto* cont = w->cont_; 807   7985 auto* cont = w->cont_;
HITCBC 808   6774 auto d = w->d_; 808   7985 auto d = w->d_;
HITCBC 809   6774 auto* svc = w->svc_; 809   7985 auto* svc = w->svc_;
HITCBC 810   6774 auto& sched = svc->get_scheduler(); 810   7985 auto& sched = svc->get_scheduler();
811   811  
HITCBC 812   6774 svc->destroy_waiter(w); 812   7985 svc->destroy_waiter(w);
813   813  
HITCBC 814   6774 d.post(*cont); 814   7985 d.post(*cont);
HITCBC 815   6774 sched.work_finished(); 815   7985 sched.work_finished();
HITCBC 816   6774 } 816   7985 }
817   817  
818   // GCC 14 false-positive: inlining ~optional<stop_callback> through 818   // GCC 14 false-positive: inlining ~optional<stop_callback> through
819   // delete loses track that stop_cb_ was already .reset() above. 819   // delete loses track that stop_cb_ was already .reset() above.
820   #if defined(__GNUC__) && !defined(__clang__) 820   #if defined(__GNUC__) && !defined(__clang__)
821   #pragma GCC diagnostic push 821   #pragma GCC diagnostic push
822   #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" 822   #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
823   #endif 823   #endif
824   inline void 824   inline void
HITCBC 825   8 waiter_node::completion_op::destroy() 825   8 waiter_node::completion_op::destroy()
826   { 826   {
827   // Called during scheduler shutdown drain when this completion_op is 827   // Called during scheduler shutdown drain when this completion_op is
828   // in the scheduler's ready queue (posted by cancel_timer() or 828   // in the scheduler's ready queue (posted by cancel_timer() or
829   // process_expired()). Balances the work_started() from 829   // process_expired()). Balances the work_started() from
830   // implementation::wait(). The scheduler drain loop separately 830   // implementation::wait(). The scheduler drain loop separately
831   // balances the work_started() from post(). On IOCP both decrements 831   // balances the work_started() from post(). On IOCP both decrements
832   // are required for outstanding_work_ to reach zero; on other 832   // are required for outstanding_work_ to reach zero; on other
833   // backends this is harmless. 833   // backends this is harmless.
834   // 834   //
835   // This override also prevents scheduler_op::destroy() from calling 835   // This override also prevents scheduler_op::destroy() from calling
836   // do_complete(nullptr, ...). See also: timer_service::shutdown() 836   // do_complete(nullptr, ...). See also: timer_service::shutdown()
837   // which drains waiters still in the timer heap (the other path). 837   // which drains waiters still in the timer heap (the other path).
HITCBC 838   8 auto* w = waiter_; 838   8 auto* w = waiter_;
HITCBC 839   8 w->stop_cb_.reset(); 839   8 w->stop_cb_.reset();
HITCBC 840   8 auto h = std::exchange(w->h_, {}); 840   8 auto h = std::exchange(w->h_, {});
HITCBC 841   8 auto& sched = w->svc_->get_scheduler(); 841   8 auto& sched = w->svc_->get_scheduler();
HITCBC 842   8 delete w; 842   8 delete w;
HITCBC 843   8 sched.work_finished(); 843   8 sched.work_finished();
HITCBC 844   8 if (h) 844   8 if (h)
HITCBC 845   8 h.destroy(); 845   8 h.destroy();
HITCBC 846   8 } 846   8 }
847   #if defined(__GNUC__) && !defined(__clang__) 847   #if defined(__GNUC__) && !defined(__clang__)
848   #pragma GCC diagnostic pop 848   #pragma GCC diagnostic pop
849   #endif 849   #endif
850   850  
851   inline std::coroutine_handle<> 851   inline std::coroutine_handle<>
HITCBC 852   6791 timer_service::implementation::wait( 852   8003 timer_service::implementation::wait(
853   std::coroutine_handle<> h, 853   std::coroutine_handle<> h,
854   capy::executor_ref d, 854   capy::executor_ref d,
855   std::stop_token token, 855   std::stop_token token,
856   std::error_code* ec, 856   std::error_code* ec,
857   capy::continuation* cont) 857   capy::continuation* cont)
858   { 858   {
859   // Already-expired fast path — no waiter_node, no mutex. 859   // Already-expired fast path — no waiter_node, no mutex.
860   // Post instead of dispatch so the coroutine yields to the 860   // Post instead of dispatch so the coroutine yields to the
861   // scheduler, allowing other queued work to run. 861   // scheduler, allowing other queued work to run.
HITCBC 862   6791 if (heap_index_ == (std::numeric_limits<std::size_t>::max)()) 862   8003 if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
863   { 863   {
HITCBC 864   6769 if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now()) 864   7981 if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
865   { 865   {
HITCBC 866   1 if (ec) 866   2 if (ec)
HITCBC 867   1 *ec = {}; 867   2 *ec = {};
HITCBC 868   1 d.post(*cont); 868   2 d.post(*cont);
HITCBC 869   1 return std::noop_coroutine(); 869   2 return std::noop_coroutine();
870   } 870   }
871   } 871   }
872   872  
HITCBC 873   6790 auto* w = svc_->create_waiter(); 873   8001 auto* w = svc_->create_waiter();
HITCBC 874   6790 w->impl_ = this; 874   8001 w->impl_ = this;
HITCBC 875   6790 w->svc_ = svc_; 875   8001 w->svc_ = svc_;
HITCBC 876   6790 w->h_ = h; 876   8001 w->h_ = h;
HITCBC 877   6790 w->cont_ = cont; 877   8001 w->cont_ = cont;
HITCBC 878   6790 w->d_ = d; 878   8001 w->d_ = d;
HITCBC 879   6790 w->token_ = std::move(token); 879   8001 w->token_ = std::move(token);
HITCBC 880   6790 w->ec_out_ = ec; 880   8001 w->ec_out_ = ec;
881   881  
HITCBC 882   6790 svc_->insert_waiter(*this, w); 882   8001 svc_->insert_waiter(*this, w);
HITCBC 883   6790 might_have_pending_waits_ = true; 883   8001 might_have_pending_waits_ = true;
HITCBC 884   6790 svc_->get_scheduler().work_started(); 884   8001 svc_->get_scheduler().work_started();
885   885  
HITCBC 886   6790 if (w->token_.stop_possible()) 886   8001 if (w->token_.stop_possible())
HITCBC 887   48 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w}); 887   48 w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
888   888  
HITCBC 889   6790 return std::noop_coroutine(); 889   8001 return std::noop_coroutine();
890   } 890   }
891   891  
892   // Free functions 892   // Free functions
893   893  
894   struct timer_service_access 894   struct timer_service_access
895   { 895   {
HITCBC 896   6991 static timer_service& get_timer(io_context& ctx) noexcept 896   8203 static timer_service& get_timer(io_context& ctx) noexcept
897   { 897   {
HITCBC 898   6991 return *ctx.timer_svc_; 898   8203 return *ctx.timer_svc_;
899   } 899   }
900   900  
HITCBC 901   605 static void set_timer(io_context& ctx, timer_service& svc) noexcept 901   605 static void set_timer(io_context& ctx, timer_service& svc) noexcept
902   { 902   {
HITCBC 903   605 ctx.timer_svc_ = &svc; 903   605 ctx.timer_svc_ = &svc;
HITCBC 904   605 } 904   605 }
905   }; 905   };
906   906  
907   // Bypass find_service() mutex by reading io_context's cached pointer 907   // Bypass find_service() mutex by reading io_context's cached pointer
908   inline io_object::io_service& 908   inline io_object::io_service&
HITCBC 909   6991 timer_service_direct(capy::execution_context& ctx) noexcept 909   8203 timer_service_direct(capy::execution_context& ctx) noexcept
910   { 910   {
HITCBC 911   6991 return timer_service_access::get_timer(static_cast<io_context&>(ctx)); 911   8203 return timer_service_access::get_timer(static_cast<io_context&>(ctx));
912   } 912   }
913   913  
914   inline std::size_t 914   inline std::size_t
HITCBC 915   6 timer_service_update_expiry(timer::implementation& base) 915   6 timer_service_update_expiry(timer::implementation& base)
916   { 916   {
HITCBC 917   6 auto& impl = static_cast<timer_service::implementation&>(base); 917   6 auto& impl = static_cast<timer_service::implementation&>(base);
HITCBC 918   6 return impl.svc_->update_timer(impl, impl.expiry_); 918   6 return impl.svc_->update_timer(impl, impl.expiry_);
919   } 919   }
920   920  
921   inline std::size_t 921   inline std::size_t
HITCBC 922   8 timer_service_cancel(timer::implementation& base) noexcept 922   8 timer_service_cancel(timer::implementation& base) noexcept
923   { 923   {
HITCBC 924   8 auto& impl = static_cast<timer_service::implementation&>(base); 924   8 auto& impl = static_cast<timer_service::implementation&>(base);
HITCBC 925   8 return impl.svc_->cancel_timer(impl); 925   8 return impl.svc_->cancel_timer(impl);
926   } 926   }
927   927  
928   inline std::size_t 928   inline std::size_t
HITCBC 929   2 timer_service_cancel_one(timer::implementation& base) noexcept 929   2 timer_service_cancel_one(timer::implementation& base) noexcept
930   { 930   {
HITCBC 931   2 auto& impl = static_cast<timer_service::implementation&>(base); 931   2 auto& impl = static_cast<timer_service::implementation&>(base);
HITCBC 932   2 return impl.svc_->cancel_one_waiter(impl); 932   2 return impl.svc_->cancel_one_waiter(impl);
933   } 933   }
934   934  
935   inline timer_service& 935   inline timer_service&
HITCBC 936   605 get_timer_service(capy::execution_context& ctx, scheduler& sched) 936   605 get_timer_service(capy::execution_context& ctx, scheduler& sched)
937   { 937   {
HITCBC 938   605 auto& svc = ctx.make_service<timer_service>(sched); 938   605 auto& svc = ctx.make_service<timer_service>(sched);
HITCBC 939   605 timer_service_access::set_timer(static_cast<io_context&>(ctx), svc); 939   605 timer_service_access::set_timer(static_cast<io_context&>(ctx), svc);
HITCBC 940   605 return svc; 940   605 return svc;
941   } 941   }
942   942  
943   } // namespace boost::corosio::detail 943   } // namespace boost::corosio::detail
944   944  
945   #endif 945   #endif