Skip to content

Commit

Permalink
feat(quic): improved close logic
Browse files Browse the repository at this point in the history
  • Loading branch information
AsakuraMizu committed Aug 15, 2024
1 parent 83b928b commit aae6db4
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 50 deletions.
4 changes: 2 additions & 2 deletions compio-quic/examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ async fn main() {
recv.read_to_end(&mut buf).await.unwrap();
println!("{:?}", buf);

conn.close(1u32.into(), "bye");
conn.close(1u32.into(), b"bye");
}

endpoint.close(0u32.into(), "").await.unwrap();
endpoint.shutdown().await.unwrap();
}
3 changes: 2 additions & 1 deletion compio-quic/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ async fn main() {
conn.closed().await;
}

endpoint.close(0u32.into(), "").await.unwrap();
endpoint.close(0u32.into(), b"");
endpoint.shutdown().await.unwrap();
}
61 changes: 43 additions & 18 deletions compio-quic/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{wait_event, RecvStream, SendStream, Socket};

#[derive(Debug)]
pub(crate) enum ConnectionEvent {
Close(VarInt, String),
Close(VarInt, Bytes),
Proto(quinn_proto::ConnectionEvent),
}

Expand Down Expand Up @@ -165,9 +165,9 @@ impl ConnectionInner {
}
}

fn close(&self, error_code: VarInt, reason: String) {
fn close(&self, error_code: VarInt, reason: Bytes) {
let mut state = self.state();
state.conn.close(Instant::now(), error_code, reason.into());
state.conn.close(Instant::now(), error_code, reason);
state.terminate(ConnectionError::LocallyClosed);
state.wake();
self.notify_events();
Expand Down Expand Up @@ -497,7 +497,7 @@ impl Future for Connecting {
impl Drop for Connecting {
fn drop(&mut self) {
if Arc::strong_count(&self.0) == 2 {
self.0.close(0u32.into(), String::new())
self.0.close(0u32.into(), Bytes::new())
}
}
}
Expand Down Expand Up @@ -575,23 +575,41 @@ impl Connection {
/// Close the connection immediately.
///
/// Pending operations will fail immediately with
/// [`ConnectionError::LocallyClosed`]. Delivery of data on unfinished
/// streams is not guaranteed, so the application must call this only when
/// all important communications have been completed, e.g. by calling
/// [`finish`] on outstanding [`SendStream`]s and waiting for the resulting
/// futures to complete.
/// [`ConnectionError::LocallyClosed`]. No more data is sent to the peer
/// and the peer may drop buffered data upon receiving
/// the CONNECTION_CLOSE frame.
///
/// `error_code` and `reason` are not interpreted, and are provided directly
/// to the peer.
///
/// `reason` will be truncated to fit in a single packet with overhead; to
/// improve odds that it is preserved in full, it should be kept under 1KiB.
///
/// [`ConnectionError::LocallyClosed`]: quinn_proto::ConnectionError::LocallyClosed
/// [`finish`]: crate::SendStream::finish
/// [`SendStream`]: crate::SendStream
pub fn close(&self, error_code: VarInt, reason: &str) {
self.0.close(error_code, reason.to_string());
/// improve odds that it is preserved in full, it should be kept under
/// 1KiB.
///
/// # Gracefully closing a connection
///
/// Only the peer last receiving application data can be certain that all
/// data is delivered. The only reliable action it can then take is to
/// close the connection, potentially with a custom error code. The
/// delivery of the final CONNECTION_CLOSE frame is very likely if both
/// endpoints stay online long enough, and [`Endpoint::shutdown()`] can
/// be used to provide sufficient time. Otherwise, the remote peer will
/// time out the connection, provided that the idle timeout is not
/// disabled.
///
/// The sending side can not guarantee all stream data is delivered to the
/// remote application. It only knows the data is delivered to the QUIC
/// stack of the remote endpoint. Once the local side sends a
/// CONNECTION_CLOSE frame in response to calling [`close()`] the remote
/// endpoint may drop any data it received but is as yet undelivered to
/// the application, including data that was acknowledged as received to
/// the local endpoint.
///
/// [`ConnectionError::LocallyClosed`]: ConnectionError::LocallyClosed
/// [`Endpoint::shutdown()`]: crate::Endpoint::shutdown
/// [`close()`]: Connection::close
pub fn close(&self, error_code: VarInt, reason: &[u8]) {
self.0.close(error_code, Bytes::copy_from_slice(reason));
}

/// Wait for the connection to be closed for any reason.
Expand All @@ -601,7 +619,14 @@ impl Connection {
let _ = worker.await;
}

self.0.state().error.clone().unwrap()
self.0.try_state().unwrap_err()
}

/// If the connection is closed, the reason why.
///
/// Returns `None` if the connection is still open.
pub fn close_reason(&self) -> Option<ConnectionError> {
self.0.try_state().err()
}

/// Receive an application datagram.
Expand Down Expand Up @@ -807,7 +832,7 @@ impl Eq for Connection {}
impl Drop for Connection {
fn drop(&mut self) {
if Arc::strong_count(&self.0) == 2 {
self.close(0u32.into(), "")
self.close(0u32.into(), b"")
}
}
}
Expand Down
71 changes: 45 additions & 26 deletions compio-quic/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
time::Instant,
};

use bytes::Bytes;
use compio_buf::BufResult;
use compio_log::{error, Instrument};
use compio_net::{ToSocketAddrsAsync, UdpSocket};
Expand All @@ -33,7 +34,8 @@ struct EndpointState {
endpoint: quinn_proto::Endpoint,
worker: Option<JoinHandle<()>>,
connections: HashMap<ConnectionHandle, Sender<ConnectionEvent>>,
close: Option<(VarInt, String)>,
close: Option<(VarInt, Bytes)>,
exit_on_idle: bool,
incoming: VecDeque<quinn_proto::Incoming>,
}

Expand Down Expand Up @@ -149,6 +151,7 @@ impl EndpointInner {
worker: None,
connections: HashMap::new(),
close: None,
exit_on_idle: false,
incoming: VecDeque::new(),
}),
socket,
Expand Down Expand Up @@ -270,8 +273,8 @@ impl EndpointInner {
match res {
Ok(meta) => self.state.lock().unwrap().handle_data(meta, &recv_buf, respond_fn),
Err(e) if e.kind() == io::ErrorKind::ConnectionReset => {}
// #[cfg(windows)]
// Err(e) if e.raw_os_error() == Some(windows_sys::Win32::Foundation::ERROR_PORT_UNREACHABLE as _) => {}
#[cfg(windows)]
Err(e) if e.raw_os_error() == Some(windows_sys::Win32::Foundation::ERROR_PORT_UNREACHABLE as _) => {}
Err(e) => break Err(e),
}
recv_fut.set(self.socket.recv(recv_buf).fuse());
Expand All @@ -282,7 +285,7 @@ impl EndpointInner {
}

let state = self.state.lock().unwrap();
if state.close.is_some() && state.is_idle() {
if state.exit_on_idle && state.is_idle() {
break Ok(());
}
if !state.incoming.is_empty() {
Expand Down Expand Up @@ -415,6 +418,25 @@ impl Endpoint {
self.inner.state.lock().unwrap().endpoint.open_connections()
}

/// Close all of this endpoint's connections immediately and cease accepting
/// new connections.
///
/// See [`Connection::close()`] for details.
///
/// [`Connection::close()`]: crate::Connection::close
pub fn close(&self, error_code: VarInt, reason: &[u8]) {
let reason = Bytes::copy_from_slice(reason);
let mut state = self.inner.state.lock().unwrap();
if state.close.is_some() {
return;
}
state.close = Some((error_code, reason.clone()));
for conn in state.connections.values() {
let _ = conn.send(ConnectionEvent::Close(error_code, reason.clone()));
}
self.inner.incoming.notify(usize::MAX.additional());
}

// Modified from [`SharedFd::try_unwrap_inner`], see notes there.
unsafe fn try_unwrap_inner(this: &ManuallyDrop<Self>) -> Option<EndpointInner> {
let ptr = ManuallyDrop::new(std::ptr::read(&this.inner));
Expand All @@ -427,36 +449,29 @@ impl Endpoint {
}
}

/// Shutdown the endpoint and close the underlying socket.
/// Gracefully shutdown the endpoint.
///
/// This will close all connections and the underlying socket. Note that it
/// will wait for all connections and all clones of the endpoint (and any
/// clone of the underlying socket) to be dropped before closing the socket.
/// Wait for all connections on the endpoint to be cleanly shut down and
/// close the underlying socket. This will wait for all clones of the
/// endpoint, all connections and all streams to be dropped before
/// closing the socket.
///
/// If the endpoint has already been closed or is closing, this will return
/// immediately with `Ok(())`.
/// Waiting for this condition before exiting ensures that a good-faith
/// effort is made to notify peers of recent connection closes, whereas
/// exiting immediately could force them to wait out the idle timeout
/// period.
///
/// See [`Connection::close()`](crate::Connection::close) for details.
pub async fn close(self, error_code: VarInt, reason: &str) -> io::Result<()> {
let reason = reason.to_string();

{
let mut state = self.inner.state.lock().unwrap();
if state.close.is_some() {
return Ok(());
}
state.close = Some((error_code, reason.clone()));
}

for conn in self.inner.state.lock().unwrap().connections.values() {
let _ = conn.send(ConnectionEvent::Close(error_code, reason.clone()));
}

/// Does not proactively close existing connections. Consider calling
/// [`close()`] if that is desired.
///
/// [`close()`]: Endpoint::close
pub async fn shutdown(self) -> io::Result<()> {
let worker = self.inner.state.lock().unwrap().worker.take();
if let Some(worker) = worker {
if self.inner.state.lock().unwrap().is_idle() {
worker.cancel().await;
} else {
self.inner.state.lock().unwrap().exit_on_idle = true;
let _ = worker.await;
}
}
Expand Down Expand Up @@ -484,7 +499,11 @@ impl Endpoint {
impl Drop for Endpoint {
fn drop(&mut self) {
if Arc::strong_count(&self.inner) == 2 {
// There are actually two cases:
// 1. User is trying to shutdown the socket.
self.inner.done.wake();
// 2. User dropped the endpoint but the worker is still running.
self.inner.state.lock().unwrap().exit_on_idle = true;
}
}
}
4 changes: 1 addition & 3 deletions compio-quic/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ async fn close_endpoint() {
None,
)
.unwrap();

compio_runtime::spawn(endpoint.close(0u32.into(), "")).detach();

endpoint.close(0u32.into(), b"");
match conn.await {
Err(ConnectionError::LocallyClosed) => (),
Err(e) => panic!("unexpected error: {e}"),
Expand Down

0 comments on commit aae6db4

Please sign in to comment.