Skip to content

Commit

Permalink
Merge pull request #272 from Sherlock-Holo/rawop-with-flag
Browse files Browse the repository at this point in the history
feat: add flags related methods
  • Loading branch information
Berrysoft authored Jul 9, 2024
2 parents 1dd5683 + a0f85b1 commit e8f2a16
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 19 deletions.
9 changes: 6 additions & 3 deletions compio-driver/src/iour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ impl AsRawFd for Driver {
}
}

fn create_entry(entry: CEntry) -> Entry {
let result = entry.result();
fn create_entry(cq_entry: CEntry) -> Entry {
let result = cq_entry.result();
let result = if result < 0 {
let result = if result == -libc::ECANCELED {
libc::ETIMEDOUT
Expand All @@ -294,7 +294,10 @@ fn create_entry(entry: CEntry) -> Entry {
} else {
Ok(result as _)
};
Entry::new(entry.user_data() as _, result)
let mut entry = Entry::new(cq_entry.user_data() as _, result);
entry.set_flags(cq_entry.flags());

entry
}

fn timespec(duration: std::time::Duration) -> Timespec {
Expand Down
10 changes: 10 additions & 0 deletions compio-driver/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(crate) struct RawOp<T: ?Sized> {
// The metadata in `*mut RawOp<dyn OpCode>`
metadata: usize,
result: PushEntry<Option<Waker>, io::Result<usize>>,
flags: u32,
op: T,
}

Expand Down Expand Up @@ -84,6 +85,7 @@ impl<T: OpCode + 'static> Key<T> {
cancelled: false,
metadata: opcode_metadata::<T>(),
result: PushEntry::Pending(None),
flags: 0,
op,
});
unsafe { Self::new_unchecked(Box::into_raw(raw_op) as _) }
Expand Down Expand Up @@ -154,6 +156,14 @@ impl<T: ?Sized> Key<T> {
this.cancelled
}

pub(crate) fn set_flags(&mut self, flags: u32) {
self.as_opaque_mut().flags = flags;
}

pub(crate) fn flags(&self) -> u32 {
self.as_opaque().flags
}

/// Whether the op is completed.
pub(crate) fn has_result(&self) -> bool {
self.as_opaque().result.is_ready()
Expand Down
26 changes: 22 additions & 4 deletions compio-driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ impl Proactor {
}

/// Poll the driver and get completed entries.
/// You need to call [`Proactor::pop`] to get the pushed operations.
/// You need to call [`Proactor::pop`] to get the pushed
/// operations.
pub fn poll(
&mut self,
timeout: Option<Duration>,
Expand All @@ -290,11 +291,12 @@ impl Proactor {
/// # Panics
/// This function will panic if the requested operation has not been
/// completed.
pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, BufResult<usize, T>> {
pub fn pop<T>(&mut self, op: Key<T>) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
instrument!(compio_log::Level::DEBUG, "pop", ?op);
if op.has_result() {
let flags = op.flags();
// SAFETY: completed.
PushEntry::Ready(unsafe { op.into_inner() })
PushEntry::Ready((unsafe { op.into_inner() }, flags))
} else {
PushEntry::Pending(op)
}
Expand Down Expand Up @@ -322,18 +324,33 @@ impl AsRawFd for Proactor {
pub(crate) struct Entry {
user_data: usize,
result: io::Result<usize>,
flags: u32,
}

impl Entry {
pub(crate) fn new(user_data: usize, result: io::Result<usize>) -> Self {
Self { user_data, result }
Self {
user_data,
result,
flags: 0,
}
}

#[cfg(all(target_os = "linux", feature = "io-uring"))]
// this method only used by in io-uring driver
pub(crate) fn set_flags(&mut self, flags: u32) {
self.flags = flags;
}

/// The user-defined data returned by [`Proactor::push`].
pub fn user_data(&self) -> usize {
self.user_data
}

pub fn flags(&self) -> u32 {
self.flags
}

/// The result of the operation.
pub fn into_result(self) -> io::Result<usize> {
self.result
Expand All @@ -357,6 +374,7 @@ impl<E: Extend<usize>> Extend<Entry> for OutEntries<'_, E> {
self.entries.extend(iter.into_iter().filter_map(|e| {
let user_data = e.user_data();
let mut op = unsafe { Key::<()>::new_unchecked(user_data) };
op.set_flags(e.flags());
if op.set_result(e.into_result()) {
// SAFETY: completed and cancelled.
let _ = unsafe { op.into_box() };
Expand Down
2 changes: 1 addition & 1 deletion compio-driver/tests/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ fn push_and_wait<O: OpCode + 'static>(driver: &mut Proactor, op: O) -> BufResult
driver.poll(None, &mut entries).unwrap();
}
assert_eq!(entries[0], user_data.user_data());
driver.pop(user_data).take_ready().unwrap()
driver.pop(user_data).take_ready().unwrap().0
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion compio-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ pub mod time;
pub use async_task::Task;
pub use attacher::*;
use compio_buf::BufResult;
pub use runtime::{spawn, spawn_blocking, submit, JoinHandle, Runtime, RuntimeBuilder};
pub use runtime::{
spawn, spawn_blocking, submit, submit_with_flags, JoinHandle, Runtime, RuntimeBuilder,
};
38 changes: 34 additions & 4 deletions compio-runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use send_wrapper::SendWrapper;

#[cfg(feature = "time")]
use crate::runtime::time::{TimerFuture, TimerRuntime};
use crate::{runtime::op::OpFuture, BufResult};
use crate::{runtime::op::OpFlagsFuture, BufResult};

scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);

Expand Down Expand Up @@ -235,9 +235,26 @@ impl Runtime {
///
/// You only need this when authoring your own [`OpCode`].
pub fn submit<T: OpCode + 'static>(&self, op: T) -> impl Future<Output = BufResult<usize, T>> {
self.submit_with_flags(op).map(|(res, _)| res)
}

/// Submit an operation to the runtime.
///
/// The difference between [`Runtime::submit`] is this method will return
/// the flags
///
/// You only need this when authoring your own [`OpCode`].
pub fn submit_with_flags<T: OpCode + 'static>(
&self,
op: T,
) -> impl Future<Output = (BufResult<usize, T>, u32)> {
match self.submit_raw(op) {
PushEntry::Pending(user_data) => Either::Left(OpFuture::new(user_data)),
PushEntry::Ready(res) => Either::Right(ready(res)),
PushEntry::Pending(user_data) => Either::Left(OpFlagsFuture::new(user_data)),
PushEntry::Ready(res) => {
// submit_flags won't be ready immediately, if ready, it must be error without
// flags
Either::Right(ready((res, 0)))
}
}
}

Expand All @@ -264,7 +281,7 @@ impl Runtime {
&self,
cx: &mut Context,
op: Key<T>,
) -> PushEntry<Key<T>, BufResult<usize, T>> {
) -> PushEntry<Key<T>, (BufResult<usize, T>, u32)> {
instrument!(compio_log::Level::DEBUG, "poll_task", ?op);
let mut driver = self.driver.borrow_mut();
driver.pop(op).map_pending(|mut k| {
Expand Down Expand Up @@ -435,3 +452,16 @@ pub fn spawn_blocking<T: Send + 'static>(
pub fn submit<T: OpCode + 'static>(op: T) -> impl Future<Output = BufResult<usize, T>> {
Runtime::with_current(|r| r.submit(op))
}

/// Submit an operation to the current runtime, and return a future for it with
/// flags.
///
/// ## Panics
///
/// This method doesn't create runtime. It tries to obtain the current runtime
/// by [`Runtime::with_current`].
pub fn submit_with_flags<T: OpCode + 'static>(
op: T,
) -> impl Future<Output = (BufResult<usize, T>, u32)> {
Runtime::with_current(|r| r.submit_with_flags(op))
}
10 changes: 5 additions & 5 deletions compio-runtime/src/runtime/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ use compio_driver::{Key, OpCode, PushEntry};
use crate::runtime::Runtime;

#[derive(Debug)]
pub struct OpFuture<T: OpCode> {
pub struct OpFlagsFuture<T: OpCode> {
key: Option<Key<T>>,
}

impl<T: OpCode> OpFuture<T> {
impl<T: OpCode> OpFlagsFuture<T> {
pub fn new(key: Key<T>) -> Self {
Self { key: Some(key) }
}
}

impl<T: OpCode> Future for OpFuture<T> {
type Output = BufResult<usize, T>;
impl<T: OpCode> Future for OpFlagsFuture<T> {
type Output = (BufResult<usize, T>, u32);

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = Runtime::with_current(|r| r.poll_task(cx, self.key.take().unwrap()));
Expand All @@ -35,7 +35,7 @@ impl<T: OpCode> Future for OpFuture<T> {
}
}

impl<T: OpCode> Drop for OpFuture<T> {
impl<T: OpCode> Drop for OpFlagsFuture<T> {
fn drop(&mut self) {
if let Some(key) = self.key.take() {
Runtime::with_current(|r| r.cancel_op(key))
Expand Down
7 changes: 6 additions & 1 deletion compio/examples/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ fn push_and_wait<O: OpCode + 'static>(driver: &mut Proactor, op: O) -> (usize, O
driver.poll(None, &mut entries).unwrap();
}
assert_eq!(entries[0], user_data.user_data());
driver.pop(user_data).take_ready().unwrap().unwrap()
driver
.pop(user_data)
.map_ready(|(res, _)| res)
.take_ready()
.unwrap()
.unwrap()
}
}
}
Expand Down

0 comments on commit e8f2a16

Please sign in to comment.