86.13% Lines (118/137) 94.74% Functions (18/19)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2026 Michael Vandeberg 2   // Copyright (c) 2026 Michael Vandeberg
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/corosio 7   // Official repository: https://github.com/cppalliance/corosio
8   // 8   //
9   9  
10   #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP 10   #ifndef BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
11   #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP 11   #define BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP
12   12  
13   #include <boost/corosio/detail/platform.hpp> 13   #include <boost/corosio/detail/platform.hpp>
14   14  
15   #if BOOST_COROSIO_POSIX 15   #if BOOST_COROSIO_POSIX
16   16  
17   #include <boost/corosio/detail/config.hpp> 17   #include <boost/corosio/detail/config.hpp>
18   #include <boost/corosio/stream_file.hpp> 18   #include <boost/corosio/stream_file.hpp>
19   #include <boost/corosio/file_base.hpp> 19   #include <boost/corosio/file_base.hpp>
20   #include <boost/corosio/detail/intrusive.hpp> 20   #include <boost/corosio/detail/intrusive.hpp>
21   #include <boost/corosio/detail/dispatch_coro.hpp> 21   #include <boost/corosio/detail/dispatch_coro.hpp>
22   #include <boost/corosio/detail/scheduler_op.hpp> 22   #include <boost/corosio/detail/scheduler_op.hpp>
23   #include <boost/corosio/detail/continuation_op.hpp> 23   #include <boost/corosio/detail/continuation_op.hpp>
24   #include <boost/corosio/detail/thread_pool.hpp> 24   #include <boost/corosio/detail/thread_pool.hpp>
25   #include <boost/corosio/detail/scheduler.hpp> 25   #include <boost/corosio/detail/scheduler.hpp>
26   #include <boost/corosio/detail/buffer_param.hpp> 26   #include <boost/corosio/detail/buffer_param.hpp>
27   #include <boost/corosio/native/detail/make_err.hpp> 27   #include <boost/corosio/native/detail/make_err.hpp>
28   #include <boost/capy/ex/executor_ref.hpp> 28   #include <boost/capy/ex/executor_ref.hpp>
29   #include <boost/capy/error.hpp> 29   #include <boost/capy/error.hpp>
30   #include <boost/capy/buffers.hpp> 30   #include <boost/capy/buffers.hpp>
31   31  
32   #include <atomic> 32   #include <atomic>
33   #include <coroutine> 33   #include <coroutine>
34   #include <cstddef> 34   #include <cstddef>
35   #include <cstdint> 35   #include <cstdint>
36   #include <filesystem> 36   #include <filesystem>
37   #include <limits> 37   #include <limits>
38   #include <memory> 38   #include <memory>
39   #include <optional> 39   #include <optional>
40   #include <stop_token> 40   #include <stop_token>
41   #include <system_error> 41   #include <system_error>
42   42  
43   #include <errno.h> 43   #include <errno.h>
44   #include <fcntl.h> 44   #include <fcntl.h>
45   #include <sys/stat.h> 45   #include <sys/stat.h>
46   #include <sys/uio.h> 46   #include <sys/uio.h>
47   #include <unistd.h> 47   #include <unistd.h>
48   48  
49   /* 49   /*
50   POSIX Stream File Implementation 50   POSIX Stream File Implementation
51   ================================= 51   =================================
52   52  
53   Regular files cannot be monitored by epoll/kqueue/select — the kernel 53   Regular files cannot be monitored by epoll/kqueue/select — the kernel
54   always reports them as ready. Blocking I/O (pread/pwrite) is dispatched 54   always reports them as ready. Blocking I/O (pread/pwrite) is dispatched
55   to a shared thread pool, with completion posted back to the scheduler. 55   to a shared thread pool, with completion posted back to the scheduler.
56   56  
57   This follows the same pattern as posix_resolver: pool_work_item for 57   This follows the same pattern as posix_resolver: pool_work_item for
58   dispatch, scheduler_op for completion, shared_from_this for lifetime. 58   dispatch, scheduler_op for completion, shared_from_this for lifetime.
59   59  
60   Completion Flow 60   Completion Flow
61   --------------- 61   ---------------
62   1. read_some() sets up file_read_op, posts to thread pool 62   1. read_some() sets up file_read_op, posts to thread pool
63   2. Pool thread runs preadv() (blocking) 63   2. Pool thread runs preadv() (blocking)
64   3. Pool thread stores results, posts scheduler_op to scheduler 64   3. Pool thread stores results, posts scheduler_op to scheduler
65   4. Scheduler invokes op() which resumes the coroutine 65   4. Scheduler invokes op() which resumes the coroutine
66   66  
67   Single-Inflight Constraint 67   Single-Inflight Constraint
68   -------------------------- 68   --------------------------
69   Only one asynchronous operation may be in flight at a time on a 69   Only one asynchronous operation may be in flight at a time on a
70   given file object. Concurrent read and write is not supported 70   given file object. Concurrent read and write is not supported
71   because both share offset_ without synchronization. 71   because both share offset_ without synchronization.
72   */ 72   */
73   73  
74   namespace boost::corosio::detail { 74   namespace boost::corosio::detail {
75   75  
76   struct scheduler; 76   struct scheduler;
77   class posix_stream_file_service; 77   class posix_stream_file_service;
78   78  
79   /** Stream file implementation for POSIX backends. 79   /** Stream file implementation for POSIX backends.
80   80  
81   Each instance contains embedded operation objects (read_op_, write_op_) 81   Each instance contains embedded operation objects (read_op_, write_op_)
82   that are reused across calls. This avoids per-operation heap allocation. 82   that are reused across calls. This avoids per-operation heap allocation.
83   */ 83   */
84   class posix_stream_file final 84   class posix_stream_file final
85   : public stream_file::implementation 85   : public stream_file::implementation
86   , public std::enable_shared_from_this<posix_stream_file> 86   , public std::enable_shared_from_this<posix_stream_file>
87   , public intrusive_list<posix_stream_file>::node 87   , public intrusive_list<posix_stream_file>::node
88   { 88   {
89   friend class posix_stream_file_service; 89   friend class posix_stream_file_service;
90   90  
91   public: 91   public:
92   static constexpr std::size_t max_buffers = 16; 92   static constexpr std::size_t max_buffers = 16;
93   93  
94   /** Operation state for a single file read or write. */ 94   /** Operation state for a single file read or write. */
95   struct file_op : scheduler_op 95   struct file_op : scheduler_op
96   { 96   {
97   struct canceller 97   struct canceller
98   { 98   {
99   file_op* op; 99   file_op* op;
HITCBC 100   1 void operator()() const noexcept 100   1 void operator()() const noexcept
101   { 101   {
HITCBC 102   1 op->request_cancel(); 102   1 op->request_cancel();
HITCBC 103   1 } 103   1 }
104   }; 104   };
105   105  
106   // Coroutine state 106   // Coroutine state
107   std::coroutine_handle<> h; 107   std::coroutine_handle<> h;
108   detail::continuation_op cont_op; 108   detail::continuation_op cont_op;
109   capy::executor_ref ex; 109   capy::executor_ref ex;
110   110  
111   // Output pointers 111   // Output pointers
112   std::error_code* ec_out = nullptr; 112   std::error_code* ec_out = nullptr;
113   std::size_t* bytes_out = nullptr; 113   std::size_t* bytes_out = nullptr;
114   114  
115   // Buffer data (copied from buffer_param at submission time) 115   // Buffer data (copied from buffer_param at submission time)
116   iovec iovecs[max_buffers]; 116   iovec iovecs[max_buffers];
117   int iovec_count = 0; 117   int iovec_count = 0;
118   118  
119   // Result storage (populated by worker thread) 119   // Result storage (populated by worker thread)
120   int errn = 0; 120   int errn = 0;
121   std::size_t bytes_transferred = 0; 121   std::size_t bytes_transferred = 0;
122   bool is_read = false; 122   bool is_read = false;
123   123  
124   // Thread coordination 124   // Thread coordination
125   std::atomic<bool> cancelled{false}; 125   std::atomic<bool> cancelled{false};
126   std::optional<std::stop_callback<canceller>> stop_cb; 126   std::optional<std::stop_callback<canceller>> stop_cb;
127   127  
128   /// Prevents use-after-free when file is closed with pending ops. 128   /// Prevents use-after-free when file is closed with pending ops.
129   std::shared_ptr<void> impl_ref; 129   std::shared_ptr<void> impl_ref;
130   130  
HITCBC 131   52 file_op() = default; 131   52 file_op() = default;
132   132  
HITCBC 133   12 void reset() noexcept 133   12 void reset() noexcept
134   { 134   {
HITCBC 135   12 iovec_count = 0; 135   12 iovec_count = 0;
HITCBC 136   12 errn = 0; 136   12 errn = 0;
HITCBC 137   12 bytes_transferred = 0; 137   12 bytes_transferred = 0;
HITCBC 138   12 is_read = false; 138   12 is_read = false;
HITCBC 139   12 cancelled.store(false, std::memory_order_relaxed); 139   12 cancelled.store(false, std::memory_order_relaxed);
HITCBC 140   12 stop_cb.reset(); 140   12 stop_cb.reset();
HITCBC 141   12 impl_ref.reset(); 141   12 impl_ref.reset();
HITCBC 142   12 ec_out = nullptr; 142   12 ec_out = nullptr;
HITCBC 143   12 bytes_out = nullptr; 143   12 bytes_out = nullptr;
HITCBC 144   12 } 144   12 }
145   145  
146   void operator()() override; 146   void operator()() override;
147   void destroy() override; 147   void destroy() override;
148   148  
HITCBC 149   141 void request_cancel() noexcept 149   141 void request_cancel() noexcept
150   { 150   {
HITCBC 151   141 cancelled.store(true, std::memory_order_release); 151   141 cancelled.store(true, std::memory_order_release);
HITCBC 152   141 } 152   141 }
153   153  
HITCBC 154   12 void start(std::stop_token const& token) 154   12 void start(std::stop_token const& token)
155   { 155   {
HITCBC 156   12 cancelled.store(false, std::memory_order_release); 156   12 cancelled.store(false, std::memory_order_release);
HITCBC 157   12 stop_cb.reset(); 157   12 stop_cb.reset();
HITCBC 158   12 if (token.stop_possible()) 158   12 if (token.stop_possible())
HITCBC 159   1 stop_cb.emplace(token, canceller{this}); 159   1 stop_cb.emplace(token, canceller{this});
HITCBC 160   12 } 160   12 }
161   }; 161   };
162   162  
163   /** Pool work item for thread pool dispatch. */ 163   /** Pool work item for thread pool dispatch. */
164   struct pool_op : pool_work_item 164   struct pool_op : pool_work_item
165   { 165   {
166   posix_stream_file* file_ = nullptr; 166   posix_stream_file* file_ = nullptr;
167   std::shared_ptr<posix_stream_file> ref_; 167   std::shared_ptr<posix_stream_file> ref_;
168   }; 168   };
169   169  
170   explicit posix_stream_file(posix_stream_file_service& svc) noexcept; 170   explicit posix_stream_file(posix_stream_file_service& svc) noexcept;
171   171  
172   // -- io_stream::implementation -- 172   // -- io_stream::implementation --
173   173  
174   std::coroutine_handle<> read_some( 174   std::coroutine_handle<> read_some(
175   std::coroutine_handle<>, 175   std::coroutine_handle<>,
176   capy::executor_ref, 176   capy::executor_ref,
177   buffer_param, 177   buffer_param,
178   std::stop_token, 178   std::stop_token,
179   std::error_code*, 179   std::error_code*,
180   std::size_t*) override; 180   std::size_t*) override;
181   181  
182   std::coroutine_handle<> write_some( 182   std::coroutine_handle<> write_some(
183   std::coroutine_handle<>, 183   std::coroutine_handle<>,
184   capy::executor_ref, 184   capy::executor_ref,
185   buffer_param, 185   buffer_param,
186   std::stop_token, 186   std::stop_token,
187   std::error_code*, 187   std::error_code*,
188   std::size_t*) override; 188   std::size_t*) override;
189   189  
190   // -- stream_file::implementation -- 190   // -- stream_file::implementation --
191   191  
HITCBC 192   81 native_handle_type native_handle() const noexcept override 192   81 native_handle_type native_handle() const noexcept override
193   { 193   {
HITCBC 194   81 return fd_; 194   81 return fd_;
195   } 195   }
196   196  
HITCBC 197   70 void cancel() noexcept override 197   70 void cancel() noexcept override
198   { 198   {
HITCBC 199   70 read_op_.request_cancel(); 199   70 read_op_.request_cancel();
HITCBC 200   70 write_op_.request_cancel(); 200   70 write_op_.request_cancel();
HITCBC 201   70 } 201   70 }
202   202  
203   std::uint64_t size() const override; 203   std::uint64_t size() const override;
204   void resize(std::uint64_t new_size) override; 204   void resize(std::uint64_t new_size) override;
205   void sync_data() override; 205   void sync_data() override;
206   void sync_all() override; 206   void sync_all() override;
207   native_handle_type release() override; 207   native_handle_type release() override;
208   void assign(native_handle_type handle) override; 208   void assign(native_handle_type handle) override;
209   std::uint64_t seek(std::int64_t offset, file_base::seek_basis origin) override; 209   std::uint64_t seek(std::int64_t offset, file_base::seek_basis origin) override;
210   210  
211   // -- Internal -- 211   // -- Internal --
212   212  
213   /** Open the file and store the fd. */ 213   /** Open the file and store the fd. */
214   std::error_code open_file( 214   std::error_code open_file(
215   std::filesystem::path const& path, file_base::flags mode); 215   std::filesystem::path const& path, file_base::flags mode);
216   216  
217   /** Close the file descriptor. */ 217   /** Close the file descriptor. */
218   void close_file() noexcept; 218   void close_file() noexcept;
219   219  
220   private: 220   private:
221   posix_stream_file_service& svc_; 221   posix_stream_file_service& svc_;
222   int fd_ = -1; 222   int fd_ = -1;
223   std::uint64_t offset_ = 0; 223   std::uint64_t offset_ = 0;
224   224  
225   file_op read_op_; 225   file_op read_op_;
226   file_op write_op_; 226   file_op write_op_;
227   pool_op read_pool_op_; 227   pool_op read_pool_op_;
228   pool_op write_pool_op_; 228   pool_op write_pool_op_;
229   229  
230   static void do_read_work(pool_work_item*) noexcept; 230   static void do_read_work(pool_work_item*) noexcept;
231   static void do_write_work(pool_work_item*) noexcept; 231   static void do_write_work(pool_work_item*) noexcept;
232   }; 232   };
233   233  
234   // --------------------------------------------------------------------------- 234   // ---------------------------------------------------------------------------
235   // Inline implementation 235   // Inline implementation
236   // --------------------------------------------------------------------------- 236   // ---------------------------------------------------------------------------
237   237  
238   inline 238   inline
HITCBC 239   26 posix_stream_file::posix_stream_file(posix_stream_file_service& svc) noexcept 239   26 posix_stream_file::posix_stream_file(posix_stream_file_service& svc) noexcept
HITCBC 240   26 : svc_(svc) 240   26 : svc_(svc)
241   { 241   {
HITCBC 242   26 } 242   26 }
243   243  
244   inline std::error_code 244   inline std::error_code
HITCBC 245   19 posix_stream_file::open_file( 245   19 posix_stream_file::open_file(
246   std::filesystem::path const& path, file_base::flags mode) 246   std::filesystem::path const& path, file_base::flags mode)
247   { 247   {
HITCBC 248   19 close_file(); 248   19 close_file();
249   249  
HITCBC 250   19 int oflags = 0; 250   19 int oflags = 0;
251   251  
252   // Access mode 252   // Access mode
HITCBC 253   19 unsigned access = static_cast<unsigned>(mode) & 3u; 253   19 unsigned access = static_cast<unsigned>(mode) & 3u;
HITCBC 254   19 if (access == static_cast<unsigned>(file_base::read_write)) 254   19 if (access == static_cast<unsigned>(file_base::read_write))
HITCBC 255   2 oflags |= O_RDWR; 255   2 oflags |= O_RDWR;
HITCBC 256   17 else if (access == static_cast<unsigned>(file_base::write_only)) 256   17 else if (access == static_cast<unsigned>(file_base::write_only))
HITCBC 257   7 oflags |= O_WRONLY; 257   7 oflags |= O_WRONLY;
258   else 258   else
HITCBC 259   10 oflags |= O_RDONLY; 259   10 oflags |= O_RDONLY;
260   260  
261   // Creation flags 261   // Creation flags
HITCBC 262   19 if ((mode & file_base::create) != file_base::flags(0)) 262   19 if ((mode & file_base::create) != file_base::flags(0))
HITCBC 263   6 oflags |= O_CREAT; 263   6 oflags |= O_CREAT;
HITCBC 264   19 if ((mode & file_base::exclusive) != file_base::flags(0)) 264   19 if ((mode & file_base::exclusive) != file_base::flags(0))
HITCBC 265   1 oflags |= O_EXCL; 265   1 oflags |= O_EXCL;
HITCBC 266   19 if ((mode & file_base::truncate) != file_base::flags(0)) 266   19 if ((mode & file_base::truncate) != file_base::flags(0))
HITCBC 267   5 oflags |= O_TRUNC; 267   5 oflags |= O_TRUNC;
HITCBC 268   19 if ((mode & file_base::append) != file_base::flags(0)) 268   19 if ((mode & file_base::append) != file_base::flags(0))
HITCBC 269   1 oflags |= O_APPEND; 269   1 oflags |= O_APPEND;
HITCBC 270   19 if ((mode & file_base::sync_all_on_write) != file_base::flags(0)) 270   19 if ((mode & file_base::sync_all_on_write) != file_base::flags(0))
MISUBC 271   oflags |= O_SYNC; 271   oflags |= O_SYNC;
272   272  
HITCBC 273   19 int fd = ::open(path.c_str(), oflags, 0666); 273   19 int fd = ::open(path.c_str(), oflags, 0666);
HITCBC 274   19 if (fd < 0) 274   19 if (fd < 0)
HITCBC 275   2 return make_err(errno); 275   2 return make_err(errno);
276   276  
HITCBC 277   17 fd_ = fd; 277   17 fd_ = fd;
HITCBC 278   17 offset_ = 0; 278   17 offset_ = 0;
279   279  
280   // Append mode: position at end-of-file (preadv/pwritev use 280   // Append mode: position at end-of-file (preadv/pwritev use
281   // explicit offsets, so O_APPEND alone is not sufficient). 281   // explicit offsets, so O_APPEND alone is not sufficient).
HITCBC 282   17 if ((mode & file_base::append) != file_base::flags(0)) 282   17 if ((mode & file_base::append) != file_base::flags(0))
283   { 283   {
284   struct stat st; 284   struct stat st;
HITCBC 285   1 if (::fstat(fd, &st) < 0) 285   1 if (::fstat(fd, &st) < 0)
286   { 286   {
MISUBC 287   int err = errno; 287   int err = errno;
MISUBC 288   ::close(fd); 288   ::close(fd);
MISUBC 289   fd_ = -1; 289   fd_ = -1;
MISUBC 290   return make_err(err); 290   return make_err(err);
291   } 291   }
HITCBC 292   1 offset_ = static_cast<std::uint64_t>(st.st_size); 292   1 offset_ = static_cast<std::uint64_t>(st.st_size);
293   } 293   }
294   294  
295   #ifdef POSIX_FADV_SEQUENTIAL 295   #ifdef POSIX_FADV_SEQUENTIAL
HITCBC 296   17 ::posix_fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL); 296   17 ::posix_fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL);
297   #endif 297   #endif
298   298  
HITCBC 299   17 return {}; 299   17 return {};
300   } 300   }
301   301  
302   inline void 302   inline void
HITCBC 303   89 posix_stream_file::close_file() noexcept 303   89 posix_stream_file::close_file() noexcept
304   { 304   {
HITCBC 305   89 if (fd_ >= 0) 305   89 if (fd_ >= 0)
306   { 306   {
HITCBC 307   17 ::close(fd_); 307   17 ::close(fd_);
HITCBC 308   17 fd_ = -1; 308   17 fd_ = -1;
309   } 309   }
HITCBC 310   89 } 310   89 }
311   311  
312   inline std::uint64_t 312   inline std::uint64_t
HITCBC 313   3 posix_stream_file::size() const 313   3 posix_stream_file::size() const
314   { 314   {
315   struct stat st; 315   struct stat st;
HITCBC 316   3 if (::fstat(fd_, &st) < 0) 316   3 if (::fstat(fd_, &st) < 0)
MISUBC 317   throw_system_error(make_err(errno), "stream_file::size"); 317   throw_system_error(make_err(errno), "stream_file::size");
HITCBC 318   3 return static_cast<std::uint64_t>(st.st_size); 318   3 return static_cast<std::uint64_t>(st.st_size);
319   } 319   }
320   320  
321   inline void 321   inline void
HITCBC 322   1 posix_stream_file::resize(std::uint64_t new_size) 322   1 posix_stream_file::resize(std::uint64_t new_size)
323   { 323   {
HITCBC 324   1 if (new_size > static_cast<std::uint64_t>(std::numeric_limits<off_t>::max())) 324   1 if (new_size > static_cast<std::uint64_t>(std::numeric_limits<off_t>::max()))
MISUBC 325   throw_system_error(make_err(EOVERFLOW), "stream_file::resize"); 325   throw_system_error(make_err(EOVERFLOW), "stream_file::resize");
HITCBC 326   1 if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0) 326   1 if (::ftruncate(fd_, static_cast<off_t>(new_size)) < 0)
MISUBC 327   throw_system_error(make_err(errno), "stream_file::resize"); 327   throw_system_error(make_err(errno), "stream_file::resize");
HITCBC 328   1 } 328   1 }
329   329  
330   inline void 330   inline void
HITCBC 331   1 posix_stream_file::sync_data() 331   1 posix_stream_file::sync_data()
332   { 332   {
333   #if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO 333   #if BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
HITCBC 334   1 if (::fdatasync(fd_) < 0) 334   1 if (::fdatasync(fd_) < 0)
335   #else // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO 335   #else // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
336   if (::fsync(fd_) < 0) 336   if (::fsync(fd_) < 0)
337   #endif // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO 337   #endif // BOOST_COROSIO_HAS_POSIX_SYNCHRONIZED_IO
MISUBC 338   throw_system_error(make_err(errno), "stream_file::sync_data"); 338   throw_system_error(make_err(errno), "stream_file::sync_data");
HITCBC 339   1 } 339   1 }
340   340  
341   inline void 341   inline void
HITCBC 342   1 posix_stream_file::sync_all() 342   1 posix_stream_file::sync_all()
343   { 343   {
HITCBC 344   1 if (::fsync(fd_) < 0) 344   1 if (::fsync(fd_) < 0)
MISUBC 345   throw_system_error(make_err(errno), "stream_file::sync_all"); 345   throw_system_error(make_err(errno), "stream_file::sync_all");
HITCBC 346   1 } 346   1 }
347   347  
348   inline native_handle_type 348   inline native_handle_type
HITCBC 349   1 posix_stream_file::release() 349   1 posix_stream_file::release()
350   { 350   {
HITCBC 351   1 int fd = fd_; 351   1 int fd = fd_;
HITCBC 352   1 fd_ = -1; 352   1 fd_ = -1;
HITCBC 353   1 offset_ = 0; 353   1 offset_ = 0;
HITCBC 354   1 return fd; 354   1 return fd;
355   } 355   }
356   356  
357   inline void 357   inline void
HITCBC 358   1 posix_stream_file::assign(native_handle_type handle) 358   1 posix_stream_file::assign(native_handle_type handle)
359   { 359   {
HITCBC 360   1 close_file(); 360   1 close_file();
HITCBC 361   1 fd_ = handle; 361   1 fd_ = handle;
HITCBC 362   1 offset_ = 0; 362   1 offset_ = 0;
HITCBC 363   1 } 363   1 }
364   364  
365   inline std::uint64_t 365   inline std::uint64_t
HITCBC 366   7 posix_stream_file::seek(std::int64_t offset, file_base::seek_basis origin) 366   7 posix_stream_file::seek(std::int64_t offset, file_base::seek_basis origin)
367   { 367   {
368   // We track offset_ ourselves (not the kernel fd offset) 368   // We track offset_ ourselves (not the kernel fd offset)
369   // because preadv/pwritev use explicit offsets. 369   // because preadv/pwritev use explicit offsets.
370   std::int64_t new_pos; 370   std::int64_t new_pos;
371   371  
HITCBC 372   7 if (origin == file_base::seek_set) 372   7 if (origin == file_base::seek_set)
373   { 373   {
HITCBC 374   3 new_pos = offset; 374   3 new_pos = offset;
375   } 375   }
HITCBC 376   4 else if (origin == file_base::seek_cur) 376   4 else if (origin == file_base::seek_cur)
377   { 377   {
HITCBC 378   2 new_pos = static_cast<std::int64_t>(offset_) + offset; 378   2 new_pos = static_cast<std::int64_t>(offset_) + offset;
379   } 379   }
380   else 380   else
381   { 381   {
382   struct stat st; 382   struct stat st;
HITCBC 383   2 if (::fstat(fd_, &st) < 0) 383   2 if (::fstat(fd_, &st) < 0)
MISUBC 384   throw_system_error(make_err(errno), "stream_file::seek"); 384   throw_system_error(make_err(errno), "stream_file::seek");
HITCBC 385   2 new_pos = st.st_size + offset; 385   2 new_pos = st.st_size + offset;
386   } 386   }
387   387  
HITCBC 388   7 if (new_pos < 0) 388   7 if (new_pos < 0)
HITCBC 389   3 throw_system_error(make_err(EINVAL), "stream_file::seek"); 389   3 throw_system_error(make_err(EINVAL), "stream_file::seek");
HITCBC 390   4 if (new_pos > static_cast<std::int64_t>(std::numeric_limits<off_t>::max())) 390   4 if (new_pos > static_cast<std::int64_t>(std::numeric_limits<off_t>::max()))
MISUBC 391   throw_system_error(make_err(EOVERFLOW), "stream_file::seek"); 391   throw_system_error(make_err(EOVERFLOW), "stream_file::seek");
392   392  
HITCBC 393   4 offset_ = static_cast<std::uint64_t>(new_pos); 393   4 offset_ = static_cast<std::uint64_t>(new_pos);
394   394  
HITCBC 395   4 return offset_; 395   4 return offset_;
396   } 396   }
397   397  
398   // -- file_op completion handler -- 398   // -- file_op completion handler --
399   // (read_some, write_some, do_read_work, do_write_work are 399   // (read_some, write_some, do_read_work, do_write_work are
400   // defined in posix_stream_file_service.hpp after the service) 400   // defined in posix_stream_file_service.hpp after the service)
401   401  
402   inline void 402   inline void
HITCBC 403   12 posix_stream_file::file_op::operator()() 403   12 posix_stream_file::file_op::operator()()
404   { 404   {
HITCBC 405   12 stop_cb.reset(); 405   12 stop_cb.reset();
406   406  
HITCBC 407   12 bool const was_cancelled = cancelled.load(std::memory_order_acquire); 407   12 bool const was_cancelled = cancelled.load(std::memory_order_acquire);
408   408  
HITCBC 409   12 if (ec_out) 409   12 if (ec_out)
410   { 410   {
HITCBC 411   12 if (was_cancelled) 411   12 if (was_cancelled)
HITCBC 412   1 *ec_out = capy::error::canceled; 412   1 *ec_out = capy::error::canceled;
HITCBC 413   11 else if (errn != 0) 413   11 else if (errn != 0)
MISUBC 414   *ec_out = make_err(errn); 414   *ec_out = make_err(errn);
HITCBC 415   11 else if (is_read && bytes_transferred == 0) 415   11 else if (is_read && bytes_transferred == 0)
HITCBC 416   1 *ec_out = capy::error::eof; 416   1 *ec_out = capy::error::eof;
417   else 417   else
HITCBC 418   10 *ec_out = {}; 418   10 *ec_out = {};
419   } 419   }
420   420  
HITCBC 421   12 if (bytes_out) 421   12 if (bytes_out)
HITCBC 422   12 *bytes_out = was_cancelled ? 0 : bytes_transferred; 422   12 *bytes_out = was_cancelled ? 0 : bytes_transferred;
423   423  
424   // Move impl_ref to a local so members remain valid through 424   // Move impl_ref to a local so members remain valid through
425   // dispatch — impl_ref may be the last shared_ptr keeping 425   // dispatch — impl_ref may be the last shared_ptr keeping
426   // the parent posix_stream_file (which embeds this file_op) alive. 426   // the parent posix_stream_file (which embeds this file_op) alive.
HITCBC 427   12 auto prevent_destroy = std::move(impl_ref); 427   12 auto prevent_destroy = std::move(impl_ref);
HITCBC 428   12 ex.on_work_finished(); 428   12 ex.on_work_finished();
HITCBC 429   12 cont_op.cont.h = h; 429   12 cont_op.cont.h = h;
HITCBC 430   12 dispatch_coro(ex, cont_op.cont).resume(); 430   12 dispatch_coro(ex, cont_op.cont).resume();
HITCBC 431   12 } 431   12 }
432   432  
433   inline void 433   inline void
MISUBC 434   posix_stream_file::file_op::destroy() 434   posix_stream_file::file_op::destroy()
435   { 435   {
MISUBC 436   stop_cb.reset(); 436   stop_cb.reset();
MISUBC 437   auto local_ex = ex; 437   auto local_ex = ex;
MISUBC 438   impl_ref.reset(); 438   impl_ref.reset();
MISUBC 439   local_ex.on_work_finished(); 439   local_ex.on_work_finished();
MISUBC 440   } 440   }
441   441  
442   } // namespace boost::corosio::detail 442   } // namespace boost::corosio::detail
443   443  
444   #endif // BOOST_COROSIO_POSIX 444   #endif // BOOST_COROSIO_POSIX
445   445  
446   #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP 446   #endif // BOOST_COROSIO_NATIVE_DETAIL_POSIX_POSIX_STREAM_FILE_HPP