From 411a60965d75056fa1111d08779f2f10f56aeafa Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Mon, 1 Jul 2024 22:55:49 +0800 Subject: [PATCH 1/9] feat: add flags related methods --- compio-driver/src/iour/mod.rs | 35 +++++++++++++++++++++-- compio-driver/src/key.rs | 19 +++++++++++++ compio-driver/src/lib.rs | 47 +++++++++++++++++++++++++++++-- compio-runtime/src/runtime/mod.rs | 41 ++++++++++++++++++++++++++- compio-runtime/src/runtime/op.rs | 34 ++++++++++++++++++++++ 5 files changed, 171 insertions(+), 5 deletions(-) diff --git a/compio-driver/src/iour/mod.rs b/compio-driver/src/iour/mod.rs index d9f250ae..381cb2e0 100644 --- a/compio-driver/src/iour/mod.rs +++ b/compio-driver/src/iour/mod.rs @@ -238,6 +238,37 @@ impl Driver { } } + pub fn push_flags( + &mut self, + op: &mut Key, + ) -> Poll<(io::Result, u32)> { + instrument!(compio_log::Level::TRACE, "push_flags", ?op); + let user_data = op.user_data(); + let op_pin = op.as_op_pin(); + trace!("push RawOp"); + match op_pin.create_entry() { + OpEntry::Submission(entry) => { + #[allow(clippy::useless_conversion)] + if let Err(err) = self.push_raw(entry.user_data(user_data as _).into()) { + return Poll::Ready((Err(err), 0)); + } + Poll::Pending + } + #[cfg(feature = "io-uring-sqe128")] + OpEntry::Submission128(entry) => { + if let Err(err) = self.push_raw(entry.user_data(user_data as _)) { + return Poll::Ready((Err(err), 0)); + } + Poll::Pending + } + OpEntry::Blocking => match self.push_blocking(user_data) { + Err(err) => Poll::Ready((Err(err), 0)), + Ok(true) => Poll::Pending, + Ok(false) => Poll::Ready((Err(io::Error::from_raw_os_error(libc::EBUSY)), 0)), + }, + } + } + fn push_blocking(&mut self, user_data: usize) -> io::Result { let handle = self.handle()?; let completed = self.pool_completed.clone(); @@ -247,7 +278,7 @@ impl Driver { let mut op = unsafe { Key::::new_unchecked(user_data) }; let op_pin = op.as_op_pin(); let res = op_pin.call_blocking(); - completed.push(Entry::new(user_data, res)); + completed.push(Entry::new(user_data, res, todo!("how to get flags?"))); handle.notify().ok(); }) .is_ok(); @@ -294,7 +325,7 @@ fn create_entry(entry: CEntry) -> Entry { } else { Ok(result as _) }; - Entry::new(entry.user_data() as _, result) + Entry::new(entry.user_data() as _, result, entry.flags()) } fn timespec(duration: std::time::Duration) -> Timespec { diff --git a/compio-driver/src/key.rs b/compio-driver/src/key.rs index 5b10f139..aed6797d 100644 --- a/compio-driver/src/key.rs +++ b/compio-driver/src/key.rs @@ -21,6 +21,7 @@ pub(crate) struct RawOp { // The metadata in `*mut RawOp` metadata: usize, result: PushEntry, io::Result>, + flags: u32, op: T, } @@ -84,6 +85,7 @@ impl Key { cancelled: false, metadata: opcode_metadata::(), result: PushEntry::Pending(None), + flags: 0, op, }); unsafe { Self::new_unchecked(Box::into_raw(raw_op) as _) } @@ -154,6 +156,10 @@ impl Key { this.cancelled } + pub(crate) fn set_flags(&mut self, flags: u32) { + self.as_opaque_mut().flags = flags; + } + /// Whether the op is completed. pub(crate) fn has_result(&self) -> bool { self.as_opaque().result.is_ready() @@ -189,6 +195,19 @@ impl Key { let op = unsafe { Box::from_raw(self.user_data as *mut RawOp) }; BufResult(op.result.take_ready().unwrap_unchecked(), op.op) } + + /// Get the inner result and flags if it is completed. + /// + /// # Safety + /// + /// Call it only when the op is completed, otherwise it is UB. + pub(crate) unsafe fn into_inner_flags(self) -> (BufResult, u32) { + let op = unsafe { Box::from_raw(self.user_data as *mut RawOp) }; + ( + BufResult(op.result.take_ready().unwrap_unchecked(), op.op), + op.flags, + ) + } } impl Key { diff --git a/compio-driver/src/lib.rs b/compio-driver/src/lib.rs index 485b321d..af9a7e09 100644 --- a/compio-driver/src/lib.rs +++ b/compio-driver/src/lib.rs @@ -272,6 +272,24 @@ impl Proactor { } } + /// Push an operation into the driver, and return the unique key, called + /// user-defined data, associated with it. + pub fn push_flags( + &mut self, + op: T, + ) -> PushEntry, (BufResult, u32)> { + let mut op = self.driver.create_op(op); + match self.driver.push_flags(&mut op) { + Poll::Pending => PushEntry::Pending(op), + Poll::Ready((res, flags)) => { + op.set_result(res); + op.set_flags(flags); + // SAFETY: just completed. + PushEntry::Ready(unsafe { op.into_inner_flags() }) + } + } + } + /// Poll the driver and get completed entries. /// You need to call [`Proactor::pop`] to get the pushed operations. pub fn poll( @@ -300,6 +318,21 @@ impl Proactor { } } + /// Get the pushed operations from the completion entries. + /// + /// # Panics + /// This function will panic if the requested operation has not been + /// completed. + pub fn pop_flags(&mut self, op: Key) -> PushEntry, (BufResult, u32)> { + instrument!(compio_log::Level::DEBUG, "pop_flags", ?op); + if op.has_result() { + // SAFETY: completed. + PushEntry::Ready(unsafe { op.into_inner_flags() }) + } else { + PushEntry::Pending(op) + } + } + /// Update the waker of the specified op. pub fn update_waker(&mut self, op: &mut Key, waker: Waker) { op.set_waker(waker); @@ -322,11 +355,16 @@ impl AsRawFd for Proactor { pub(crate) struct Entry { user_data: usize, result: io::Result, + flags: u32, } impl Entry { - pub(crate) fn new(user_data: usize, result: io::Result) -> Self { - Self { user_data, result } + pub(crate) fn new(user_data: usize, result: io::Result, flags: u32) -> Self { + Self { + user_data, + result, + flags, + } } /// The user-defined data returned by [`Proactor::push`]. @@ -334,6 +372,10 @@ impl Entry { self.user_data } + pub fn flags(&self) -> u32 { + self.flags + } + /// The result of the operation. pub fn into_result(self) -> io::Result { self.result @@ -357,6 +399,7 @@ impl> Extend for OutEntries<'_, E> { self.entries.extend(iter.into_iter().filter_map(|e| { let user_data = e.user_data(); let mut op = unsafe { Key::<()>::new_unchecked(user_data) }; + op.set_flags(e.flags()); if op.set_result(e.into_result()) { // SAFETY: completed and cancelled. let _ = unsafe { op.into_box() }; diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index 7276d291..7f669b13 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -31,7 +31,10 @@ use send_wrapper::SendWrapper; #[cfg(feature = "time")] use crate::runtime::time::{TimerFuture, TimerRuntime}; -use crate::{runtime::op::OpFuture, BufResult}; +use crate::{ + runtime::op::{OpFlagsFuture, OpFuture}, + BufResult, +}; scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime); @@ -231,6 +234,13 @@ impl Runtime { self.driver.borrow_mut().push(op) } + fn submit_flags_raw( + &self, + op: T, + ) -> PushEntry, (BufResult, u32)> { + self.driver.borrow_mut().push_flags(op) + } + /// Submit an operation to the runtime. /// /// You only need this when authoring your own [`OpCode`]. @@ -241,6 +251,22 @@ impl Runtime { } } + /// Submit an operation to the runtime. + /// + /// The difference between [`Runtime::submit`] is this method will return + /// the flags + /// + /// You only need this when authoring your own [`OpCode`]. + pub fn submit_flags( + &self, + op: T, + ) -> impl Future, u32)> { + match self.submit_flags_raw(op) { + PushEntry::Pending(user_data) => Either::Left(OpFlagsFuture::new(user_data)), + PushEntry::Ready(res) => Either::Right(ready(res)), + } + } + #[cfg(feature = "time")] pub(crate) fn create_timer(&self, delay: std::time::Duration) -> impl Future { let mut timer_runtime = self.timer_runtime.borrow_mut(); @@ -273,6 +299,19 @@ impl Runtime { }) } + pub(crate) fn poll_task_flags( + &self, + cx: &mut Context, + op: Key, + ) -> PushEntry, (BufResult, u32)> { + instrument!(compio_log::Level::DEBUG, "poll_task_flags", ?op); + let mut driver = self.driver.borrow_mut(); + driver.pop_flags(op).map_pending(|mut k| { + driver.update_waker(&mut k, cx.waker().clone()); + k + }) + } + #[cfg(feature = "time")] pub(crate) fn poll_timer(&self, cx: &mut Context, key: usize) -> Poll<()> { instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key); diff --git a/compio-runtime/src/runtime/op.rs b/compio-runtime/src/runtime/op.rs index cb05debd..9bf81192 100644 --- a/compio-runtime/src/runtime/op.rs +++ b/compio-runtime/src/runtime/op.rs @@ -42,3 +42,37 @@ impl Drop for OpFuture { } } } + +#[derive(Debug)] +pub struct OpFlagsFuture { + key: Option>, +} + +impl OpFlagsFuture { + pub fn new(key: Key) -> Self { + Self { key: Some(key) } + } +} + +impl Future for OpFlagsFuture { + type Output = (BufResult, u32); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = Runtime::with_current(|r| r.poll_task_flags(cx, self.key.take().unwrap())); + match res { + PushEntry::Pending(key) => { + self.key = Some(key); + Poll::Pending + } + PushEntry::Ready(res) => Poll::Ready(res), + } + } +} + +impl Drop for OpFlagsFuture { + fn drop(&mut self) { + if let Some(key) = self.key.take() { + Runtime::with_current(|r| r.cancel_op(key)) + } + } +} From 0f26a4733e95b706b92c62959f22ca78d076e6c9 Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Mon, 1 Jul 2024 23:56:54 +0800 Subject: [PATCH 2/9] refactor: remove unnecessary push_flags methods --- compio-driver/src/iour/mod.rs | 31 ------------------------------- compio-driver/src/lib.rs | 18 ------------------ compio-runtime/src/runtime/mod.rs | 15 ++++++--------- 3 files changed, 6 insertions(+), 58 deletions(-) diff --git a/compio-driver/src/iour/mod.rs b/compio-driver/src/iour/mod.rs index 381cb2e0..ebac3b34 100644 --- a/compio-driver/src/iour/mod.rs +++ b/compio-driver/src/iour/mod.rs @@ -238,37 +238,6 @@ impl Driver { } } - pub fn push_flags( - &mut self, - op: &mut Key, - ) -> Poll<(io::Result, u32)> { - instrument!(compio_log::Level::TRACE, "push_flags", ?op); - let user_data = op.user_data(); - let op_pin = op.as_op_pin(); - trace!("push RawOp"); - match op_pin.create_entry() { - OpEntry::Submission(entry) => { - #[allow(clippy::useless_conversion)] - if let Err(err) = self.push_raw(entry.user_data(user_data as _).into()) { - return Poll::Ready((Err(err), 0)); - } - Poll::Pending - } - #[cfg(feature = "io-uring-sqe128")] - OpEntry::Submission128(entry) => { - if let Err(err) = self.push_raw(entry.user_data(user_data as _)) { - return Poll::Ready((Err(err), 0)); - } - Poll::Pending - } - OpEntry::Blocking => match self.push_blocking(user_data) { - Err(err) => Poll::Ready((Err(err), 0)), - Ok(true) => Poll::Pending, - Ok(false) => Poll::Ready((Err(io::Error::from_raw_os_error(libc::EBUSY)), 0)), - }, - } - } - fn push_blocking(&mut self, user_data: usize) -> io::Result { let handle = self.handle()?; let completed = self.pool_completed.clone(); diff --git a/compio-driver/src/lib.rs b/compio-driver/src/lib.rs index af9a7e09..a30a5160 100644 --- a/compio-driver/src/lib.rs +++ b/compio-driver/src/lib.rs @@ -272,24 +272,6 @@ impl Proactor { } } - /// Push an operation into the driver, and return the unique key, called - /// user-defined data, associated with it. - pub fn push_flags( - &mut self, - op: T, - ) -> PushEntry, (BufResult, u32)> { - let mut op = self.driver.create_op(op); - match self.driver.push_flags(&mut op) { - Poll::Pending => PushEntry::Pending(op), - Poll::Ready((res, flags)) => { - op.set_result(res); - op.set_flags(flags); - // SAFETY: just completed. - PushEntry::Ready(unsafe { op.into_inner_flags() }) - } - } - } - /// Poll the driver and get completed entries. /// You need to call [`Proactor::pop`] to get the pushed operations. pub fn poll( diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index 7f669b13..ce9eb565 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -234,13 +234,6 @@ impl Runtime { self.driver.borrow_mut().push(op) } - fn submit_flags_raw( - &self, - op: T, - ) -> PushEntry, (BufResult, u32)> { - self.driver.borrow_mut().push_flags(op) - } - /// Submit an operation to the runtime. /// /// You only need this when authoring your own [`OpCode`]. @@ -261,9 +254,13 @@ impl Runtime { &self, op: T, ) -> impl Future, u32)> { - match self.submit_flags_raw(op) { + match self.submit_raw(op) { PushEntry::Pending(user_data) => Either::Left(OpFlagsFuture::new(user_data)), - PushEntry::Ready(res) => Either::Right(ready(res)), + PushEntry::Ready(res) => { + // submit_flags won't be ready immediately, if ready, it must be error without + // flags + Either::Right(ready((res, 0))) + } } } From 982eb4849d081240b8f2611867211c49bfdaecc5 Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Tue, 2 Jul 2024 21:40:43 +0800 Subject: [PATCH 3/9] refactor: remove redundant methods and ctor args --- compio-driver/src/iour/mod.rs | 11 +++++++---- compio-driver/src/key.rs | 17 ++++------------- compio-driver/src/lib.rs | 11 ++++++++--- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/compio-driver/src/iour/mod.rs b/compio-driver/src/iour/mod.rs index ebac3b34..ab19e7fb 100644 --- a/compio-driver/src/iour/mod.rs +++ b/compio-driver/src/iour/mod.rs @@ -247,7 +247,7 @@ impl Driver { let mut op = unsafe { Key::::new_unchecked(user_data) }; let op_pin = op.as_op_pin(); let res = op_pin.call_blocking(); - completed.push(Entry::new(user_data, res, todo!("how to get flags?"))); + completed.push(Entry::new(user_data, res)); handle.notify().ok(); }) .is_ok(); @@ -282,8 +282,8 @@ impl AsRawFd for Driver { } } -fn create_entry(entry: CEntry) -> Entry { - let result = entry.result(); +fn create_entry(cq_entry: CEntry) -> Entry { + let result = cq_entry.result(); let result = if result < 0 { let result = if result == -libc::ECANCELED { libc::ETIMEDOUT @@ -294,7 +294,10 @@ fn create_entry(entry: CEntry) -> Entry { } else { Ok(result as _) }; - Entry::new(entry.user_data() as _, result, entry.flags()) + let mut entry = Entry::new(cq_entry.user_data() as _, result); + entry.set_flags(entry.flags()); + + entry } fn timespec(duration: std::time::Duration) -> Timespec { diff --git a/compio-driver/src/key.rs b/compio-driver/src/key.rs index aed6797d..c606f7ad 100644 --- a/compio-driver/src/key.rs +++ b/compio-driver/src/key.rs @@ -160,6 +160,10 @@ impl Key { self.as_opaque_mut().flags = flags; } + pub(crate) fn flags(&self) -> u32 { + self.as_opaque().flags + } + /// Whether the op is completed. pub(crate) fn has_result(&self) -> bool { self.as_opaque().result.is_ready() @@ -195,19 +199,6 @@ impl Key { let op = unsafe { Box::from_raw(self.user_data as *mut RawOp) }; BufResult(op.result.take_ready().unwrap_unchecked(), op.op) } - - /// Get the inner result and flags if it is completed. - /// - /// # Safety - /// - /// Call it only when the op is completed, otherwise it is UB. - pub(crate) unsafe fn into_inner_flags(self) -> (BufResult, u32) { - let op = unsafe { Box::from_raw(self.user_data as *mut RawOp) }; - ( - BufResult(op.result.take_ready().unwrap_unchecked(), op.op), - op.flags, - ) - } } impl Key { diff --git a/compio-driver/src/lib.rs b/compio-driver/src/lib.rs index a30a5160..575e32a4 100644 --- a/compio-driver/src/lib.rs +++ b/compio-driver/src/lib.rs @@ -308,8 +308,9 @@ impl Proactor { pub fn pop_flags(&mut self, op: Key) -> PushEntry, (BufResult, u32)> { instrument!(compio_log::Level::DEBUG, "pop_flags", ?op); if op.has_result() { + let flags = op.flags(); // SAFETY: completed. - PushEntry::Ready(unsafe { op.into_inner_flags() }) + PushEntry::Ready((unsafe { op.into_inner() }, flags)) } else { PushEntry::Pending(op) } @@ -341,14 +342,18 @@ pub(crate) struct Entry { } impl Entry { - pub(crate) fn new(user_data: usize, result: io::Result, flags: u32) -> Self { + pub(crate) fn new(user_data: usize, result: io::Result) -> Self { Self { user_data, result, - flags, + flags: 0, } } + pub(crate) fn set_flags(&mut self, flags: u32) { + self.flags = flags; + } + /// The user-defined data returned by [`Proactor::push`]. pub fn user_data(&self) -> usize { self.user_data From a37dd4622dbef1cec45753505018db52da584c39 Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Tue, 2 Jul 2024 23:26:47 +0800 Subject: [PATCH 4/9] refactor: make methods name clearer --- compio-driver/src/lib.rs | 5 ++++- compio-runtime/src/runtime/mod.rs | 6 +++--- compio-runtime/src/runtime/op.rs | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/compio-driver/src/lib.rs b/compio-driver/src/lib.rs index 575e32a4..90b96b60 100644 --- a/compio-driver/src/lib.rs +++ b/compio-driver/src/lib.rs @@ -305,7 +305,10 @@ impl Proactor { /// # Panics /// This function will panic if the requested operation has not been /// completed. - pub fn pop_flags(&mut self, op: Key) -> PushEntry, (BufResult, u32)> { + pub fn pop_with_flags( + &mut self, + op: Key, + ) -> PushEntry, (BufResult, u32)> { instrument!(compio_log::Level::DEBUG, "pop_flags", ?op); if op.has_result() { let flags = op.flags(); diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index ce9eb565..a80fa0e5 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -250,7 +250,7 @@ impl Runtime { /// the flags /// /// You only need this when authoring your own [`OpCode`]. - pub fn submit_flags( + pub fn submit_with_flags( &self, op: T, ) -> impl Future, u32)> { @@ -296,14 +296,14 @@ impl Runtime { }) } - pub(crate) fn poll_task_flags( + pub(crate) fn poll_task_with_flags( &self, cx: &mut Context, op: Key, ) -> PushEntry, (BufResult, u32)> { instrument!(compio_log::Level::DEBUG, "poll_task_flags", ?op); let mut driver = self.driver.borrow_mut(); - driver.pop_flags(op).map_pending(|mut k| { + driver.pop_with_flags(op).map_pending(|mut k| { driver.update_waker(&mut k, cx.waker().clone()); k }) diff --git a/compio-runtime/src/runtime/op.rs b/compio-runtime/src/runtime/op.rs index 9bf81192..7f174473 100644 --- a/compio-runtime/src/runtime/op.rs +++ b/compio-runtime/src/runtime/op.rs @@ -58,7 +58,7 @@ impl Future for OpFlagsFuture { type Output = (BufResult, u32); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let res = Runtime::with_current(|r| r.poll_task_flags(cx, self.key.take().unwrap())); + let res = Runtime::with_current(|r| r.poll_task_with_flags(cx, self.key.take().unwrap())); match res { PushEntry::Pending(key) => { self.key = Some(key); From 34bfd3c38b4f8d9da51bdbc125cd8f656d3471c4 Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Wed, 3 Jul 2024 12:21:33 +0800 Subject: [PATCH 5/9] refactor: add feature gate for set_flags method clippy complain set_flags is unsed when disable io-uring feature --- compio-driver/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/compio-driver/src/lib.rs b/compio-driver/src/lib.rs index 90b96b60..f8002aae 100644 --- a/compio-driver/src/lib.rs +++ b/compio-driver/src/lib.rs @@ -353,6 +353,8 @@ impl Entry { } } + #[cfg(all(target_os = "linux", feature = "io-uring"))] + // this method only used by in io-uring driver pub(crate) fn set_flags(&mut self, flags: u32) { self.flags = flags; } From c773db5bda598adbc6a3ed8d4e9f64d0a149c534 Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Wed, 3 Jul 2024 23:10:52 +0800 Subject: [PATCH 6/9] fix(runtime): runtime miss submit_with_flags function --- compio-runtime/src/lib.rs | 4 +++- compio-runtime/src/runtime/mod.rs | 13 +++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/compio-runtime/src/lib.rs b/compio-runtime/src/lib.rs index afd9d735..de82cd56 100644 --- a/compio-runtime/src/lib.rs +++ b/compio-runtime/src/lib.rs @@ -22,4 +22,6 @@ pub mod time; pub use async_task::Task; pub use attacher::*; use compio_buf::BufResult; -pub use runtime::{spawn, spawn_blocking, submit, JoinHandle, Runtime, RuntimeBuilder}; +pub use runtime::{ + spawn, spawn_blocking, submit, submit_with_flags, JoinHandle, Runtime, RuntimeBuilder, +}; diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index a80fa0e5..944ebbee 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -471,3 +471,16 @@ pub fn spawn_blocking( pub fn submit(op: T) -> impl Future> { Runtime::with_current(|r| r.submit(op)) } + +/// Submit an operation to the current runtime, and return a future for it with +/// flags. +/// +/// ## Panics +/// +/// This method doesn't create runtime. It tries to obtain the current runtime +/// by [`Runtime::with_current`]. +pub fn submit_with_flags( + op: T, +) -> impl Future, u32)> { + Runtime::with_current(|r| r.submit_with_flags(op)) +} From 4eb426309ddeef001a70f7119842ca406b40ae2a Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Thu, 4 Jul 2024 01:01:21 +0800 Subject: [PATCH 7/9] fix: set_flags always set 0 --- compio-driver/src/iour/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compio-driver/src/iour/mod.rs b/compio-driver/src/iour/mod.rs index ab19e7fb..8c1df4fb 100644 --- a/compio-driver/src/iour/mod.rs +++ b/compio-driver/src/iour/mod.rs @@ -295,7 +295,7 @@ fn create_entry(cq_entry: CEntry) -> Entry { Ok(result as _) }; let mut entry = Entry::new(cq_entry.user_data() as _, result); - entry.set_flags(entry.flags()); + entry.set_flags(cq_entry.flags()); entry } From ad4fe671485983ee15a04e99054cd0ec5eedc8ea Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Fri, 5 Jul 2024 23:37:29 +0800 Subject: [PATCH 8/9] refactor: merge some methods and remove OpFuture struct merge pop and poll_task --- compio-driver/src/lib.rs | 23 +++----------------- compio-driver/tests/file.rs | 6 +++++- compio-runtime/src/runtime/mod.rs | 25 +++------------------ compio-runtime/src/runtime/op.rs | 36 +------------------------------ compio/examples/driver.rs | 7 +++++- 5 files changed, 18 insertions(+), 79 deletions(-) diff --git a/compio-driver/src/lib.rs b/compio-driver/src/lib.rs index f8002aae..e4508a1b 100644 --- a/compio-driver/src/lib.rs +++ b/compio-driver/src/lib.rs @@ -273,7 +273,8 @@ impl Proactor { } /// Poll the driver and get completed entries. - /// You need to call [`Proactor::pop`] to get the pushed operations. + /// You need to call [`Proactor::pop`] to get the pushed + /// operations. pub fn poll( &mut self, timeout: Option, @@ -290,25 +291,7 @@ impl Proactor { /// # Panics /// This function will panic if the requested operation has not been /// completed. - pub fn pop(&mut self, op: Key) -> PushEntry, BufResult> { - instrument!(compio_log::Level::DEBUG, "pop", ?op); - if op.has_result() { - // SAFETY: completed. - PushEntry::Ready(unsafe { op.into_inner() }) - } else { - PushEntry::Pending(op) - } - } - - /// Get the pushed operations from the completion entries. - /// - /// # Panics - /// This function will panic if the requested operation has not been - /// completed. - pub fn pop_with_flags( - &mut self, - op: Key, - ) -> PushEntry, (BufResult, u32)> { + pub fn pop(&mut self, op: Key) -> PushEntry, (BufResult, u32)> { instrument!(compio_log::Level::DEBUG, "pop_flags", ?op); if op.has_result() { let flags = op.flags(); diff --git a/compio-driver/tests/file.rs b/compio-driver/tests/file.rs index c4207765..d7e87aa1 100644 --- a/compio-driver/tests/file.rs +++ b/compio-driver/tests/file.rs @@ -53,7 +53,11 @@ fn push_and_wait(driver: &mut Proactor, op: O) -> BufResult driver.poll(None, &mut entries).unwrap(); } assert_eq!(entries[0], user_data.user_data()); - driver.pop(user_data).take_ready().unwrap() + driver + .pop(user_data) + .map_ready(|(res, _)| res) + .take_ready() + .unwrap() } } } diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index 944ebbee..78c13c43 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -31,10 +31,7 @@ use send_wrapper::SendWrapper; #[cfg(feature = "time")] use crate::runtime::time::{TimerFuture, TimerRuntime}; -use crate::{ - runtime::op::{OpFlagsFuture, OpFuture}, - BufResult, -}; +use crate::{runtime::op::OpFlagsFuture, BufResult}; scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime); @@ -238,10 +235,7 @@ impl Runtime { /// /// You only need this when authoring your own [`OpCode`]. pub fn submit(&self, op: T) -> impl Future> { - match self.submit_raw(op) { - PushEntry::Pending(user_data) => Either::Left(OpFuture::new(user_data)), - PushEntry::Ready(res) => Either::Right(ready(res)), - } + self.submit_with_flags(op).map(|(res, _)| res) } /// Submit an operation to the runtime. @@ -287,7 +281,7 @@ impl Runtime { &self, cx: &mut Context, op: Key, - ) -> PushEntry, BufResult> { + ) -> PushEntry, (BufResult, u32)> { instrument!(compio_log::Level::DEBUG, "poll_task", ?op); let mut driver = self.driver.borrow_mut(); driver.pop(op).map_pending(|mut k| { @@ -296,19 +290,6 @@ impl Runtime { }) } - pub(crate) fn poll_task_with_flags( - &self, - cx: &mut Context, - op: Key, - ) -> PushEntry, (BufResult, u32)> { - instrument!(compio_log::Level::DEBUG, "poll_task_flags", ?op); - let mut driver = self.driver.borrow_mut(); - driver.pop_with_flags(op).map_pending(|mut k| { - driver.update_waker(&mut k, cx.waker().clone()); - k - }) - } - #[cfg(feature = "time")] pub(crate) fn poll_timer(&self, cx: &mut Context, key: usize) -> Poll<()> { instrument!(compio_log::Level::DEBUG, "poll_timer", ?cx, ?key); diff --git a/compio-runtime/src/runtime/op.rs b/compio-runtime/src/runtime/op.rs index 7f174473..aa7de5dd 100644 --- a/compio-runtime/src/runtime/op.rs +++ b/compio-runtime/src/runtime/op.rs @@ -9,40 +9,6 @@ use compio_driver::{Key, OpCode, PushEntry}; use crate::runtime::Runtime; -#[derive(Debug)] -pub struct OpFuture { - key: Option>, -} - -impl OpFuture { - pub fn new(key: Key) -> Self { - Self { key: Some(key) } - } -} - -impl Future for OpFuture { - type Output = BufResult; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let res = Runtime::with_current(|r| r.poll_task(cx, self.key.take().unwrap())); - match res { - PushEntry::Pending(key) => { - self.key = Some(key); - Poll::Pending - } - PushEntry::Ready(res) => Poll::Ready(res), - } - } -} - -impl Drop for OpFuture { - fn drop(&mut self) { - if let Some(key) = self.key.take() { - Runtime::with_current(|r| r.cancel_op(key)) - } - } -} - #[derive(Debug)] pub struct OpFlagsFuture { key: Option>, @@ -58,7 +24,7 @@ impl Future for OpFlagsFuture { type Output = (BufResult, u32); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let res = Runtime::with_current(|r| r.poll_task_with_flags(cx, self.key.take().unwrap())); + let res = Runtime::with_current(|r| r.poll_task(cx, self.key.take().unwrap())); match res { PushEntry::Pending(key) => { self.key = Some(key); diff --git a/compio/examples/driver.rs b/compio/examples/driver.rs index f114c70b..d7a4c795 100644 --- a/compio/examples/driver.rs +++ b/compio/examples/driver.rs @@ -54,7 +54,12 @@ fn push_and_wait(driver: &mut Proactor, op: O) -> (usize, O driver.poll(None, &mut entries).unwrap(); } assert_eq!(entries[0], user_data.user_data()); - driver.pop(user_data).take_ready().unwrap().unwrap() + driver + .pop(user_data) + .map_ready(|(res, _)| res) + .take_ready() + .unwrap() + .unwrap() } } } From a0f85b12088c4754ad25d1d4ae75a2cff92e7268 Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Mon, 8 Jul 2024 18:58:14 +0800 Subject: [PATCH 9/9] refactor: fix minor issues --- compio-driver/src/lib.rs | 2 +- compio-driver/tests/file.rs | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/compio-driver/src/lib.rs b/compio-driver/src/lib.rs index e4508a1b..d39d8c04 100644 --- a/compio-driver/src/lib.rs +++ b/compio-driver/src/lib.rs @@ -292,7 +292,7 @@ impl Proactor { /// This function will panic if the requested operation has not been /// completed. pub fn pop(&mut self, op: Key) -> PushEntry, (BufResult, u32)> { - instrument!(compio_log::Level::DEBUG, "pop_flags", ?op); + instrument!(compio_log::Level::DEBUG, "pop", ?op); if op.has_result() { let flags = op.flags(); // SAFETY: completed. diff --git a/compio-driver/tests/file.rs b/compio-driver/tests/file.rs index d7e87aa1..41ea1c20 100644 --- a/compio-driver/tests/file.rs +++ b/compio-driver/tests/file.rs @@ -53,11 +53,7 @@ fn push_and_wait(driver: &mut Proactor, op: O) -> BufResult driver.poll(None, &mut entries).unwrap(); } assert_eq!(entries[0], user_data.user_data()); - driver - .pop(user_data) - .map_ready(|(res, _)| res) - .take_ready() - .unwrap() + driver.pop(user_data).take_ready().unwrap().0 } } }