82.67% Lines (124/150)
90.00% Functions (9/10)
| TLA | Baseline | Branch | ||||||
|---|---|---|---|---|---|---|---|---|
| Line | Hits | Code | Line | Hits | Code | |||
| 1 | // | 1 | // | |||||
| 2 | // Copyright (c) 2026 Steve Gerbino | 2 | // Copyright (c) 2026 Steve Gerbino | |||||
| 3 | // | 3 | // | |||||
| 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | 4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |||||
| 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | 5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |||||
| 6 | // | 6 | // | |||||
| 7 | // Official repository: https://github.com/cppalliance/corosio | 7 | // Official repository: https://github.com/cppalliance/corosio | |||||
| 8 | // | 8 | // | |||||
| 9 | 9 | |||||||
| 10 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | 10 | #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | |||||
| 11 | #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | 11 | #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | |||||
| 12 | 12 | |||||||
| 13 | #include <boost/corosio/detail/platform.hpp> | 13 | #include <boost/corosio/detail/platform.hpp> | |||||
| 14 | 14 | |||||||
| 15 | #if BOOST_COROSIO_HAS_EPOLL | 15 | #if BOOST_COROSIO_HAS_EPOLL | |||||
| 16 | 16 | |||||||
| 17 | #include <boost/corosio/detail/config.hpp> | 17 | #include <boost/corosio/detail/config.hpp> | |||||
| 18 | #include <boost/capy/ex/execution_context.hpp> | 18 | #include <boost/capy/ex/execution_context.hpp> | |||||
| 19 | 19 | |||||||
| 20 | #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp> | 20 | #include <boost/corosio/native/detail/reactor/reactor_scheduler.hpp> | |||||
| 21 | 21 | |||||||
| 22 | #include <boost/corosio/native/detail/epoll/epoll_traits.hpp> | 22 | #include <boost/corosio/native/detail/epoll/epoll_traits.hpp> | |||||
| 23 | #include <boost/corosio/detail/timer_service.hpp> | 23 | #include <boost/corosio/detail/timer_service.hpp> | |||||
| 24 | #include <boost/corosio/native/detail/make_err.hpp> | 24 | #include <boost/corosio/native/detail/make_err.hpp> | |||||
| 25 | #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp> | 25 | #include <boost/corosio/native/detail/posix/posix_resolver_service.hpp> | |||||
| 26 | #include <boost/corosio/native/detail/posix/posix_signal_service.hpp> | 26 | #include <boost/corosio/native/detail/posix/posix_signal_service.hpp> | |||||
| 27 | #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp> | 27 | #include <boost/corosio/native/detail/posix/posix_stream_file_service.hpp> | |||||
| 28 | #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp> | 28 | #include <boost/corosio/native/detail/posix/posix_random_access_file_service.hpp> | |||||
| 29 | 29 | |||||||
| 30 | #include <boost/corosio/detail/except.hpp> | 30 | #include <boost/corosio/detail/except.hpp> | |||||
| 31 | 31 | |||||||
#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>
#include <mutex>
#include <vector>
| 37 | 37 | |||||||
| 38 | #include <errno.h> | 38 | #include <errno.h> | |||||
| 39 | #include <sys/epoll.h> | 39 | #include <sys/epoll.h> | |||||
| 40 | #include <sys/eventfd.h> | 40 | #include <sys/eventfd.h> | |||||
| 41 | #include <sys/timerfd.h> | 41 | #include <sys/timerfd.h> | |||||
| 42 | #include <unistd.h> | 42 | #include <unistd.h> | |||||
| 43 | 43 | |||||||
| 44 | namespace boost::corosio::detail { | 44 | namespace boost::corosio::detail { | |||||
| 45 | 45 | |||||||
| 46 | /** Linux scheduler using epoll for I/O multiplexing. | 46 | /** Linux scheduler using epoll for I/O multiplexing. | |||||
| 47 | 47 | |||||||
| 48 | This scheduler implements the scheduler interface using Linux epoll | 48 | This scheduler implements the scheduler interface using Linux epoll | |||||
| 49 | for efficient I/O event notification. It uses a single reactor model | 49 | for efficient I/O event notification. It uses a single reactor model | |||||
| 50 | where one thread runs epoll_wait while other threads | 50 | where one thread runs epoll_wait while other threads | |||||
| 51 | wait on a condition variable for handler work. This design provides: | 51 | wait on a condition variable for handler work. This design provides: | |||||
| 52 | 52 | |||||||
| 53 | - Handler parallelism: N posted handlers can execute on N threads | 53 | - Handler parallelism: N posted handlers can execute on N threads | |||||
| 54 | - No thundering herd: condition_variable wakes exactly one thread | 54 | - No thundering herd: condition_variable wakes exactly one thread | |||||
| 55 | - IOCP parity: Behavior matches Windows I/O completion port semantics | 55 | - IOCP parity: Behavior matches Windows I/O completion port semantics | |||||
| 56 | 56 | |||||||
| 57 | When threads call run(), they first try to execute queued handlers. | 57 | When threads call run(), they first try to execute queued handlers. | |||||
| 58 | If the queue is empty and no reactor is running, one thread becomes | 58 | If the queue is empty and no reactor is running, one thread becomes | |||||
| 59 | the reactor and runs epoll_wait. Other threads wait on a condition | 59 | the reactor and runs epoll_wait. Other threads wait on a condition | |||||
| 60 | variable until handlers are available. | 60 | variable until handlers are available. | |||||
| 61 | 61 | |||||||
| 62 | @par Thread Safety | 62 | @par Thread Safety | |||||
| 63 | All public member functions are thread-safe. | 63 | All public member functions are thread-safe. | |||||
| 64 | */ | 64 | */ | |||||
| 65 | class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler | 65 | class BOOST_COROSIO_DECL epoll_scheduler final : public reactor_scheduler | |||||
| 66 | { | 66 | { | |||||
| 67 | public: | 67 | public: | |||||
| 68 | /** Construct the scheduler. | 68 | /** Construct the scheduler. | |||||
| 69 | 69 | |||||||
| 70 | Creates an epoll instance, eventfd for reactor interruption, | 70 | Creates an epoll instance, eventfd for reactor interruption, | |||||
| 71 | and timerfd for kernel-managed timer expiry. | 71 | and timerfd for kernel-managed timer expiry. | |||||
| 72 | 72 | |||||||
| 73 | @param ctx Reference to the owning execution_context. | 73 | @param ctx Reference to the owning execution_context. | |||||
| 74 | @param concurrency_hint Hint for expected thread count (unused). | 74 | @param concurrency_hint Hint for expected thread count (unused). | |||||
| 75 | */ | 75 | */ | |||||
| 76 | epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1); | 76 | epoll_scheduler(capy::execution_context& ctx, int concurrency_hint = -1); | |||||
| 77 | 77 | |||||||
| 78 | /// Destroy the scheduler. | 78 | /// Destroy the scheduler. | |||||
| 79 | ~epoll_scheduler() override; | 79 | ~epoll_scheduler() override; | |||||
| 80 | 80 | |||||||
| 81 | epoll_scheduler(epoll_scheduler const&) = delete; | 81 | epoll_scheduler(epoll_scheduler const&) = delete; | |||||
| 82 | epoll_scheduler& operator=(epoll_scheduler const&) = delete; | 82 | epoll_scheduler& operator=(epoll_scheduler const&) = delete; | |||||
| 83 | 83 | |||||||
| 84 | /// Shut down the scheduler, draining pending operations. | 84 | /// Shut down the scheduler, draining pending operations. | |||||
| 85 | void shutdown() override; | 85 | void shutdown() override; | |||||
| 86 | 86 | |||||||
| 87 | /// Apply runtime configuration, resizing the event buffer. | 87 | /// Apply runtime configuration, resizing the event buffer. | |||||
| 88 | void configure_reactor( | 88 | void configure_reactor( | |||||
| 89 | unsigned max_events, | 89 | unsigned max_events, | |||||
| 90 | unsigned budget_init, | 90 | unsigned budget_init, | |||||
| 91 | unsigned budget_max, | 91 | unsigned budget_max, | |||||
| 92 | unsigned unassisted) override; | 92 | unsigned unassisted) override; | |||||
| 93 | 93 | |||||||
| 94 | /** Return the epoll file descriptor. | 94 | /** Return the epoll file descriptor. | |||||
| 95 | 95 | |||||||
| 96 | Used by socket services to register file descriptors | 96 | Used by socket services to register file descriptors | |||||
| 97 | for I/O event notification. | 97 | for I/O event notification. | |||||
| 98 | 98 | |||||||
| 99 | @return The epoll file descriptor. | 99 | @return The epoll file descriptor. | |||||
| 100 | */ | 100 | */ | |||||
| 101 | int epoll_fd() const noexcept | 101 | int epoll_fd() const noexcept | |||||
| 102 | { | 102 | { | |||||
| 103 | return epoll_fd_; | 103 | return epoll_fd_; | |||||
| 104 | } | 104 | } | |||||
| 105 | 105 | |||||||
| 106 | /** Register a descriptor for persistent monitoring. | 106 | /** Register a descriptor for persistent monitoring. | |||||
| 107 | 107 | |||||||
| 108 | The fd is registered once and stays registered until explicitly | 108 | The fd is registered once and stays registered until explicitly | |||||
| 109 | deregistered. Events are dispatched via reactor_descriptor_state which | 109 | deregistered. Events are dispatched via reactor_descriptor_state which | |||||
| 110 | tracks pending read/write/connect operations. | 110 | tracks pending read/write/connect operations. | |||||
| 111 | 111 | |||||||
| 112 | @param fd The file descriptor to register. | 112 | @param fd The file descriptor to register. | |||||
| 113 | @param desc Pointer to descriptor data (stored in epoll_event.data.ptr). | 113 | @param desc Pointer to descriptor data (stored in epoll_event.data.ptr). | |||||
| 114 | */ | 114 | */ | |||||
| 115 | void register_descriptor(int fd, reactor_descriptor_state* desc) const; | 115 | void register_descriptor(int fd, reactor_descriptor_state* desc) const; | |||||
| 116 | 116 | |||||||
| 117 | /** Deregister a persistently registered descriptor. | 117 | /** Deregister a persistently registered descriptor. | |||||
| 118 | 118 | |||||||
| 119 | @param fd The file descriptor to deregister. | 119 | @param fd The file descriptor to deregister. | |||||
| 120 | */ | 120 | */ | |||||
| 121 | void deregister_descriptor(int fd) const; | 121 | void deregister_descriptor(int fd) const; | |||||
| 122 | 122 | |||||||
| 123 | private: | 123 | private: | |||||
| 124 | void | 124 | void | |||||
| 125 | run_task(lock_type& lock, context_type* ctx, | 125 | run_task(lock_type& lock, context_type* ctx, | |||||
| 126 | long timeout_us) override; | 126 | long timeout_us) override; | |||||
| 127 | void interrupt_reactor() const override; | 127 | void interrupt_reactor() const override; | |||||
| 128 | void update_timerfd() const; | 128 | void update_timerfd() const; | |||||
| 129 | 129 | |||||||
| 130 | int epoll_fd_; | 130 | int epoll_fd_; | |||||
| 131 | int event_fd_; | 131 | int event_fd_; | |||||
| 132 | int timer_fd_; | 132 | int timer_fd_; | |||||
| 133 | 133 | |||||||
| 134 | // Edge-triggered eventfd state | 134 | // Edge-triggered eventfd state | |||||
| 135 | mutable std::atomic<bool> eventfd_armed_{false}; | 135 | mutable std::atomic<bool> eventfd_armed_{false}; | |||||
| 136 | 136 | |||||||
| 137 | // Set when the earliest timer changes; flushed before epoll_wait | 137 | // Set when the earliest timer changes; flushed before epoll_wait | |||||
| 138 | mutable std::atomic<bool> timerfd_stale_{false}; | 138 | mutable std::atomic<bool> timerfd_stale_{false}; | |||||
| 139 | 139 | |||||||
| 140 | // Event buffer sized from max_events_per_poll_ (set at construction, | 140 | // Event buffer sized from max_events_per_poll_ (set at construction, | |||||
| 141 | // resized by configure_reactor via io_context_options). | 141 | // resized by configure_reactor via io_context_options). | |||||
| 142 | std::vector<epoll_event> event_buffer_; | 142 | std::vector<epoll_event> event_buffer_; | |||||
| 143 | }; | 143 | }; | |||||
| 144 | 144 | |||||||
| HITCBC | 145 | 366 | inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int) | 145 | 366 | inline epoll_scheduler::epoll_scheduler(capy::execution_context& ctx, int) | ||
| HITCBC | 146 | 366 | : epoll_fd_(-1) | 146 | 366 | : epoll_fd_(-1) | ||
| HITCBC | 147 | 366 | , event_fd_(-1) | 147 | 366 | , event_fd_(-1) | ||
| HITCBC | 148 | 366 | , timer_fd_(-1) | 148 | 366 | , timer_fd_(-1) | ||
| HITCBC | 149 | 732 | , event_buffer_(max_events_per_poll_) | 149 | 732 | , event_buffer_(max_events_per_poll_) | ||
| 150 | { | 150 | { | |||||
| HITCBC | 151 | 366 | epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC); | 151 | 366 | epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC); | ||
| HITCBC | 152 | 366 | if (epoll_fd_ < 0) | 152 | 366 | if (epoll_fd_ < 0) | ||
| MISUBC | 153 | ✗ | detail::throw_system_error(make_err(errno), "epoll_create1"); | 153 | ✗ | detail::throw_system_error(make_err(errno), "epoll_create1"); | ||
| 154 | 154 | |||||||
| HITCBC | 155 | 366 | event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); | 155 | 366 | event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); | ||
| HITCBC | 156 | 366 | if (event_fd_ < 0) | 156 | 366 | if (event_fd_ < 0) | ||
| 157 | { | 157 | { | |||||
| MISUBC | 158 | ✗ | int errn = errno; | 158 | ✗ | int errn = errno; | ||
| MISUBC | 159 | ✗ | ::close(epoll_fd_); | 159 | ✗ | ::close(epoll_fd_); | ||
| MISUBC | 160 | ✗ | detail::throw_system_error(make_err(errn), "eventfd"); | 160 | ✗ | detail::throw_system_error(make_err(errn), "eventfd"); | ||
| 161 | } | 161 | } | |||||
| 162 | 162 | |||||||
| HITCBC | 163 | 366 | timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); | 163 | 366 | timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); | ||
| HITCBC | 164 | 366 | if (timer_fd_ < 0) | 164 | 366 | if (timer_fd_ < 0) | ||
| 165 | { | 165 | { | |||||
| MISUBC | 166 | ✗ | int errn = errno; | 166 | ✗ | int errn = errno; | ||
| MISUBC | 167 | ✗ | ::close(event_fd_); | 167 | ✗ | ::close(event_fd_); | ||
| MISUBC | 168 | ✗ | ::close(epoll_fd_); | 168 | ✗ | ::close(epoll_fd_); | ||
| MISUBC | 169 | ✗ | detail::throw_system_error(make_err(errn), "timerfd_create"); | 169 | ✗ | detail::throw_system_error(make_err(errn), "timerfd_create"); | ||
| 170 | } | 170 | } | |||||
| 171 | 171 | |||||||
| HITCBC | 172 | 366 | epoll_event ev{}; | 172 | 366 | epoll_event ev{}; | ||
| HITCBC | 173 | 366 | ev.events = EPOLLIN | EPOLLET; | 173 | 366 | ev.events = EPOLLIN | EPOLLET; | ||
| HITCBC | 174 | 366 | ev.data.ptr = nullptr; | 174 | 366 | ev.data.ptr = nullptr; | ||
| HITCBC | 175 | 366 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0) | 175 | 366 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0) | ||
| 176 | { | 176 | { | |||||
| MISUBC | 177 | ✗ | int errn = errno; | 177 | ✗ | int errn = errno; | ||
| MISUBC | 178 | ✗ | ::close(timer_fd_); | 178 | ✗ | ::close(timer_fd_); | ||
| MISUBC | 179 | ✗ | ::close(event_fd_); | 179 | ✗ | ::close(event_fd_); | ||
| MISUBC | 180 | ✗ | ::close(epoll_fd_); | 180 | ✗ | ::close(epoll_fd_); | ||
| MISUBC | 181 | ✗ | detail::throw_system_error(make_err(errn), "epoll_ctl"); | 181 | ✗ | detail::throw_system_error(make_err(errn), "epoll_ctl"); | ||
| 182 | } | 182 | } | |||||
| 183 | 183 | |||||||
| HITCBC | 184 | 366 | epoll_event timer_ev{}; | 184 | 366 | epoll_event timer_ev{}; | ||
| HITCBC | 185 | 366 | timer_ev.events = EPOLLIN | EPOLLERR; | 185 | 366 | timer_ev.events = EPOLLIN | EPOLLERR; | ||
| HITCBC | 186 | 366 | timer_ev.data.ptr = &timer_fd_; | 186 | 366 | timer_ev.data.ptr = &timer_fd_; | ||
| HITCBC | 187 | 366 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0) | 187 | 366 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0) | ||
| 188 | { | 188 | { | |||||
| MISUBC | 189 | ✗ | int errn = errno; | 189 | ✗ | int errn = errno; | ||
| MISUBC | 190 | ✗ | ::close(timer_fd_); | 190 | ✗ | ::close(timer_fd_); | ||
| MISUBC | 191 | ✗ | ::close(event_fd_); | 191 | ✗ | ::close(event_fd_); | ||
| MISUBC | 192 | ✗ | ::close(epoll_fd_); | 192 | ✗ | ::close(epoll_fd_); | ||
| MISUBC | 193 | ✗ | detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)"); | 193 | ✗ | detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)"); | ||
| 194 | } | 194 | } | |||||
| 195 | 195 | |||||||
| HITCBC | 196 | 366 | timer_svc_ = &get_timer_service(ctx, *this); | 196 | 366 | timer_svc_ = &get_timer_service(ctx, *this); | ||
| HITCBC | 197 | 366 | timer_svc_->set_on_earliest_changed( | 197 | 366 | timer_svc_->set_on_earliest_changed( | ||
| HITCBC | 198 | 3737 | timer_service::callback(this, [](void* p) { | 198 | 4604 | timer_service::callback(this, [](void* p) { | ||
| HITCBC | 199 | 3371 | auto* self = static_cast<epoll_scheduler*>(p); | 199 | 4238 | auto* self = static_cast<epoll_scheduler*>(p); | ||
| HITCBC | 200 | 3371 | self->timerfd_stale_.store(true, std::memory_order_release); | 200 | 4238 | self->timerfd_stale_.store(true, std::memory_order_release); | ||
| HITCBC | 201 | 3371 | self->interrupt_reactor(); | 201 | 4238 | self->interrupt_reactor(); | ||
| HITCBC | 202 | 3371 | })); | 202 | 4238 | })); | ||
| 203 | 203 | |||||||
| HITCBC | 204 | 366 | get_resolver_service(ctx, *this); | 204 | 366 | get_resolver_service(ctx, *this); | ||
| HITCBC | 205 | 366 | get_signal_service(ctx, *this); | 205 | 366 | get_signal_service(ctx, *this); | ||
| HITCBC | 206 | 366 | get_stream_file_service(ctx, *this); | 206 | 366 | get_stream_file_service(ctx, *this); | ||
| HITCBC | 207 | 366 | get_random_access_file_service(ctx, *this); | 207 | 366 | get_random_access_file_service(ctx, *this); | ||
| 208 | 208 | |||||||
| HITCBC | 209 | 366 | completed_ops_.push(&task_op_); | 209 | 366 | completed_ops_.push(&task_op_); | ||
| HITCBC | 210 | 366 | } | 210 | 366 | } | ||
| 211 | 211 | |||||||
| HITCBC | 212 | 732 | inline epoll_scheduler::~epoll_scheduler() | 212 | 732 | inline epoll_scheduler::~epoll_scheduler() | ||
| 213 | { | 213 | { | |||||
| HITCBC | 214 | 366 | if (timer_fd_ >= 0) | 214 | 366 | if (timer_fd_ >= 0) | ||
| HITCBC | 215 | 366 | ::close(timer_fd_); | 215 | 366 | ::close(timer_fd_); | ||
| HITCBC | 216 | 366 | if (event_fd_ >= 0) | 216 | 366 | if (event_fd_ >= 0) | ||
| HITCBC | 217 | 366 | ::close(event_fd_); | 217 | 366 | ::close(event_fd_); | ||
| HITCBC | 218 | 366 | if (epoll_fd_ >= 0) | 218 | 366 | if (epoll_fd_ >= 0) | ||
| HITCBC | 219 | 366 | ::close(epoll_fd_); | 219 | 366 | ::close(epoll_fd_); | ||
| HITCBC | 220 | 732 | } | 220 | 732 | } | ||
| 221 | 221 | |||||||
| 222 | inline void | 222 | inline void | |||||
| HITCBC | 223 | 366 | epoll_scheduler::shutdown() | 223 | 366 | epoll_scheduler::shutdown() | ||
| 224 | { | 224 | { | |||||
| HITCBC | 225 | 366 | shutdown_drain(); | 225 | 366 | shutdown_drain(); | ||
| 226 | 226 | |||||||
| HITCBC | 227 | 366 | if (event_fd_ >= 0) | 227 | 366 | if (event_fd_ >= 0) | ||
| HITCBC | 228 | 366 | interrupt_reactor(); | 228 | 366 | interrupt_reactor(); | ||
| HITCBC | 229 | 366 | } | 229 | 366 | } | ||
| 230 | 230 | |||||||
| 231 | inline void | 231 | inline void | |||||
| MISUBC | 232 | ✗ | epoll_scheduler::configure_reactor( | 232 | ✗ | epoll_scheduler::configure_reactor( | ||
| 233 | unsigned max_events, | 233 | unsigned max_events, | |||||
| 234 | unsigned budget_init, | 234 | unsigned budget_init, | |||||
| 235 | unsigned budget_max, | 235 | unsigned budget_max, | |||||
| 236 | unsigned unassisted) | 236 | unsigned unassisted) | |||||
| 237 | { | 237 | { | |||||
| MISUBC | 238 | ✗ | reactor_scheduler::configure_reactor( | 238 | ✗ | reactor_scheduler::configure_reactor( | ||
| 239 | max_events, budget_init, budget_max, unassisted); | 239 | max_events, budget_init, budget_max, unassisted); | |||||
| MISUBC | 240 | ✗ | event_buffer_.resize(max_events_per_poll_); | 240 | ✗ | event_buffer_.resize(max_events_per_poll_); | ||
| MISUBC | 241 | ✗ | } | 241 | ✗ | } | ||
| 242 | 242 | |||||||
| 243 | inline void | 243 | inline void | |||||
| HITCBC | 244 | 6476 | epoll_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const | 244 | 8212 | epoll_scheduler::register_descriptor(int fd, reactor_descriptor_state* desc) const | ||
| 245 | { | 245 | { | |||||
| HITCBC | 246 | 6476 | epoll_event ev{}; | 246 | 8212 | epoll_event ev{}; | ||
| HITCBC | 247 | 6476 | ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP; | 247 | 8212 | ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP; | ||
| HITCBC | 248 | 6476 | ev.data.ptr = desc; | 248 | 8212 | ev.data.ptr = desc; | ||
| 249 | 249 | |||||||
| HITCBC | 250 | 6476 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0) | 250 | 8212 | if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0) | ||
| MISUBC | 251 | ✗ | detail::throw_system_error(make_err(errno), "epoll_ctl (register)"); | 251 | ✗ | detail::throw_system_error(make_err(errno), "epoll_ctl (register)"); | ||
| 252 | 252 | |||||||
| HITCBC | 253 | 6476 | desc->registered_events = ev.events; | 253 | 8212 | desc->registered_events = ev.events; | ||
| HITCBC | 254 | 6476 | desc->fd = fd; | 254 | 8212 | desc->fd = fd; | ||
| HITCBC | 255 | 6476 | desc->scheduler_ = this; | 255 | 8212 | desc->scheduler_ = this; | ||
| HITCBC | 256 | 6476 | desc->mutex.set_enabled(!single_threaded_); | 256 | 8212 | desc->mutex.set_enabled(!single_threaded_); | ||
| HITCBC | 257 | 6476 | desc->ready_events_.store(0, std::memory_order_relaxed); | 257 | 8212 | desc->ready_events_.store(0, std::memory_order_relaxed); | ||
| 258 | 258 | |||||||
| HITCBC | 259 | 6476 | conditionally_enabled_mutex::scoped_lock lock(desc->mutex); | 259 | 8212 | conditionally_enabled_mutex::scoped_lock lock(desc->mutex); | ||
| HITCBC | 260 | 6476 | desc->impl_ref_.reset(); | 260 | 8212 | desc->impl_ref_.reset(); | ||
| HITCBC | 261 | 6476 | desc->read_ready = false; | 261 | 8212 | desc->read_ready = false; | ||
| HITCBC | 262 | 6476 | desc->write_ready = false; | 262 | 8212 | desc->write_ready = false; | ||
| HITCBC | 263 | 6476 | } | 263 | 8212 | } | ||
| 264 | 264 | |||||||
| 265 | inline void | 265 | inline void | |||||
| HITCBC | 266 | 6476 | epoll_scheduler::deregister_descriptor(int fd) const | 266 | 8212 | epoll_scheduler::deregister_descriptor(int fd) const | ||
| 267 | { | 267 | { | |||||
| HITCBC | 268 | 6476 | ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr); | 268 | 8212 | ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr); | ||
| HITCBC | 269 | 6476 | } | 269 | 8212 | } | ||
| 270 | 270 | |||||||
| 271 | inline void | 271 | inline void | |||||
| HITCBC | 272 | 4049 | epoll_scheduler::interrupt_reactor() const | 272 | 4916 | epoll_scheduler::interrupt_reactor() const | ||
| 273 | { | 273 | { | |||||
| HITCBC | 274 | 4049 | bool expected = false; | 274 | 4916 | bool expected = false; | ||
| HITCBC | 275 | 4049 | if (eventfd_armed_.compare_exchange_strong( | 275 | 4916 | if (eventfd_armed_.compare_exchange_strong( | ||
| 276 | expected, true, std::memory_order_release, | 276 | expected, true, std::memory_order_release, | |||||
| 277 | std::memory_order_relaxed)) | 277 | std::memory_order_relaxed)) | |||||
| 278 | { | 278 | { | |||||
| HITCBC | 279 | 3815 | std::uint64_t val = 1; | 279 | 4686 | std::uint64_t val = 1; | ||
| HITCBC | 280 | 3815 | [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val)); | 280 | 4686 | [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val)); | ||
| 281 | } | 281 | } | |||||
| HITCBC | 282 | 4049 | } | 282 | 4916 | } | ||
| 283 | 283 | |||||||
| 284 | inline void | 284 | inline void | |||||
| HITCBC | 285 | 6701 | epoll_scheduler::update_timerfd() const | 285 | 8436 | epoll_scheduler::update_timerfd() const | ||
| 286 | { | 286 | { | |||||
| HITCBC | 287 | 6701 | auto nearest = timer_svc_->nearest_expiry(); | 287 | 8436 | auto nearest = timer_svc_->nearest_expiry(); | ||
| 288 | 288 | |||||||
| HITCBC | 289 | 6701 | itimerspec ts{}; | 289 | 8436 | itimerspec ts{}; | ||
| HITCBC | 290 | 6701 | int flags = 0; | 290 | 8436 | int flags = 0; | ||
| 291 | 291 | |||||||
| HITCBC | 292 | 6701 | if (nearest == timer_service::time_point::max()) | 292 | 8436 | if (nearest == timer_service::time_point::max()) | ||
| 293 | { | 293 | { | |||||
| 294 | // No timers — disarm by setting to 0 (relative) | 294 | // No timers — disarm by setting to 0 (relative) | |||||
| 295 | } | 295 | } | |||||
| 296 | else | 296 | else | |||||
| 297 | { | 297 | { | |||||
| HITCBC | 298 | 6645 | auto now = std::chrono::steady_clock::now(); | 298 | 8381 | auto now = std::chrono::steady_clock::now(); | ||
| HITCBC | 299 | 6645 | if (nearest <= now) | 299 | 8381 | if (nearest <= now) | ||
| 300 | { | 300 | { | |||||
| 301 | // Use 1ns instead of 0 — zero disarms the timerfd | 301 | // Use 1ns instead of 0 — zero disarms the timerfd | |||||
| HITCBC | 302 | 337 | ts.it_value.tv_nsec = 1; | 302 | 332 | ts.it_value.tv_nsec = 1; | ||
| 303 | } | 303 | } | |||||
| 304 | else | 304 | else | |||||
| 305 | { | 305 | { | |||||
| HITCBC | 306 | 6308 | auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>( | 306 | 8049 | auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>( | ||
| HITCBC | 307 | 6308 | nearest - now) | 307 | 8049 | nearest - now) | ||
| HITCBC | 308 | 6308 | .count(); | 308 | 8049 | .count(); | ||
| HITCBC | 309 | 6308 | ts.it_value.tv_sec = nsec / 1000000000; | 309 | 8049 | ts.it_value.tv_sec = nsec / 1000000000; | ||
| HITCBC | 310 | 6308 | ts.it_value.tv_nsec = nsec % 1000000000; | 310 | 8049 | ts.it_value.tv_nsec = nsec % 1000000000; | ||
| HITCBC | 311 | 6308 | if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0) | 311 | 8049 | if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0) | ||
| MISUBC | 312 | ✗ | ts.it_value.tv_nsec = 1; | 312 | ✗ | ts.it_value.tv_nsec = 1; | ||
| 313 | } | 313 | } | |||||
| 314 | } | 314 | } | |||||
| 315 | 315 | |||||||
| HITCBC | 316 | 6701 | if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0) | 316 | 8436 | if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0) | ||
| MISUBC | 317 | ✗ | detail::throw_system_error(make_err(errno), "timerfd_settime"); | 317 | ✗ | detail::throw_system_error(make_err(errno), "timerfd_settime"); | ||
| HITCBC | 318 | 6701 | } | 318 | 8436 | } | ||
| 319 | 319 | |||||||
| 320 | inline void | 320 | inline void | |||||
| HITCBC | 321 | 33900 | epoll_scheduler::run_task( | 321 | 37544 | epoll_scheduler::run_task( | ||
| 322 | lock_type& lock, context_type* ctx, long timeout_us) | 322 | lock_type& lock, context_type* ctx, long timeout_us) | |||||
| 323 | { | 323 | { | |||||
| 324 | int timeout_ms; | 324 | int timeout_ms; | |||||
| HITCBC | 325 | 33900 | if (task_interrupted_) | 325 | 37544 | if (task_interrupted_) | ||
| HITCBC | 326 | 24577 | timeout_ms = 0; | 326 | 25629 | timeout_ms = 0; | ||
| HITCBC | 327 | 9323 | else if (timeout_us < 0) | 327 | 11915 | else if (timeout_us < 0) | ||
| HITCBC | 328 | 9315 | timeout_ms = -1; | 328 | 11907 | timeout_ms = -1; | ||
| 329 | else | 329 | else | |||||
| HITCBC | 330 | 8 | timeout_ms = static_cast<int>((timeout_us + 999) / 1000); | 330 | 8 | timeout_ms = static_cast<int>((timeout_us + 999) / 1000); | ||
| 331 | 331 | |||||||
| HITCBC | 332 | 33900 | if (lock.owns_lock()) | 332 | 37544 | if (lock.owns_lock()) | ||
| HITCBC | 333 | 9323 | lock.unlock(); | 333 | 11915 | lock.unlock(); | ||
| 334 | 334 | |||||||
| HITCBC | 335 | 33900 | task_cleanup on_exit{this, &lock, ctx}; | 335 | 37544 | task_cleanup on_exit{this, &lock, ctx}; | ||
| 336 | 336 | |||||||
| 337 | // Flush deferred timerfd programming before blocking | 337 | // Flush deferred timerfd programming before blocking | |||||
| HITCBC | 338 | 33900 | if (timerfd_stale_.exchange(false, std::memory_order_acquire)) | 338 | 37544 | if (timerfd_stale_.exchange(false, std::memory_order_acquire)) | ||
| HITCBC | 339 | 3349 | update_timerfd(); | 339 | 4216 | update_timerfd(); | ||
| 340 | 340 | |||||||
| HITCBC | 341 | 33900 | int nfds = ::epoll_wait( | 341 | 37544 | int nfds = ::epoll_wait( | ||
| 342 | epoll_fd_, event_buffer_.data(), | 342 | epoll_fd_, event_buffer_.data(), | |||||
| HITCBC | 343 | 33900 | static_cast<int>(event_buffer_.size()), timeout_ms); | 343 | 37544 | static_cast<int>(event_buffer_.size()), timeout_ms); | ||
| 344 | 344 | |||||||
| HITCBC | 345 | 33900 | if (nfds < 0 && errno != EINTR) | 345 | 37544 | if (nfds < 0 && errno != EINTR) | ||
| MISUBC | 346 | ✗ | detail::throw_system_error(make_err(errno), "epoll_wait"); | 346 | ✗ | detail::throw_system_error(make_err(errno), "epoll_wait"); | ||
| 347 | 347 | |||||||
| HITCBC | 348 | 33900 | bool check_timers = false; | 348 | 37544 | bool check_timers = false; | ||
| HITCBC | 349 | 33900 | op_queue local_ops; | 349 | 37544 | op_queue local_ops; | ||
| 350 | 350 | |||||||
| HITCBC | 351 | 77605 | for (int i = 0; i < nfds; ++i) | 351 | 85145 | for (int i = 0; i < nfds; ++i) | ||
| 352 | { | 352 | { | |||||
| HITCBC | 353 | 43705 | if (event_buffer_[i].data.ptr == nullptr) | 353 | 47601 | if (event_buffer_[i].data.ptr == nullptr) | ||
| 354 | { | 354 | { | |||||
| 355 | std::uint64_t val; | 355 | std::uint64_t val; | |||||
| 356 | // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) | 356 | // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) | |||||
| HITCBC | 357 | 3449 | [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val)); | 357 | 4320 | [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val)); | ||
| HITCBC | 358 | 3449 | eventfd_armed_.store(false, std::memory_order_relaxed); | 358 | 4320 | eventfd_armed_.store(false, std::memory_order_relaxed); | ||
| HITCBC | 359 | 3449 | continue; | 359 | 4320 | continue; | ||
| HITCBC | 360 | 3449 | } | 360 | 4320 | } | ||
| 361 | 361 | |||||||
| HITCBC | 362 | 40256 | if (event_buffer_[i].data.ptr == &timer_fd_) | 362 | 43281 | if (event_buffer_[i].data.ptr == &timer_fd_) | ||
| 363 | { | 363 | { | |||||
| 364 | std::uint64_t expirations; | 364 | std::uint64_t expirations; | |||||
| 365 | // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) | 365 | // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection) | |||||
| 366 | [[maybe_unused]] auto r = | 366 | [[maybe_unused]] auto r = | |||||
| HITCBC | 367 | 3352 | ::read(timer_fd_, &expirations, sizeof(expirations)); | 367 | 4220 | ::read(timer_fd_, &expirations, sizeof(expirations)); | ||
| HITCBC | 368 | 3352 | check_timers = true; | 368 | 4220 | check_timers = true; | ||
| HITCBC | 369 | 3352 | continue; | 369 | 4220 | continue; | ||
| HITCBC | 370 | 3352 | } | 370 | 4220 | } | ||
| 371 | 371 | |||||||
| 372 | auto* desc = | 372 | auto* desc = | |||||
| HITCBC | 373 | 36904 | static_cast<reactor_descriptor_state*>(event_buffer_[i].data.ptr); | 373 | 39061 | static_cast<reactor_descriptor_state*>(event_buffer_[i].data.ptr); | ||
| HITCBC | 374 | 36904 | desc->add_ready_events(event_buffer_[i].events); | 374 | 39061 | desc->add_ready_events(event_buffer_[i].events); | ||
| 375 | 375 | |||||||
| HITCBC | 376 | 36904 | bool expected = false; | 376 | 39061 | bool expected = false; | ||
| HITCBC | 377 | 36904 | if (desc->is_enqueued_.compare_exchange_strong( | 377 | 39061 | if (desc->is_enqueued_.compare_exchange_strong( | ||
| 378 | expected, true, std::memory_order_release, | 378 | expected, true, std::memory_order_release, | |||||
| 379 | std::memory_order_relaxed)) | 379 | std::memory_order_relaxed)) | |||||
| 380 | { | 380 | { | |||||
| HITCBC | 381 | 36904 | local_ops.push(desc); | 381 | 39061 | local_ops.push(desc); | ||
| 382 | } | 382 | } | |||||
| 383 | } | 383 | } | |||||
| 384 | 384 | |||||||
| HITCBC | 385 | 33900 | if (check_timers) | 385 | 37544 | if (check_timers) | ||
| 386 | { | 386 | { | |||||
| HITCBC | 387 | 3352 | timer_svc_->process_expired(); | 387 | 4220 | timer_svc_->process_expired(); | ||
| HITCBC | 388 | 3352 | update_timerfd(); | 388 | 4220 | update_timerfd(); | ||
| 389 | } | 389 | } | |||||
| 390 | 390 | |||||||
| HITCBC | 391 | 33900 | lock.lock(); | 391 | 37544 | lock.lock(); | ||
| 392 | 392 | |||||||
| HITCBC | 393 | 33900 | if (!local_ops.empty()) | 393 | 37544 | if (!local_ops.empty()) | ||
| HITCBC | 394 | 24039 | completed_ops_.splice(local_ops); | 394 | 25091 | completed_ops_.splice(local_ops); | ||
| HITCBC | 395 | 33900 | } | 395 | 37544 | } | ||
| 396 | 396 | |||||||
| 397 | } // namespace boost::corosio::detail | 397 | } // namespace boost::corosio::detail | |||||
| 398 | 398 | |||||||
| 399 | #endif // BOOST_COROSIO_HAS_EPOLL | 399 | #endif // BOOST_COROSIO_HAS_EPOLL | |||||
| 400 | 400 | |||||||
| 401 | #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | 401 | #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SCHEDULER_HPP | |||||