Skip to content

Commit

Permalink
move AsyncRead{,At}BufferPool trait to compio-io
Browse files Browse the repository at this point in the history
make these traits more general, add BufferPool and Buffer associated
type can make these tarits doesn't depend on specific buffer and buffer
pool type
  • Loading branch information
Sherlock-Holo committed Jul 11, 2024
1 parent 353ecab commit bcb531f
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 106 deletions.
18 changes: 12 additions & 6 deletions compio-fs/src/async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use compio_driver::{
op::{BufResultExt, Recv, RecvBufferPool, Send},
AsRawFd, SharedFd, TakeBuffer, ToSharedFd,
};
use compio_io::{AsyncRead, AsyncWrite};
use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite};
use compio_runtime::{
buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool},
buffer_pool::{BorrowedBuffer, BufferPool},
Attacher,
};
#[cfg(unix)]
Expand Down Expand Up @@ -78,21 +78,27 @@ impl<T: AsRawFd + 'static> AsyncRead for &AsyncFd<T> {
}

impl<T: AsRawFd + 'static> AsyncReadBufferPool for AsyncFd<T> {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a BufferPool,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<BorrowedBuffer<'a>> {
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_buffer_pool(buffer_pool, len).await
}
}

impl<T: AsRawFd + 'static> AsyncReadBufferPool for &AsyncFd<T> {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a BufferPool,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<BorrowedBuffer<'a>> {
) -> io::Result<Self::Buffer<'a>> {
let fd = self.to_shared_fd();
let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?;
let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await;
Expand Down
11 changes: 7 additions & 4 deletions compio-fs/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use compio_driver::{
op::{BufResultExt, CloseFile, ReadAt, ReadAtBufferPool, Sync, WriteAt},
TakeBuffer, ToSharedFd,
};
use compio_io::{AsyncReadAt, AsyncWriteAt};
use compio_io::{AsyncReadAt, AsyncReadAtBufferPool, AsyncWriteAt};
use compio_runtime::{
buffer_pool::{AsyncReadAtBufferPool, BorrowedBuffer, BufferPool},
buffer_pool::{BorrowedBuffer, BufferPool},
Attacher,
};
#[cfg(unix)]
Expand Down Expand Up @@ -172,12 +172,15 @@ impl AsyncReadAt for File {
}

impl AsyncReadAtBufferPool for File {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_at_buffer_pool<'a>(
&self,
buffer_pool: &'a BufferPool,
buffer_pool: &'a Self::BufferPool,
pos: u64,
len: usize,
) -> io::Result<BorrowedBuffer<'a>> {
) -> io::Result<Self::Buffer<'a>> {
let fd = self.to_shared_fd();
let op = ReadAtBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, pos, len as _)?;
let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await;
Expand Down
61 changes: 41 additions & 20 deletions compio-fs/src/named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ use std::ptr::null_mut;
use std::{ffi::OsStr, io, os::windows::io::FromRawHandle, ptr::null};

use compio_buf::{BufResult, IoBuf, IoBufMut};
use compio_driver::{
impl_raw_fd,
op::{ConnectNamedPipe, RecvBufferPool},
syscall, AsRawFd, RawFd, ToSharedFd,
use compio_driver::{impl_raw_fd, op::ConnectNamedPipe, syscall, AsRawFd, RawFd, ToSharedFd};
use compio_io::{
AsyncRead, AsyncReadAt, AsyncReadAtBufferPool, AsyncReadBufferPool, AsyncWrite, AsyncWriteAt,
};
use compio_io::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt};
use compio_runtime::buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool};
use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool};
use widestring::U16CString;
use windows_sys::Win32::{
Storage::FileSystem::{
Expand Down Expand Up @@ -198,22 +196,28 @@ impl AsyncRead for &NamedPipeServer {
}

impl AsyncReadBufferPool for NamedPipeServer {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a BufferPool,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<BorrowedBuffer<'a>> {
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_buffer_pool(buffer_pool, len).await
}
}

impl AsyncReadBufferPool for &NamedPipeServer {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a BufferPool,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<BorrowedBuffer<'a>> {
self.handle.read_buffer_pool(buffer_pool, len).await
) -> io::Result<Self::Buffer<'a>> {
(&self.handle).read_buffer_pool(buffer_pool, len).await
}
}

Expand Down Expand Up @@ -320,15 +324,6 @@ impl NamedPipeClient {
// Safety: we're ensuring the lifetime of the named pipe.
unsafe { named_pipe_info(self.as_raw_fd()) }
}

#[inline]
pub async fn read_buffer_pool<'a>(
&self,
buffer_pool: &'a BufferPool,
len: u32,
) -> io::Result<BorrowedBuffer<'a>> {
self.handle.read_buffer_pool(buffer_pool, len).await
}
}

impl AsyncRead for NamedPipeClient {
Expand All @@ -346,6 +341,32 @@ impl AsyncRead for &NamedPipeClient {
}
}

impl AsyncReadBufferPool for NamedPipeClient {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_buffer_pool(buffer_pool, len).await
}
}

impl AsyncReadBufferPool for &NamedPipeClient {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>> {
self.handle.read_at_buffer_pool(buffer_pool, 0, len).await
}
}

impl AsyncWrite for NamedPipeClient {
#[inline]
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
Expand Down
18 changes: 12 additions & 6 deletions compio-fs/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use compio_driver::{
op::{BufResultExt, Recv, RecvBufferPool, RecvVectored, Send, SendVectored},
syscall, AsRawFd, TakeBuffer, ToSharedFd,
};
use compio_io::{AsyncRead, AsyncWrite};
use compio_runtime::buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool};
use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite};
use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool};

use crate::File;

Expand Down Expand Up @@ -504,21 +504,27 @@ impl AsyncRead for &Receiver {
}

impl AsyncReadBufferPool for Receiver {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a BufferPool,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<BorrowedBuffer<'a>> {
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_buffer_pool(buffer_pool, len).await
}
}

impl AsyncReadBufferPool for &Receiver {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a BufferPool,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<BorrowedBuffer<'a>> {
) -> io::Result<Self::Buffer<'a>> {
let fd = self.to_shared_fd();
let op = RecvBufferPool::new(buffer_pool.as_driver_buffer_pool(), fd, len as _)?;
let (BufResult(res, op), flags) = compio_runtime::submit_with_flags(op).await;
Expand Down
18 changes: 12 additions & 6 deletions compio-fs/src/stdio/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::io;

use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
use compio_driver::{AsRawFd, RawFd};
use compio_io::{AsyncRead, AsyncWrite};
use compio_runtime::buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool};
use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite};
use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool};

#[cfg(doc)]
use super::{stderr, stdin, stdout};
Expand Down Expand Up @@ -33,21 +33,27 @@ impl AsyncRead for Stdin {
}

impl AsyncReadBufferPool for Stdin {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a BufferPool,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<BorrowedBuffer<'a>> {
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_buffer_pool(buffer_pool, len).await
}
}

impl AsyncReadBufferPool for &Stdin {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a BufferPool,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<BorrowedBuffer<'a>> {
) -> io::Result<Self::Buffer<'a>> {
(&self.0).read_buffer_pool(buffer_pool, len).await
}
}
Expand Down
4 changes: 2 additions & 2 deletions compio-fs/tests/buffer_pool.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::io::{Seek, SeekFrom, Write};

use compio_fs::{pipe, AsyncFd, File};
use compio_io::AsyncWriteExt;
use compio_runtime::buffer_pool::{AsyncReadAtBufferPool, AsyncReadBufferPool, BufferPool};
use compio_io::{AsyncReadAtBufferPool, AsyncReadBufferPool, AsyncWriteExt};
use compio_runtime::buffer_pool::BufferPool;
use tempfile::NamedTempFile;

const HELLO: &[u8] = b"hello world...";
Expand Down
47 changes: 46 additions & 1 deletion compio-io/src/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(feature = "allocator_api")]
use std::alloc::Allocator;
use std::{io::Cursor, rc::Rc, sync::Arc};
use std::{io, io::Cursor, ops::DerefMut, rc::Rc, sync::Arc};

use compio_buf::{buf_try, t_alloc, BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBufMut};

Expand Down Expand Up @@ -130,6 +130,51 @@ pub trait AsyncReadAt {
}
}

/// # AsyncReadBufferPool
///
/// Async read with buffer pool
pub trait AsyncReadBufferPool {
/// Buffer pool type
type BufferPool;

/// Filled buffer type
type Buffer<'a>: DerefMut<Target = [u8]>;

/// Read some bytes from this source with [`BufferPool`] and return
/// a [`BorrowedBuffer`].
///
/// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len,
/// if `len` > 0, `min(len, inner buffer size)` will be the read max len
async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<Self::Buffer<'a>>;
}

/// # AsyncReadAtBufferPool
///
/// Async read with buffer pool and position
pub trait AsyncReadAtBufferPool {
/// Buffer pool type
type BufferPool;

/// Filled buffer type
type Buffer<'a>: DerefMut<Target = [u8]>;

/// Read some bytes from this source at position with [`BufferPool`] and
/// return a [`BorrowedBuffer`].
///
/// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len,
/// if `len` > 0, `min(len, inner buffer size)` will be the read max len
async fn read_at_buffer_pool<'a>(
&self,
buffer_pool: &'a Self::BufferPool,
pos: u64,
len: usize,
) -> io::Result<Self::Buffer<'a>>;
}

macro_rules! impl_read_at {
(@ptr $($ty:ty),*) => {
$(
Expand Down
18 changes: 12 additions & 6 deletions compio-net/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::{future::Future, io, net::SocketAddr};

use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
use compio_driver::impl_raw_fd;
use compio_io::{AsyncRead, AsyncWrite};
use compio_runtime::buffer_pool::{AsyncReadBufferPool, BorrowedBuffer, BufferPool};
use compio_io::{AsyncRead, AsyncReadBufferPool, AsyncWrite};
use compio_runtime::buffer_pool::{BorrowedBuffer, BufferPool};
use socket2::{Protocol, SockAddr, Socket as Socket2, Type};

use crate::{
Expand Down Expand Up @@ -231,21 +231,27 @@ impl AsyncRead for TcpStream {
}

impl AsyncReadBufferPool for TcpStream {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a BufferPool,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<BorrowedBuffer<'a>> {
) -> io::Result<Self::Buffer<'a>> {
(&*self).read_buffer_pool(buffer_pool, len).await
}
}

impl AsyncReadBufferPool for &TcpStream {
type Buffer<'a> = BorrowedBuffer<'a>;
type BufferPool = BufferPool;

async fn read_buffer_pool<'a>(
&mut self,
buffer_pool: &'a BufferPool,
buffer_pool: &'a Self::BufferPool,
len: usize,
) -> io::Result<BorrowedBuffer<'a>> {
) -> io::Result<Self::Buffer<'a>> {
self.inner.recv_buffer_pool(buffer_pool, len as _).await
}
}
Expand Down
Loading

0 comments on commit bcb531f

Please sign in to comment.