diff --git a/compio-fs/src/async_fd.rs b/compio-fs/src/async_fd.rs index 0ca8adf7..b2928d79 100644 --- a/compio-fs/src/async_fd.rs +++ b/compio-fs/src/async_fd.rs @@ -11,9 +11,9 @@ use compio_driver::{ op::{BufResultExt, Recv, RecvBufferPool, Send}, AsRawFd, SharedFd, TakeBuffer, ToSharedFd, }; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; use compio_runtime::{ - buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool}, + buffer_pool::{BorrowedBuffer, BufferPool}, Attacher, }; #[cfg(unix)] @@ -78,21 +78,27 @@ impl AsyncRead for &AsyncFd { } impl AsyncReadBufferPool for AsyncFd { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { (&*self).read_buffer_pool(buffer_pool, len).await } } impl AsyncReadBufferPool for &AsyncFd { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { let fd = self.to_shared_fd(); let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; diff --git a/compio-fs/src/file.rs b/compio-fs/src/file.rs index 51162232..7096c602 100644 --- a/compio-fs/src/file.rs +++ b/compio-fs/src/file.rs @@ -6,9 +6,9 @@ use compio_driver::{ op::{BufResultExt, CloseFile, ReadAt, ReadAtBufferPool, Sync, WriteAt}, TakeBuffer, ToSharedFd, }; -use compio_io::{AsyncReadAt, AsyncWriteAt}; +use compio_io::{AsyncReadAt, AsyncReadAtBufferPool, AsyncWriteAt}; use compio_runtime::{ - buffer_pool::{AsyncReadAtBufferPool, BorrowedBuffer, BufferPool}, + buffer_pool::{BorrowedBuffer, BufferPool}, Attacher, }; #[cfg(unix)] @@ -172,12 +172,15 @@ impl AsyncReadAt for File { } impl AsyncReadAtBufferPool for File { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_at_buffer_pool<'a>( &self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, pos: u64, len: usize, - ) -> io::Result> { + ) -> io::Result> { let fd = self.to_shared_fd(); let op = ReadAtBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, pos, len as _)?; let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; diff --git a/compio-fs/src/named_pipe.rs b/compio-fs/src/named_pipe.rs index 6583604f..9a58d257 100644 --- a/compio-fs/src/named_pipe.rs +++ b/compio-fs/src/named_pipe.rs @@ -7,13 +7,11 @@ use std::ptr::null_mut; use std::{ffi::OsStr, io, os::windows::io::FromRawHandle, ptr::null}; use compio_buf::{BufResult, IoBuf, IoBufMut}; -use compio_driver::{ - impl_raw_fd, - op::{ConnectNamedPipe, RecvBufferPool}, - syscall, AsRawFd, RawFd, ToSharedFd, +use compio_driver::{impl_raw_fd, op::ConnectNamedPipe, syscall, AsRawFd, RawFd, ToSharedFd}; +use compio_io::{ + AsyncRead, AsyncReadAt, AsyncReadAtBufferPool, AsyncReadBufferPool, AsyncWrite, AsyncWriteAt, }; -use compio_io::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt}; -use compio_runtime::buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool}; +use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use widestring::U16CString; use windows_sys::Win32::{ Storage::FileSystem::{ @@ -198,22 +196,28 @@ impl AsyncRead for &NamedPipeServer { } impl AsyncReadBufferPool for NamedPipeServer { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { (&*self).read_buffer_pool(buffer_pool, len).await } } impl AsyncReadBufferPool for &NamedPipeServer { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { - self.handle.read_buffer_pool(buffer_pool, len).await + ) -> io::Result> { + (&self.handle).read_buffer_pool(buffer_pool, len).await } } @@ -320,15 +324,6 @@ impl NamedPipeClient { // Safety: we're ensuring the lifetime of the named pipe. unsafe { named_pipe_info(self.as_raw_fd()) } } - - #[inline] - pub async fn read_buffer_pool<'a>( - &self, - buffer_pool: &'a BufferPool, - len: u32, - ) -> io::Result> { - self.handle.read_buffer_pool(buffer_pool, len).await - } } impl AsyncRead for NamedPipeClient { @@ -346,6 +341,32 @@ impl AsyncRead for &NamedPipeClient { } } +impl AsyncReadBufferPool for NamedPipeClient { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_buffer_pool(buffer_pool, len).await + } +} + +impl AsyncReadBufferPool for &NamedPipeClient { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + self.handle.read_at_buffer_pool(buffer_pool, 0, len).await + } +} + impl AsyncWrite for NamedPipeClient { #[inline] async fn write(&mut self, buf: T) -> BufResult { diff --git a/compio-fs/src/pipe.rs b/compio-fs/src/pipe.rs index ee51c7bb..81ce56f8 100644 --- a/compio-fs/src/pipe.rs +++ b/compio-fs/src/pipe.rs @@ -13,8 +13,8 @@ use compio_driver::{ op::{BufResultExt, Recv, RecvBufferPool, RecvVectored, Send, SendVectored}, syscall, AsRawFd, TakeBuffer, ToSharedFd, }; -use compio_io::{AsyncRead, AsyncWrite}; -use compio_runtime::buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; +use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use crate::File; @@ -504,21 +504,27 @@ impl AsyncRead for &Receiver { } impl AsyncReadBufferPool for Receiver { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { (&*self).read_buffer_pool(buffer_pool, len).await } } impl AsyncReadBufferPool for &Receiver { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { let fd = self.to_shared_fd(); let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; diff --git a/compio-fs/src/stdio/unix.rs b/compio-fs/src/stdio/unix.rs index 8abcd420..444b581f 100644 --- a/compio-fs/src/stdio/unix.rs +++ b/compio-fs/src/stdio/unix.rs @@ -2,8 +2,8 @@ use std::io; use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use compio_driver::{AsRawFd, RawFd}; -use compio_io::{AsyncRead, AsyncWrite}; -use compio_runtime::buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; +use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; #[cfg(doc)] use super::{stderr, stdin, stdout}; @@ -33,21 +33,27 @@ impl AsyncRead for Stdin { } impl AsyncReadBufferPool for Stdin { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { (&*self).read_buffer_pool(buffer_pool, len).await } } impl AsyncReadBufferPool for &Stdin { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { (&self.0).read_buffer_pool(buffer_pool, len).await } } diff --git a/compio-fs/tests/buffer_pool.rs b/compio-fs/tests/buffer_pool.rs index 7f90e895..1a7ef442 100644 --- a/compio-fs/tests/buffer_pool.rs +++ b/compio-fs/tests/buffer_pool.rs @@ -1,8 +1,8 @@ use std::io::{Seek, SeekFrom, Write}; use compio_fs::{pipe, AsyncFd, File}; -use compio_io::AsyncWriteExt; -use compio_runtime::buffer_pool::{AsyncReadAtBufferPool, AsyncReadBufferPool, BufferPool}; +use compio_io::{AsyncReadAtBufferPool, AsyncReadBufferPool, AsyncWriteExt}; +use compio_runtime::buffer_pool::BufferPool; use tempfile::NamedTempFile; const HELLO: &[u8] = b"hello world..."; diff --git a/compio-io/src/read/mod.rs b/compio-io/src/read/mod.rs index 69032b96..d8d3426f 100644 --- a/compio-io/src/read/mod.rs +++ b/compio-io/src/read/mod.rs @@ -1,6 +1,6 @@ #[cfg(feature = "allocator_api")] use std::alloc::Allocator; -use std::{io::Cursor, rc::Rc, sync::Arc}; +use std::{io, io::Cursor, ops::DerefMut, rc::Rc, sync::Arc}; use compio_buf::{buf_try, t_alloc, BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBufMut}; @@ -130,6 +130,51 @@ pub trait AsyncReadAt { } } +/// # AsyncReadBufferPool +/// +/// Async read with buffer pool +pub trait AsyncReadBufferPool { + /// Buffer pool type + type BufferPool; + + /// Filled buffer type + type Buffer<'a>: DerefMut; + + /// Read some bytes from this source with [`BufferPool`] and return + /// a [`BorrowedBuffer`]. + /// + /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len, + /// if `len` > 0, `min(len, inner buffer size)` will be the read max len + async fn read_buffer_pool<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result>; +} + +/// # AsyncReadAtBufferPool +/// +/// Async read with buffer pool and position +pub trait AsyncReadAtBufferPool { + /// Buffer pool type + type BufferPool; + + /// Filled buffer type + type Buffer<'a>: DerefMut; + + /// Read some bytes from this source at position with [`BufferPool`] and + /// return a [`BorrowedBuffer`]. + /// + /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len, + /// if `len` > 0, `min(len, inner buffer size)` will be the read max len + async fn read_at_buffer_pool<'a>( + &self, + buffer_pool: &'a Self::BufferPool, + pos: u64, + len: usize, + ) -> io::Result>; +} + macro_rules! impl_read_at { (@ptr $($ty:ty),*) => { $( diff --git a/compio-net/src/tcp.rs b/compio-net/src/tcp.rs index d74f51c5..592cd6cb 100644 --- a/compio-net/src/tcp.rs +++ b/compio-net/src/tcp.rs @@ -2,8 +2,8 @@ use std::{future::Future, io, net::SocketAddr}; use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use compio_driver::impl_raw_fd; -use compio_io::{AsyncRead, AsyncWrite}; -use compio_runtime::buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; +use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use socket2::{Protocol, SockAddr, Socket as Socket2, Type}; use crate::{ @@ -231,21 +231,27 @@ impl AsyncRead for TcpStream { } impl AsyncReadBufferPool for TcpStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { (&*self).read_buffer_pool(buffer_pool, len).await } } impl AsyncReadBufferPool for &TcpStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { self.inner.recv_buffer_pool(buffer_pool, len as _).await } } diff --git a/compio-net/src/unix.rs b/compio-net/src/unix.rs index 0fd91369..0b08d434 100644 --- a/compio-net/src/unix.rs +++ b/compio-net/src/unix.rs @@ -2,8 +2,8 @@ use std::{future::Future, io, path::Path}; use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use compio_driver::impl_raw_fd; -use compio_io::{AsyncRead, AsyncWrite}; -use compio_runtime::buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; +use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use socket2::{SockAddr, Socket as Socket2, Type}; use crate::{OwnedReadHalf, OwnedWriteHalf, PollFd, ReadHalf, Socket, WriteHalf}; @@ -225,21 +225,27 @@ impl AsyncRead for &UnixStream { } impl AsyncReadBufferPool for UnixStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { (&*self).read_buffer_pool(buffer_pool, len).await } } impl AsyncReadBufferPool for &UnixStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { self.inner.recv_buffer_pool(buffer_pool, len as _).await } } diff --git a/compio-net/tests/buffer_pool.rs b/compio-net/tests/buffer_pool.rs index 3925f549..13501a3f 100644 --- a/compio-net/tests/buffer_pool.rs +++ b/compio-net/tests/buffer_pool.rs @@ -1,8 +1,8 @@ use std::net::Ipv6Addr; -use compio_io::AsyncWriteExt; +use compio_io::{AsyncReadBufferPool, AsyncWriteExt}; use compio_net::{TcpListener, TcpStream, UdpSocket, UnixListener, UnixStream}; -use compio_runtime::buffer_pool::{AsyncReadBufferPool, BufferPool}; +use compio_runtime::buffer_pool::BufferPool; #[compio_macros::test] async fn test_tcp_read_buffer_pool() { diff --git a/compio-process/src/unix.rs b/compio-process/src/unix.rs index 63827c80..e9329930 100644 --- a/compio-process/src/unix.rs +++ b/compio-process/src/unix.rs @@ -5,8 +5,8 @@ use compio_driver::{ op::{BufResultExt, Recv, RecvBufferPool, Send}, AsRawFd, RawFd, SharedFd, TakeBuffer, ToSharedFd, }; -use compio_io::{AsyncRead, AsyncWrite}; -use compio_runtime::buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; +use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use crate::{ChildStderr, ChildStdin, ChildStdout}; @@ -37,11 +37,14 @@ impl AsyncRead for ChildStdout { } impl AsyncReadBufferPool for ChildStdout { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { let fd = self.to_shared_fd(); let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; @@ -71,11 +74,14 @@ impl AsyncRead for ChildStderr { } impl AsyncReadBufferPool for ChildStderr { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { let fd = self.to_shared_fd(); let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; diff --git a/compio-process/src/windows.rs b/compio-process/src/windows.rs index 1202018c..5dafc333 100644 --- a/compio-process/src/windows.rs +++ b/compio-process/src/windows.rs @@ -11,8 +11,8 @@ use compio_driver::{ op::{BufResultExt, Recv, RecvBufferPool, Send}, syscall, AsRawFd, OpCode, OpType, RawFd, SharedFd, TakeBuffer, ToSharedFd, }; -use compio_io::{AsyncRead, AsyncWrite}; -use compio_runtime::buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool}; +use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite}; +use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool}; use windows_sys::Win32::System::{Threading::GetExitCodeProcess, IO::OVERLAPPED}; use crate::{ChildStderr, ChildStdin, ChildStdout}; @@ -69,11 +69,14 @@ impl AsyncRead for ChildStdout { } impl AsyncReadBufferPool for ChildStdout { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { let fd = self.to_shared_fd(); let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; @@ -103,11 +106,14 @@ impl AsyncRead for ChildStderr { } impl AsyncReadBufferPool for ChildStderr { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + async fn read_buffer_pool<'a>( &mut self, - buffer_pool: &'a BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> io::Result> { + ) -> io::Result> { let fd = self.to_shared_fd(); let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?; let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await; diff --git a/compio-runtime/src/buffer_pool.rs b/compio-runtime/src/buffer_pool.rs index 5830c2c4..196f9eb7 100644 --- a/compio-runtime/src/buffer_pool.rs +++ b/compio-runtime/src/buffer_pool.rs @@ -68,38 +68,3 @@ impl Drop for BufferPool { }); } } - -#[allow(async_fn_in_trait)] -/// # AsyncReadBufferPool -/// -/// Async read with buffer pool -pub trait AsyncReadBufferPool { - /// Read some bytes from this source with [`BufferPool`] and return - /// a [`BorrowedBuffer`]. - /// - /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len, - /// if `len` > 0, `min(len, inner buffer size)` will be the read max len - async fn read_buffer_pool<'a>( - &mut self, - buffer_pool: &'a BufferPool, - len: usize, - ) -> io::Result>; -} - -#[allow(async_fn_in_trait)] -/// # AsyncReadAtBufferPool -/// -/// Async read with buffer pool and position -pub trait AsyncReadAtBufferPool { - /// Read some bytes from this source at position with [`BufferPool`] and - /// return a [`BorrowedBuffer`]. - /// - /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len, - /// if `len` > 0, `min(len, inner buffer size)` will be the read max len - async fn read_at_buffer_pool<'a>( - &self, - buffer_pool: &'a BufferPool, - pos: u64, - len: usize, - ) -> io::Result>; -}