98.53% Lines (134/136) 100.00% Functions (31/31)
TLA Baseline Branch
Line Hits Code Line Hits Code
1   // 1   //
2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com) 2   // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3   // 3   //
4   // Distributed under the Boost Software License, Version 1.0. (See accompanying 4   // Distributed under the Boost Software License, Version 1.0. (See accompanying
5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 5   // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6   // 6   //
7   // Official repository: https://github.com/cppalliance/capy 7   // Official repository: https://github.com/cppalliance/capy
8   // 8   //
9   9  
10   #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP 10   #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11   #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP 11   #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12   12  
13   #include <boost/capy/detail/config.hpp> 13   #include <boost/capy/detail/config.hpp>
14   #include <boost/capy/detail/await_suspend_helper.hpp> 14   #include <boost/capy/detail/await_suspend_helper.hpp>
15   #include <boost/capy/buffers.hpp> 15   #include <boost/capy/buffers.hpp>
16   #include <boost/capy/detail/buffer_array.hpp> 16   #include <boost/capy/detail/buffer_array.hpp>
17   #include <boost/capy/buffers/buffer_param.hpp> 17   #include <boost/capy/buffers/buffer_param.hpp>
18   #include <boost/capy/concept/io_awaitable.hpp> 18   #include <boost/capy/concept/io_awaitable.hpp>
19   #include <boost/capy/concept/read_source.hpp> 19   #include <boost/capy/concept/read_source.hpp>
20   #include <boost/capy/ex/io_env.hpp> 20   #include <boost/capy/ex/io_env.hpp>
21   #include <boost/capy/io_result.hpp> 21   #include <boost/capy/io_result.hpp>
22   #include <boost/capy/io_task.hpp> 22   #include <boost/capy/io_task.hpp>
23   23  
24   #include <concepts> 24   #include <concepts>
25   #include <coroutine> 25   #include <coroutine>
26   #include <cstddef> 26   #include <cstddef>
27   #include <exception> 27   #include <exception>
28   #include <new> 28   #include <new>
29   #include <span> 29   #include <span>
30   #include <stop_token> 30   #include <stop_token>
31   #include <system_error> 31   #include <system_error>
32   #include <utility> 32   #include <utility>
33   33  
34   namespace boost { 34   namespace boost {
35   namespace capy { 35   namespace capy {
36   36  
37   /** Type-erased wrapper for any ReadSource. 37   /** Type-erased wrapper for any ReadSource.
38   38  
39   This class provides type erasure for any type satisfying the 39   This class provides type erasure for any type satisfying the
40   @ref ReadSource concept, enabling runtime polymorphism for 40   @ref ReadSource concept, enabling runtime polymorphism for
41   source read operations. It uses cached awaitable storage to achieve 41   source read operations. It uses cached awaitable storage to achieve
42   zero steady-state allocation after construction. 42   zero steady-state allocation after construction.
43   43  
44   The wrapper supports two construction modes: 44   The wrapper supports two construction modes:
45   - **Owning**: Pass by value to transfer ownership. The wrapper 45   - **Owning**: Pass by value to transfer ownership. The wrapper
46   allocates storage and owns the source. 46   allocates storage and owns the source.
47   - **Reference**: Pass a pointer to wrap without ownership. The 47   - **Reference**: Pass a pointer to wrap without ownership. The
48   pointed-to source must outlive this wrapper. 48   pointed-to source must outlive this wrapper.
49   49  
50   @par Awaitable Preallocation 50   @par Awaitable Preallocation
51   The constructor preallocates storage for the type-erased awaitable. 51   The constructor preallocates storage for the type-erased awaitable.
52   This reserves all virtual address space at server startup 52   This reserves all virtual address space at server startup
53   so memory usage can be measured up front, rather than 53   so memory usage can be measured up front, rather than
54   allocating piecemeal as traffic arrives. 54   allocating piecemeal as traffic arrives.
55   55  
56   @par Immediate Completion 56   @par Immediate Completion
57   Operations complete immediately without suspending when the 57   Operations complete immediately without suspending when the
58   buffer sequence is empty, or when the underlying source's 58   buffer sequence is empty, or when the underlying source's
59   awaitable reports readiness via `await_ready`. 59   awaitable reports readiness via `await_ready`.
60   60  
61   @par Thread Safety 61   @par Thread Safety
62   Not thread-safe. Concurrent operations on the same wrapper 62   Not thread-safe. Concurrent operations on the same wrapper
63   are undefined behavior. 63   are undefined behavior.
64   64  
65   @par Example 65   @par Example
66   @code 66   @code
67   // Owning - takes ownership of the source 67   // Owning - takes ownership of the source
68   any_read_source rs(some_source{args...}); 68   any_read_source rs(some_source{args...});
69   69  
70   // Reference - wraps without ownership 70   // Reference - wraps without ownership
71   some_source source; 71   some_source source;
72   any_read_source rs(&source); 72   any_read_source rs(&source);
73   73  
74   mutable_buffer buf(data, size); 74   mutable_buffer buf(data, size);
75   auto [ec, n] = co_await rs.read(std::span(&buf, 1)); 75   auto [ec, n] = co_await rs.read(std::span(&buf, 1));
76   @endcode 76   @endcode
77   77  
78   @see any_read_stream, ReadSource 78   @see any_read_stream, ReadSource
79   */ 79   */
80   class any_read_source 80   class any_read_source
81   { 81   {
82   struct vtable; 82   struct vtable;
83   struct awaitable_ops; 83   struct awaitable_ops;
84   84  
85   template<ReadSource S> 85   template<ReadSource S>
86   struct vtable_for_impl; 86   struct vtable_for_impl;
87   87  
88   void* source_ = nullptr; 88   void* source_ = nullptr;
89   vtable const* vt_ = nullptr; 89   vtable const* vt_ = nullptr;
90   void* cached_awaitable_ = nullptr; 90   void* cached_awaitable_ = nullptr;
91   void* storage_ = nullptr; 91   void* storage_ = nullptr;
92   awaitable_ops const* active_ops_ = nullptr; 92   awaitable_ops const* active_ops_ = nullptr;
93   93  
94   public: 94   public:
95   /** Destructor. 95   /** Destructor.
96   96  
97   Destroys the owned source (if any) and releases the cached 97   Destroys the owned source (if any) and releases the cached
98   awaitable storage. 98   awaitable storage.
99   */ 99   */
100   ~any_read_source(); 100   ~any_read_source();
101   101  
102   /** Construct a default instance. 102   /** Construct a default instance.
103   103  
104   Constructs an empty wrapper. Operations on a default-constructed 104   Constructs an empty wrapper. Operations on a default-constructed
105   wrapper result in undefined behavior. 105   wrapper result in undefined behavior.
106   */ 106   */
107   any_read_source() = default; 107   any_read_source() = default;
108   108  
109   /** Non-copyable. 109   /** Non-copyable.
110   110  
111   The awaitable cache is per-instance and cannot be shared. 111   The awaitable cache is per-instance and cannot be shared.
112   */ 112   */
113   any_read_source(any_read_source const&) = delete; 113   any_read_source(any_read_source const&) = delete;
114   any_read_source& operator=(any_read_source const&) = delete; 114   any_read_source& operator=(any_read_source const&) = delete;
115   115  
116   /** Construct by moving. 116   /** Construct by moving.
117   117  
118   Transfers ownership of the wrapped source (if owned) and 118   Transfers ownership of the wrapped source (if owned) and
119   cached awaitable storage from `other`. After the move, `other` is 119   cached awaitable storage from `other`. After the move, `other` is
120   in a default-constructed state. 120   in a default-constructed state.
121   121  
122   @param other The wrapper to move from. 122   @param other The wrapper to move from.
123   */ 123   */
HITCBC 124   1 any_read_source(any_read_source&& other) noexcept 124   1 any_read_source(any_read_source&& other) noexcept
HITCBC 125   1 : source_(std::exchange(other.source_, nullptr)) 125   1 : source_(std::exchange(other.source_, nullptr))
HITCBC 126   1 , vt_(std::exchange(other.vt_, nullptr)) 126   1 , vt_(std::exchange(other.vt_, nullptr))
HITCBC 127   1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr)) 127   1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
HITCBC 128   1 , storage_(std::exchange(other.storage_, nullptr)) 128   1 , storage_(std::exchange(other.storage_, nullptr))
HITCBC 129   1 , active_ops_(std::exchange(other.active_ops_, nullptr)) 129   1 , active_ops_(std::exchange(other.active_ops_, nullptr))
130   { 130   {
HITCBC 131   1 } 131   1 }
132   132  
133   /** Assign by moving. 133   /** Assign by moving.
134   134  
135   Destroys any owned source and releases existing resources, 135   Destroys any owned source and releases existing resources,
136   then transfers ownership from `other`. 136   then transfers ownership from `other`.
137   137  
138   @param other The wrapper to move from. 138   @param other The wrapper to move from.
139   @return Reference to this wrapper. 139   @return Reference to this wrapper.
140   */ 140   */
141   any_read_source& 141   any_read_source&
142   operator=(any_read_source&& other) noexcept; 142   operator=(any_read_source&& other) noexcept;
143   143  
144   /** Construct by taking ownership of a ReadSource. 144   /** Construct by taking ownership of a ReadSource.
145   145  
146   Allocates storage and moves the source into this wrapper. 146   Allocates storage and moves the source into this wrapper.
147   The wrapper owns the source and will destroy it. 147   The wrapper owns the source and will destroy it.
148   148  
149   @param s The source to take ownership of. 149   @param s The source to take ownership of.
150   */ 150   */
151   template<ReadSource S> 151   template<ReadSource S>
152   requires (!std::same_as<std::decay_t<S>, any_read_source>) 152   requires (!std::same_as<std::decay_t<S>, any_read_source>)
153   any_read_source(S s); 153   any_read_source(S s);
154   154  
155   /** Construct by wrapping a ReadSource without ownership. 155   /** Construct by wrapping a ReadSource without ownership.
156   156  
157   Wraps the given source by pointer. The source must remain 157   Wraps the given source by pointer. The source must remain
158   valid for the lifetime of this wrapper. 158   valid for the lifetime of this wrapper.
159   159  
160   @param s Pointer to the source to wrap. 160   @param s Pointer to the source to wrap.
161   */ 161   */
162   template<ReadSource S> 162   template<ReadSource S>
163   any_read_source(S* s); 163   any_read_source(S* s);
164   164  
165   /** Check if the wrapper contains a valid source. 165   /** Check if the wrapper contains a valid source.
166   166  
167   @return `true` if wrapping a source, `false` if default-constructed 167   @return `true` if wrapping a source, `false` if default-constructed
168   or moved-from. 168   or moved-from.
169   */ 169   */
170   bool 170   bool
HITCBC 171   30 has_value() const noexcept 171   30 has_value() const noexcept
172   { 172   {
HITCBC 173   30 return source_ != nullptr; 173   30 return source_ != nullptr;
174   } 174   }
175   175  
176   /** Check if the wrapper contains a valid source. 176   /** Check if the wrapper contains a valid source.
177   177  
178   @return `true` if wrapping a source, `false` if default-constructed 178   @return `true` if wrapping a source, `false` if default-constructed
179   or moved-from. 179   or moved-from.
180   */ 180   */
181   explicit 181   explicit
HITCBC 182   8 operator bool() const noexcept 182   8 operator bool() const noexcept
183   { 183   {
HITCBC 184   8 return has_value(); 184   8 return has_value();
185   } 185   }
186   186  
187   /** Initiate a partial read operation. 187   /** Initiate a partial read operation.
188   188  
189   Attempt to read up to `buffer_size( buffers )` bytes into 189   Attempt to read up to `buffer_size( buffers )` bytes into
190   the provided buffer sequence. May fill less than the 190   the provided buffer sequence. May fill less than the
191   full sequence. 191   full sequence.
192   192  
193   @param buffers The buffer sequence to read into. 193   @param buffers The buffer sequence to read into.
194   194  
195   @return An awaitable that await-returns `(error_code,std::size_t)`. 195   @return An awaitable that await-returns `(error_code,std::size_t)`.
196   196  
197   @par Immediate Completion 197   @par Immediate Completion
198   The operation completes immediately without suspending 198   The operation completes immediately without suspending
199   the calling coroutine when: 199   the calling coroutine when:
200   @li The buffer sequence is empty, returning `{error_code{}, 0}`. 200   @li The buffer sequence is empty, returning `{error_code{}, 0}`.
201   @li The underlying source's awaitable reports immediate 201   @li The underlying source's awaitable reports immediate
202   readiness via `await_ready`. 202   readiness via `await_ready`.
203   203  
204   @note This is a partial operation and may not process the 204   @note This is a partial operation and may not process the
205   entire buffer sequence. Use @ref read for guaranteed 205   entire buffer sequence. Use @ref read for guaranteed
206   complete transfer. 206   complete transfer.
207   207  
208   @par Preconditions 208   @par Preconditions
209   The wrapper must contain a valid source (`has_value() == true`). 209   The wrapper must contain a valid source (`has_value() == true`).
210   The caller must not call this function again after a prior 210   The caller must not call this function again after a prior
211   call returned an error (including EOF). 211   call returned an error (including EOF).
212   */ 212   */
213   template<MutableBufferSequence MB> 213   template<MutableBufferSequence MB>
214   auto 214   auto
215   read_some(MB buffers); 215   read_some(MB buffers);
216   216  
217   /** Initiate a complete read operation. 217   /** Initiate a complete read operation.
218   218  
219   Reads data into the provided buffer sequence by forwarding 219   Reads data into the provided buffer sequence by forwarding
220   to the underlying source's `read` operation. Large buffer 220   to the underlying source's `read` operation. Large buffer
221   sequences are processed in windows, with each window 221   sequences are processed in windows, with each window
222   forwarded as a separate `read` call to the underlying source. 222   forwarded as a separate `read` call to the underlying source.
223   The operation completes when the entire buffer sequence is 223   The operation completes when the entire buffer sequence is
224   filled, end-of-file is reached, or an error occurs. 224   filled, end-of-file is reached, or an error occurs.
225   225  
226   @param buffers The buffer sequence to read into. 226   @param buffers The buffer sequence to read into.
227   227  
228   @return An awaitable that await-returns `(error_code,std::size_t)`. 228   @return An awaitable that await-returns `(error_code,std::size_t)`.
229   229  
230   @par Immediate Completion 230   @par Immediate Completion
231   The operation completes immediately without suspending 231   The operation completes immediately without suspending
232   the calling coroutine when: 232   the calling coroutine when:
233   @li The buffer sequence is empty, returning `{error_code{}, 0}`. 233   @li The buffer sequence is empty, returning `{error_code{}, 0}`.
234   @li The underlying source's `read` awaitable reports 234   @li The underlying source's `read` awaitable reports
235   immediate readiness via `await_ready`. 235   immediate readiness via `await_ready`.
236   236  
237   @par Postconditions 237   @par Postconditions
238   Exactly one of the following is true on return: 238   Exactly one of the following is true on return:
239   @li **Success**: `!ec` and `n == buffer_size(buffers)`. 239   @li **Success**: `!ec` and `n == buffer_size(buffers)`.
240   The entire buffer was filled. 240   The entire buffer was filled.
241   @li **End-of-stream or Error**: `ec` and `n` indicates 241   @li **End-of-stream or Error**: `ec` and `n` indicates
242   the number of bytes transferred before the failure. 242   the number of bytes transferred before the failure.
243   243  
244   @par Preconditions 244   @par Preconditions
245   The wrapper must contain a valid source (`has_value() == true`). 245   The wrapper must contain a valid source (`has_value() == true`).
246   The caller must not call this function again after a prior 246   The caller must not call this function again after a prior
247   call returned an error (including EOF). 247   call returned an error (including EOF).
248   */ 248   */
249   template<MutableBufferSequence MB> 249   template<MutableBufferSequence MB>
250   io_task<std::size_t> 250   io_task<std::size_t>
251   read(MB buffers); 251   read(MB buffers);
252   252  
253   protected: 253   protected:
254   /** Rebind to a new source after move. 254   /** Rebind to a new source after move.
255   255  
256   Updates the internal pointer to reference a new source object. 256   Updates the internal pointer to reference a new source object.
257   Used by owning wrappers after move assignment when the owned 257   Used by owning wrappers after move assignment when the owned
258   object has moved to a new location. 258   object has moved to a new location.
259   259  
260   @param new_source The new source to bind to. Must be the same 260   @param new_source The new source to bind to. Must be the same
261   type as the original source. 261   type as the original source.
262   262  
263   @note Terminates if called with a source of different type 263   @note Terminates if called with a source of different type
264   than the original. 264   than the original.
265   */ 265   */
266   template<ReadSource S> 266   template<ReadSource S>
267   void 267   void
268   rebind(S& new_source) noexcept 268   rebind(S& new_source) noexcept
269   { 269   {
270   if(vt_ != &vtable_for_impl<S>::value) 270   if(vt_ != &vtable_for_impl<S>::value)
271   std::terminate(); 271   std::terminate();
272   source_ = &new_source; 272   source_ = &new_source;
273   } 273   }
274   274  
275   private: 275   private:
276   auto 276   auto
277   read_(std::span<mutable_buffer const> buffers); 277   read_(std::span<mutable_buffer const> buffers);
278   }; 278   };
279   279  
280   // ordered by call sequence for cache line coherence 280   // ordered by call sequence for cache line coherence
281   struct any_read_source::awaitable_ops 281   struct any_read_source::awaitable_ops
282   { 282   {
283   bool (*await_ready)(void*); 283   bool (*await_ready)(void*);
284   std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*); 284   std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
285   io_result<std::size_t> (*await_resume)(void*); 285   io_result<std::size_t> (*await_resume)(void*);
286   void (*destroy)(void*) noexcept; 286   void (*destroy)(void*) noexcept;
287   }; 287   };
288   288  
289   // ordered by call frequency for cache line coherence 289   // ordered by call frequency for cache line coherence
290   struct any_read_source::vtable 290   struct any_read_source::vtable
291   { 291   {
292   awaitable_ops const* (*construct_read_some_awaitable)( 292   awaitable_ops const* (*construct_read_some_awaitable)(
293   void* source, 293   void* source,
294   void* storage, 294   void* storage,
295   std::span<mutable_buffer const> buffers); 295   std::span<mutable_buffer const> buffers);
296   awaitable_ops const* (*construct_read_awaitable)( 296   awaitable_ops const* (*construct_read_awaitable)(
297   void* source, 297   void* source,
298   void* storage, 298   void* storage,
299   std::span<mutable_buffer const> buffers); 299   std::span<mutable_buffer const> buffers);
300   std::size_t awaitable_size; 300   std::size_t awaitable_size;
301   std::size_t awaitable_align; 301   std::size_t awaitable_align;
302   void (*destroy)(void*) noexcept; 302   void (*destroy)(void*) noexcept;
303   }; 303   };
304   304  
305   template<ReadSource S> 305   template<ReadSource S>
306   struct any_read_source::vtable_for_impl 306   struct any_read_source::vtable_for_impl
307   { 307   {
308   using ReadSomeAwaitable = decltype(std::declval<S&>().read_some( 308   using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
309   std::span<mutable_buffer const>{})); 309   std::span<mutable_buffer const>{}));
310   using ReadAwaitable = decltype(std::declval<S&>().read( 310   using ReadAwaitable = decltype(std::declval<S&>().read(
311   std::span<mutable_buffer const>{})); 311   std::span<mutable_buffer const>{}));
312   312  
313   static void 313   static void
HITCBC 314   8 do_destroy_impl(void* source) noexcept 314   8 do_destroy_impl(void* source) noexcept
315   { 315   {
HITCBC 316   8 static_cast<S*>(source)->~S(); 316   8 static_cast<S*>(source)->~S();
HITCBC 317   8 } 317   8 }
318   318  
319   static awaitable_ops const* 319   static awaitable_ops const*
HITCBC 320   52 construct_read_some_awaitable_impl( 320   52 construct_read_some_awaitable_impl(
321   void* source, 321   void* source,
322   void* storage, 322   void* storage,
323   std::span<mutable_buffer const> buffers) 323   std::span<mutable_buffer const> buffers)
324   { 324   {
HITCBC 325   52 auto& s = *static_cast<S*>(source); 325   52 auto& s = *static_cast<S*>(source);
HITCBC 326   52 ::new(storage) ReadSomeAwaitable(s.read_some(buffers)); 326   52 ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
327   327  
328   static constexpr awaitable_ops ops = { 328   static constexpr awaitable_ops ops = {
HITCBC 329   52 +[](void* p) { 329   52 +[](void* p) {
HITCBC 330   52 return static_cast<ReadSomeAwaitable*>(p)->await_ready(); 330   52 return static_cast<ReadSomeAwaitable*>(p)->await_ready();
331   }, 331   },
HITCBC 332   2 +[](void* p, std::coroutine_handle<> h, io_env const* env) { 332   2 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
HITCBC 333   2 return detail::call_await_suspend( 333   2 return detail::call_await_suspend(
HITCBC 334   2 static_cast<ReadSomeAwaitable*>(p), h, env); 334   2 static_cast<ReadSomeAwaitable*>(p), h, env);
335   }, 335   },
HITCBC 336   50 +[](void* p) { 336   50 +[](void* p) {
HITCBC 337   50 return static_cast<ReadSomeAwaitable*>(p)->await_resume(); 337   50 return static_cast<ReadSomeAwaitable*>(p)->await_resume();
338   }, 338   },
HITCBC 339   54 +[](void* p) noexcept { 339   54 +[](void* p) noexcept {
HITCBC 340   2 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable(); 340   2 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
341   } 341   }
342   }; 342   };
HITCBC 343   52 return &ops; 343   52 return &ops;
344   } 344   }
345   345  
346   static awaitable_ops const* 346   static awaitable_ops const*
HITCBC 347   117 construct_read_awaitable_impl( 347   117 construct_read_awaitable_impl(
348   void* source, 348   void* source,
349   void* storage, 349   void* storage,
350   std::span<mutable_buffer const> buffers) 350   std::span<mutable_buffer const> buffers)
351   { 351   {
HITCBC 352   117 auto& s = *static_cast<S*>(source); 352   117 auto& s = *static_cast<S*>(source);
HITCBC 353   117 ::new(storage) ReadAwaitable(s.read(buffers)); 353   117 ::new(storage) ReadAwaitable(s.read(buffers));
354   354  
355   static constexpr awaitable_ops ops = { 355   static constexpr awaitable_ops ops = {
HITCBC 356   117 +[](void* p) { 356   117 +[](void* p) {
HITCBC 357   117 return static_cast<ReadAwaitable*>(p)->await_ready(); 357   117 return static_cast<ReadAwaitable*>(p)->await_ready();
358   }, 358   },
HITCBC 359   1 +[](void* p, std::coroutine_handle<> h, io_env const* env) { 359   1 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
HITCBC 360   1 return detail::call_await_suspend( 360   1 return detail::call_await_suspend(
HITCBC 361   1 static_cast<ReadAwaitable*>(p), h, env); 361   1 static_cast<ReadAwaitable*>(p), h, env);
362   }, 362   },
HITCBC 363   117 +[](void* p) { 363   117 +[](void* p) {
HITCBC 364   117 return static_cast<ReadAwaitable*>(p)->await_resume(); 364   117 return static_cast<ReadAwaitable*>(p)->await_resume();
365   }, 365   },
HITCBC 366   117 +[](void* p) noexcept { 366   117 +[](void* p) noexcept {
MISUBC 367   static_cast<ReadAwaitable*>(p)->~ReadAwaitable(); 367   static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
368   } 368   }
369   }; 369   };
HITCBC 370   117 return &ops; 370   117 return &ops;
371   } 371   }
372   372  
373   static constexpr std::size_t max_awaitable_size = 373   static constexpr std::size_t max_awaitable_size =
374   sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable) 374   sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
375   ? sizeof(ReadSomeAwaitable) 375   ? sizeof(ReadSomeAwaitable)
376   : sizeof(ReadAwaitable); 376   : sizeof(ReadAwaitable);
377   static constexpr std::size_t max_awaitable_align = 377   static constexpr std::size_t max_awaitable_align =
378   alignof(ReadSomeAwaitable) > alignof(ReadAwaitable) 378   alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
379   ? alignof(ReadSomeAwaitable) 379   ? alignof(ReadSomeAwaitable)
380   : alignof(ReadAwaitable); 380   : alignof(ReadAwaitable);
381   381  
382   static constexpr vtable value = { 382   static constexpr vtable value = {
383   &construct_read_some_awaitable_impl, 383   &construct_read_some_awaitable_impl,
384   &construct_read_awaitable_impl, 384   &construct_read_awaitable_impl,
385   max_awaitable_size, 385   max_awaitable_size,
386   max_awaitable_align, 386   max_awaitable_align,
387   &do_destroy_impl 387   &do_destroy_impl
388   }; 388   };
389   }; 389   };
390   390  
391   inline 391   inline
HITCBC 392   148 any_read_source::~any_read_source() 392   148 any_read_source::~any_read_source()
393   { 393   {
HITCBC 394   148 if(storage_) 394   148 if(storage_)
395   { 395   {
HITCBC 396   7 vt_->destroy(source_); 396   7 vt_->destroy(source_);
HITCBC 397   7 ::operator delete(storage_); 397   7 ::operator delete(storage_);
398   } 398   }
HITCBC 399   148 if(cached_awaitable_) 399   148 if(cached_awaitable_)
400   { 400   {
HITCBC 401   141 if(active_ops_) 401   141 if(active_ops_)
HITCBC 402   1 active_ops_->destroy(cached_awaitable_); 402   1 active_ops_->destroy(cached_awaitable_);
HITCBC 403   141 ::operator delete(cached_awaitable_); 403   141 ::operator delete(cached_awaitable_);
404   } 404   }
HITCBC 405   148 } 405   148 }
406   406  
407   inline any_read_source& 407   inline any_read_source&
HITCBC 408   5 any_read_source::operator=(any_read_source&& other) noexcept 408   5 any_read_source::operator=(any_read_source&& other) noexcept
409   { 409   {
HITCBC 410   5 if(this != &other) 410   5 if(this != &other)
411   { 411   {
HITCBC 412   4 if(storage_) 412   4 if(storage_)
413   { 413   {
HITCBC 414   1 vt_->destroy(source_); 414   1 vt_->destroy(source_);
HITCBC 415   1 ::operator delete(storage_); 415   1 ::operator delete(storage_);
416   } 416   }
HITCBC 417   4 if(cached_awaitable_) 417   4 if(cached_awaitable_)
418   { 418   {
HITCBC 419   3 if(active_ops_) 419   3 if(active_ops_)
HITCBC 420   1 active_ops_->destroy(cached_awaitable_); 420   1 active_ops_->destroy(cached_awaitable_);
HITCBC 421   3 ::operator delete(cached_awaitable_); 421   3 ::operator delete(cached_awaitable_);
422   } 422   }
HITCBC 423   4 source_ = std::exchange(other.source_, nullptr); 423   4 source_ = std::exchange(other.source_, nullptr);
HITCBC 424   4 vt_ = std::exchange(other.vt_, nullptr); 424   4 vt_ = std::exchange(other.vt_, nullptr);
HITCBC 425   4 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr); 425   4 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
HITCBC 426   4 storage_ = std::exchange(other.storage_, nullptr); 426   4 storage_ = std::exchange(other.storage_, nullptr);
HITCBC 427   4 active_ops_ = std::exchange(other.active_ops_, nullptr); 427   4 active_ops_ = std::exchange(other.active_ops_, nullptr);
428   } 428   }
HITCBC 429   5 return *this; 429   5 return *this;
430   } 430   }
431   431  
432   template<ReadSource S> 432   template<ReadSource S>
433   requires (!std::same_as<std::decay_t<S>, any_read_source>) 433   requires (!std::same_as<std::decay_t<S>, any_read_source>)
HITCBC 434   9 any_read_source::any_read_source(S s) 434   9 any_read_source::any_read_source(S s)
HITCBC 435   9 : vt_(&vtable_for_impl<S>::value) 435   9 : vt_(&vtable_for_impl<S>::value)
436   { 436   {
437   struct guard { 437   struct guard {
438   any_read_source* self; 438   any_read_source* self;
439   bool committed = false; 439   bool committed = false;
HITCBC 440   9 ~guard() { 440   9 ~guard() {
HITCBC 441   9 if(!committed && self->storage_) { 441   9 if(!committed && self->storage_) {
442   // source_ is null if the source move-ctor threw before 442   // source_ is null if the source move-ctor threw before
443   // the placement-new assigned it. 443   // the placement-new assigned it.
HITCBC 444   1 if(self->source_) 444   1 if(self->source_)
MISUBC 445   self->vt_->destroy(self->source_); 445   self->vt_->destroy(self->source_);
HITCBC 446   1 ::operator delete(self->storage_); 446   1 ::operator delete(self->storage_);
HITCBC 447   1 self->storage_ = nullptr; 447   1 self->storage_ = nullptr;
HITCBC 448   1 self->source_ = nullptr; 448   1 self->source_ = nullptr;
449   } 449   }
HITCBC 450   9 } 450   9 }
HITCBC 451   9 } g{this}; 451   9 } g{this};
452   452  
HITCBC 453   9 storage_ = ::operator new(sizeof(S)); 453   9 storage_ = ::operator new(sizeof(S));
HITCBC 454   9 source_ = ::new(storage_) S(std::move(s)); 454   9 source_ = ::new(storage_) S(std::move(s));
455   455  
456   // Preallocate the awaitable storage 456   // Preallocate the awaitable storage
HITCBC 457   8 cached_awaitable_ = ::operator new(vt_->awaitable_size); 457   8 cached_awaitable_ = ::operator new(vt_->awaitable_size);
458   458  
HITCBC 459   8 g.committed = true; 459   8 g.committed = true;
HITCBC 460   9 } 460   9 }
461   461  
462   template<ReadSource S> 462   template<ReadSource S>
HITCBC 463   136 any_read_source::any_read_source(S* s) 463   136 any_read_source::any_read_source(S* s)
HITCBC 464   136 : source_(s) 464   136 : source_(s)
HITCBC 465   136 , vt_(&vtable_for_impl<S>::value) 465   136 , vt_(&vtable_for_impl<S>::value)
466   { 466   {
467   // Preallocate the awaitable storage 467   // Preallocate the awaitable storage
HITCBC 468   136 cached_awaitable_ = ::operator new(vt_->awaitable_size); 468   136 cached_awaitable_ = ::operator new(vt_->awaitable_size);
HITCBC 469   136 } 469   136 }
470   470  
471   template<MutableBufferSequence MB> 471   template<MutableBufferSequence MB>
472   auto 472   auto
HITCBC 473   54 any_read_source::read_some(MB buffers) 473   54 any_read_source::read_some(MB buffers)
474   { 474   {
475   struct awaitable 475   struct awaitable
476   { 476   {
477   any_read_source* self_; 477   any_read_source* self_;
478   detail::mutable_buffer_array<detail::max_iovec_> ba_; 478   detail::mutable_buffer_array<detail::max_iovec_> ba_;
479   479  
HITCBC 480   54 awaitable(any_read_source* self, MB const& buffers) 480   54 awaitable(any_read_source* self, MB const& buffers)
HITCBC 481   54 : self_(self) 481   54 : self_(self)
HITCBC 482   54 , ba_(buffers) 482   54 , ba_(buffers)
483   { 483   {
HITCBC 484   54 } 484   54 }
485   485  
486   bool 486   bool
HITCBC 487   54 await_ready() const noexcept 487   54 await_ready() const noexcept
488   { 488   {
HITCBC 489   54 return ba_.to_span().empty(); 489   54 return ba_.to_span().empty();
490   } 490   }
491   491  
492   std::coroutine_handle<> 492   std::coroutine_handle<>
HITCBC 493   52 await_suspend(std::coroutine_handle<> h, io_env const* env) 493   52 await_suspend(std::coroutine_handle<> h, io_env const* env)
494   { 494   {
HITCBC 495   52 self_->active_ops_ = self_->vt_->construct_read_some_awaitable( 495   52 self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
HITCBC 496   52 self_->source_, 496   52 self_->source_,
HITCBC 497   52 self_->cached_awaitable_, 497   52 self_->cached_awaitable_,
HITCBC 498   52 ba_.to_span()); 498   52 ba_.to_span());
499   499  
HITCBC 500   52 if(self_->active_ops_->await_ready(self_->cached_awaitable_)) 500   52 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
HITCBC 501   50 return h; 501   50 return h;
502   502  
HITCBC 503   2 return self_->active_ops_->await_suspend( 503   2 return self_->active_ops_->await_suspend(
HITCBC 504   2 self_->cached_awaitable_, h, env); 504   2 self_->cached_awaitable_, h, env);
505   } 505   }
506   506  
507   io_result<std::size_t> 507   io_result<std::size_t>
HITCBC 508   52 await_resume() 508   52 await_resume()
509   { 509   {
HITCBC 510   52 if(ba_.to_span().empty()) 510   52 if(ba_.to_span().empty())
HITCBC 511   2 return {{}, 0}; 511   2 return {{}, 0};
512   512  
513   struct guard { 513   struct guard {
514   any_read_source* self; 514   any_read_source* self;
HITCBC 515   50 ~guard() { 515   50 ~guard() {
HITCBC 516   50 self->active_ops_->destroy(self->cached_awaitable_); 516   50 self->active_ops_->destroy(self->cached_awaitable_);
HITCBC 517   50 self->active_ops_ = nullptr; 517   50 self->active_ops_ = nullptr;
HITCBC 518   50 } 518   50 }
HITCBC 519   50 } g{self_}; 519   50 } g{self_};
HITCBC 520   50 return self_->active_ops_->await_resume( 520   50 return self_->active_ops_->await_resume(
HITCBC 521   50 self_->cached_awaitable_); 521   50 self_->cached_awaitable_);
HITCBC 522   50 } 522   50 }
523   }; 523   };
HITCBC 524   54 return awaitable(this, buffers); 524   54 return awaitable(this, buffers);
525   } 525   }
526   526  
527   inline auto 527   inline auto
HITCBC 528   117 any_read_source::read_(std::span<mutable_buffer const> buffers) 528   117 any_read_source::read_(std::span<mutable_buffer const> buffers)
529   { 529   {
530   struct awaitable 530   struct awaitable
531   { 531   {
532   any_read_source* self_; 532   any_read_source* self_;
533   std::span<mutable_buffer const> buffers_; 533   std::span<mutable_buffer const> buffers_;
534   534  
535   bool 535   bool
HITCBC 536   117 await_ready() const noexcept 536   117 await_ready() const noexcept
537   { 537   {
HITCBC 538   117 return false; 538   117 return false;
539   } 539   }
540   540  
541   std::coroutine_handle<> 541   std::coroutine_handle<>
HITCBC 542   117 await_suspend(std::coroutine_handle<> h, io_env const* env) 542   117 await_suspend(std::coroutine_handle<> h, io_env const* env)
543   { 543   {
HITCBC 544   234 self_->active_ops_ = self_->vt_->construct_read_awaitable( 544   234 self_->active_ops_ = self_->vt_->construct_read_awaitable(
HITCBC 545   117 self_->source_, 545   117 self_->source_,
HITCBC 546   117 self_->cached_awaitable_, 546   117 self_->cached_awaitable_,
547   buffers_); 547   buffers_);
548   548  
HITCBC 549   117 if(self_->active_ops_->await_ready(self_->cached_awaitable_)) 549   117 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
HITCBC 550   116 return h; 550   116 return h;
551   551  
HITCBC 552   1 return self_->active_ops_->await_suspend( 552   1 return self_->active_ops_->await_suspend(
HITCBC 553   1 self_->cached_awaitable_, h, env); 553   1 self_->cached_awaitable_, h, env);
554   } 554   }
555   555  
556   io_result<std::size_t> 556   io_result<std::size_t>
HITCBC 557   117 await_resume() 557   117 await_resume()
558   { 558   {
559   struct guard { 559   struct guard {
560   any_read_source* self; 560   any_read_source* self;
HITCBC 561   117 ~guard() { 561   117 ~guard() {
HITCBC 562   117 self->active_ops_->destroy(self->cached_awaitable_); 562   117 self->active_ops_->destroy(self->cached_awaitable_);
HITCBC 563   117 self->active_ops_ = nullptr; 563   117 self->active_ops_ = nullptr;
HITCBC 564   117 } 564   117 }
HITCBC 565   117 } g{self_}; 565   117 } g{self_};
HITCBC 566   117 return self_->active_ops_->await_resume( 566   117 return self_->active_ops_->await_resume(
HITCBC 567   202 self_->cached_awaitable_); 567   202 self_->cached_awaitable_);
HITCBC 568   117 } 568   117 }
569   }; 569   };
HITCBC 570   117 return awaitable{this, buffers}; 570   117 return awaitable{this, buffers};
571   } 571   }
572   572  
573   template<MutableBufferSequence MB> 573   template<MutableBufferSequence MB>
574   io_task<std::size_t> 574   io_task<std::size_t>
HITCBC 575   111 any_read_source::read(MB buffers) 575   111 any_read_source::read(MB buffers)
576   { 576   {
577   buffer_param bp(buffers); 577   buffer_param bp(buffers);
578   std::size_t total = 0; 578   std::size_t total = 0;
579   579  
580   for(;;) 580   for(;;)
581   { 581   {
582   auto bufs = bp.data(); 582   auto bufs = bp.data();
583   if(bufs.empty()) 583   if(bufs.empty())
584   break; 584   break;
585   585  
586   auto [ec, n] = co_await read_(bufs); 586   auto [ec, n] = co_await read_(bufs);
587   total += n; 587   total += n;
588   if(ec) 588   if(ec)
589   co_return {ec, total}; 589   co_return {ec, total};
590   bp.consume(n); 590   bp.consume(n);
591   } 591   }
592   592  
593   co_return {{}, total}; 593   co_return {{}, total};
HITCBC 594   222 } 594   222 }
595   595  
596   } // namespace capy 596   } // namespace capy
597   } // namespace boost 597   } // namespace boost
598   598  
599   #endif 599   #endif