diff --git a/compio-quic/examples/client.rs b/compio-quic/examples/client.rs index e634649b..167bbab1 100644 --- a/compio-quic/examples/client.rs +++ b/compio-quic/examples/client.rs @@ -36,8 +36,8 @@ async fn main() { recv.read_to_end(&mut buf).await.unwrap(); println!("{:?}", buf); - conn.close(1u32.into(), "bye"); + conn.close(1u32.into(), b"bye"); } - endpoint.close(0u32.into(), "").await.unwrap(); + endpoint.shutdown().await.unwrap(); } diff --git a/compio-quic/examples/server.rs b/compio-quic/examples/server.rs index 3a380f88..20b6c01b 100644 --- a/compio-quic/examples/server.rs +++ b/compio-quic/examples/server.rs @@ -34,5 +34,6 @@ async fn main() { conn.closed().await; } - endpoint.close(0u32.into(), "").await.unwrap(); + endpoint.close(0u32.into(), b""); + endpoint.shutdown().await.unwrap(); } diff --git a/compio-quic/src/connection.rs b/compio-quic/src/connection.rs index f5d4c52d..df9905ca 100644 --- a/compio-quic/src/connection.rs +++ b/compio-quic/src/connection.rs @@ -28,7 +28,7 @@ use crate::{wait_event, RecvStream, SendStream, Socket}; #[derive(Debug)] pub(crate) enum ConnectionEvent { - Close(VarInt, String), + Close(VarInt, Bytes), Proto(quinn_proto::ConnectionEvent), } @@ -165,9 +165,9 @@ impl ConnectionInner { } } - fn close(&self, error_code: VarInt, reason: String) { + fn close(&self, error_code: VarInt, reason: Bytes) { let mut state = self.state(); - state.conn.close(Instant::now(), error_code, reason.into()); + state.conn.close(Instant::now(), error_code, reason); state.terminate(ConnectionError::LocallyClosed); state.wake(); self.notify_events(); @@ -497,7 +497,7 @@ impl Future for Connecting { impl Drop for Connecting { fn drop(&mut self) { if Arc::strong_count(&self.0) == 2 { - self.0.close(0u32.into(), String::new()) + self.0.close(0u32.into(), Bytes::new()) } } } @@ -575,23 +575,41 @@ impl Connection { /// Close the connection immediately. /// /// Pending operations will fail immediately with - /// [`ConnectionError::LocallyClosed`]. Delivery of data on unfinished - /// streams is not guaranteed, so the application must call this only when - /// all important communications have been completed, e.g. by calling - /// [`finish`] on outstanding [`SendStream`]s and waiting for the resulting - /// futures to complete. + /// [`ConnectionError::LocallyClosed`]. No more data is sent to the peer + /// and the peer may drop buffered data upon receiving + /// the CONNECTION_CLOSE frame. /// /// `error_code` and `reason` are not interpreted, and are provided directly /// to the peer. /// /// `reason` will be truncated to fit in a single packet with overhead; to - /// improve odds that it is preserved in full, it should be kept under 1KiB. - /// - /// [`ConnectionError::LocallyClosed`]: quinn_proto::ConnectionError::LocallyClosed - /// [`finish`]: crate::SendStream::finish - /// [`SendStream`]: crate::SendStream - pub fn close(&self, error_code: VarInt, reason: &str) { - self.0.close(error_code, reason.to_string()); + /// improve odds that it is preserved in full, it should be kept under + /// 1KiB. + /// + /// # Gracefully closing a connection + /// + /// Only the peer last receiving application data can be certain that all + /// data is delivered. The only reliable action it can then take is to + /// close the connection, potentially with a custom error code. The + /// delivery of the final CONNECTION_CLOSE frame is very likely if both + /// endpoints stay online long enough, and [`Endpoint::shutdown()`] can + /// be used to provide sufficient time. Otherwise, the remote peer will + /// time out the connection, provided that the idle timeout is not + /// disabled. + /// + /// The sending side can not guarantee all stream data is delivered to the + /// remote application. It only knows the data is delivered to the QUIC + /// stack of the remote endpoint. Once the local side sends a + /// CONNECTION_CLOSE frame in response to calling [`close()`] the remote + /// endpoint may drop any data it received but is as yet undelivered to + /// the application, including data that was acknowledged as received to + /// the local endpoint. + /// + /// [`ConnectionError::LocallyClosed`]: ConnectionError::LocallyClosed + /// [`Endpoint::shutdown()`]: crate::Endpoint::shutdown + /// [`close()`]: Connection::close + pub fn close(&self, error_code: VarInt, reason: &[u8]) { + self.0.close(error_code, Bytes::copy_from_slice(reason)); } /// Wait for the connection to be closed for any reason. @@ -601,7 +619,14 @@ impl Connection { let _ = worker.await; } - self.0.state().error.clone().unwrap() + self.0.try_state().unwrap_err() + } + + /// If the connection is closed, the reason why. + /// + /// Returns `None` if the connection is still open. + pub fn close_reason(&self) -> Option { + self.0.try_state().err() } /// Receive an application datagram. @@ -807,7 +832,7 @@ impl Eq for Connection {} impl Drop for Connection { fn drop(&mut self) { if Arc::strong_count(&self.0) == 2 { - self.close(0u32.into(), "") + self.close(0u32.into(), b"") } } } diff --git a/compio-quic/src/endpoint.rs b/compio-quic/src/endpoint.rs index 954501c6..3288d206 100644 --- a/compio-quic/src/endpoint.rs +++ b/compio-quic/src/endpoint.rs @@ -9,6 +9,7 @@ use std::{ time::Instant, }; +use bytes::Bytes; use compio_buf::BufResult; use compio_log::{error, Instrument}; use compio_net::{ToSocketAddrsAsync, UdpSocket}; @@ -33,7 +34,8 @@ struct EndpointState { endpoint: quinn_proto::Endpoint, worker: Option>, connections: HashMap>, - close: Option<(VarInt, String)>, + close: Option<(VarInt, Bytes)>, + exit_on_idle: bool, incoming: VecDeque, } @@ -149,6 +151,7 @@ impl EndpointInner { worker: None, connections: HashMap::new(), close: None, + exit_on_idle: false, incoming: VecDeque::new(), }), socket, @@ -270,8 +273,8 @@ impl EndpointInner { match res { Ok(meta) => self.state.lock().unwrap().handle_data(meta, &recv_buf, respond_fn), Err(e) if e.kind() == io::ErrorKind::ConnectionReset => {} - // #[cfg(windows)] - // Err(e) if e.raw_os_error() == Some(windows_sys::Win32::Foundation::ERROR_PORT_UNREACHABLE as _) => {} + #[cfg(windows)] + Err(e) if e.raw_os_error() == Some(windows_sys::Win32::Foundation::ERROR_PORT_UNREACHABLE as _) => {} Err(e) => break Err(e), } recv_fut.set(self.socket.recv(recv_buf).fuse()); @@ -282,7 +285,7 @@ impl EndpointInner { } let state = self.state.lock().unwrap(); - if state.close.is_some() && state.is_idle() { + if state.exit_on_idle && state.is_idle() { break Ok(()); } if !state.incoming.is_empty() { @@ -415,6 +418,25 @@ impl Endpoint { self.inner.state.lock().unwrap().endpoint.open_connections() } + /// Close all of this endpoint's connections immediately and cease accepting + /// new connections. + /// + /// See [`Connection::close()`] for details. + /// + /// [`Connection::close()`]: crate::Connection::close + pub fn close(&self, error_code: VarInt, reason: &[u8]) { + let reason = Bytes::copy_from_slice(reason); + let mut state = self.inner.state.lock().unwrap(); + if state.close.is_some() { + return; + } + state.close = Some((error_code, reason.clone())); + for conn in state.connections.values() { + let _ = conn.send(ConnectionEvent::Close(error_code, reason.clone())); + } + self.inner.incoming.notify(usize::MAX.additional()); + } + // Modified from [`SharedFd::try_unwrap_inner`], see notes there. unsafe fn try_unwrap_inner(this: &ManuallyDrop) -> Option { let ptr = ManuallyDrop::new(std::ptr::read(&this.inner)); @@ -427,36 +449,29 @@ impl Endpoint { } } - /// Shutdown the endpoint and close the underlying socket. + /// Gracefully shutdown the endpoint. /// - /// This will close all connections and the underlying socket. Note that it - /// will wait for all connections and all clones of the endpoint (and any - /// clone of the underlying socket) to be dropped before closing the socket. + /// Wait for all connections on the endpoint to be cleanly shut down and + /// close the underlying socket. This will wait for all clones of the + /// endpoint, all connections and all streams to be dropped before + /// closing the socket. /// - /// If the endpoint has already been closed or is closing, this will return - /// immediately with `Ok(())`. + /// Waiting for this condition before exiting ensures that a good-faith + /// effort is made to notify peers of recent connection closes, whereas + /// exiting immediately could force them to wait out the idle timeout + /// period. /// - /// See [`Connection::close()`](crate::Connection::close) for details. - pub async fn close(self, error_code: VarInt, reason: &str) -> io::Result<()> { - let reason = reason.to_string(); - - { - let mut state = self.inner.state.lock().unwrap(); - if state.close.is_some() { - return Ok(()); - } - state.close = Some((error_code, reason.clone())); - } - - for conn in self.inner.state.lock().unwrap().connections.values() { - let _ = conn.send(ConnectionEvent::Close(error_code, reason.clone())); - } - + /// Does not proactively close existing connections. Consider calling + /// [`close()`] if that is desired. + /// + /// [`close()`]: Endpoint::close + pub async fn shutdown(self) -> io::Result<()> { let worker = self.inner.state.lock().unwrap().worker.take(); if let Some(worker) = worker { if self.inner.state.lock().unwrap().is_idle() { worker.cancel().await; } else { + self.inner.state.lock().unwrap().exit_on_idle = true; let _ = worker.await; } } @@ -484,7 +499,11 @@ impl Endpoint { impl Drop for Endpoint { fn drop(&mut self) { if Arc::strong_count(&self.inner) == 2 { + // There are actually two cases: + // 1. User is trying to shutdown the socket. self.inner.done.wake(); + // 2. User dropped the endpoint but the worker is still running. + self.inner.state.lock().unwrap().exit_on_idle = true; } } } diff --git a/compio-quic/tests/basic.rs b/compio-quic/tests/basic.rs index ccf720ab..415dae3b 100644 --- a/compio-quic/tests/basic.rs +++ b/compio-quic/tests/basic.rs @@ -59,9 +59,7 @@ async fn close_endpoint() { None, ) .unwrap(); - - compio_runtime::spawn(endpoint.close(0u32.into(), "")).detach(); - + endpoint.close(0u32.into(), b""); match conn.await { Err(ConnectionError::LocallyClosed) => (), Err(e) => panic!("unexpected error: {e}"),