From 19ca79660f759e79e7e5557b355276978198767c Mon Sep 17 00:00:00 2001 From: John Nunley Date: Wed, 1 Nov 2023 19:30:30 -0700 Subject: [PATCH] breaking: Bump all subcrates to their newest versions This is a breaking change. Signed-off-by: John Nunley --- Cargo.toml | 6 +-- examples/linux-inotify.rs | 11 ++--- examples/windows-uds.rs | 101 ++++++++++++++++++++++++++++++++------ 3 files changed, 94 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 25f895b..5eebd20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,14 +18,14 @@ exclude = ["/.*"] async-channel = "2.0.0" async-executor = "1.5.0" async-fs = "2.0.0" -async-io = "1.12.0" +async-io = "2.1.0" async-lock = "3.0.0" async-net = "2.0.0" blocking = "1.3.0" -futures-lite = "1.11.0" +futures-lite = "2.0.0" [target.'cfg(not(target_os = "espidf"))'.dependencies] -async-process = "1.6.0" +async-process = "2.0.0" [dev-dependencies] anyhow = "1" diff --git a/examples/linux-inotify.rs b/examples/linux-inotify.rs index 16cb4dd..3d86067 100644 --- a/examples/linux-inotify.rs +++ b/examples/linux-inotify.rs @@ -9,6 +9,7 @@ #[cfg(target_os = "linux")] fn main() -> std::io::Result<()> { use std::ffi::OsString; + use std::os::unix::io::AsFd; use inotify::{EventMask, Inotify, WatchMask}; use smol::{io, Async}; @@ -34,18 +35,16 @@ fn main() -> std::io::Result<()> { smol::block_on(async { // Watch events in the current directory. - let mut inotify = Async::new(Inotify::init()?)?; - inotify - .get_mut() - .watches() - .add(".", WatchMask::ALL_EVENTS)?; + let mut inotify = Inotify::init()?; + let source = Async::new(inotify.as_fd().try_clone_to_owned()?)?; + inotify.watches().add(".", WatchMask::ALL_EVENTS)?; println!("Watching for filesystem events in the current directory..."); println!("Try opening a file to trigger some events."); println!(); // Wait for events in a loop and print them on the screen. loop { - for event in inotify.read_with_mut(read_op).await? { + for event in source.read_with(|_| read_op(&mut inotify)).await? { println!("{:?}", event); } } diff --git a/examples/windows-uds.rs b/examples/windows-uds.rs index b24726a..3746f5d 100644 --- a/examples/windows-uds.rs +++ b/examples/windows-uds.rs @@ -8,41 +8,112 @@ #[cfg(windows)] fn main() -> std::io::Result<()> { + use std::ops::Deref; + use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket}; use std::path::PathBuf; - use smol::{io, prelude::*, Async, Unblock}; + use smol::{future, prelude::*, Async, Unblock}; + use std::io; use tempfile::tempdir; - use uds_windows::{UnixListener, UnixStream}; + + // n.b.: notgull: uds_windows does not support I/O safety yet, hence the wrapper types + + struct UnixListener(uds_windows::UnixListener); + + impl From for UnixListener { + fn from(ul: uds_windows::UnixListener) -> Self { + Self(ul) + } + } + + impl Deref for UnixListener { + type Target = uds_windows::UnixListener; + + fn deref(&self) -> &uds_windows::UnixListener { + &self.0 + } + } + + impl AsSocket for UnixListener { + fn as_socket(&self) -> BorrowedSocket<'_> { + unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) } + } + } + + struct UnixStream(uds_windows::UnixStream); + + impl From for UnixStream { + fn from(ul: uds_windows::UnixStream) -> Self { + Self(ul) + } + } + + impl Deref for UnixStream { + type Target = uds_windows::UnixStream; + + fn deref(&self) -> &uds_windows::UnixStream { + &self.0 + } + } + + impl AsSocket for UnixStream { + fn as_socket(&self) -> BorrowedSocket<'_> { + unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) } + } + } + + impl io::Read for UnixStream { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + io::Read::read(&mut self.0, buf) + } + } + + impl io::Write for UnixStream { + fn write(&mut self, buf: &[u8]) -> io::Result { + io::Write::write(&mut self.0, buf) + } + + fn flush(&mut self) -> io::Result<()> { + io::Write::flush(&mut self.0) + } + } + + unsafe impl async_io::IoSafe for UnixStream {} async fn client(addr: PathBuf) -> io::Result<()> { // Connect to the address. - let stream = Async::new(UnixStream::connect(addr)?)?; + let stream = Async::new(UnixStream::from(uds_windows::UnixStream::connect(addr)?))?; println!("Connected to {:?}", stream.get_ref().peer_addr()?); // Pipe the stream to stdout. let mut stdout = Unblock::new(std::io::stdout()); - io::copy(&stream, &mut stdout).await?; + futures_lite::io::copy(stream, &mut stdout).await?; Ok(()) } let dir = tempdir()?; let path = dir.path().join("socket"); - smol::block_on(async { + future::block_on(async { // Create a listener. - let listener = Async::new(UnixListener::bind(&path)?)?; + let listener = Async::new(UnixListener::from(uds_windows::UnixListener::bind(&path)?))?; println!("Listening on {:?}", listener.get_ref().local_addr()?); - // Spawn a client task. - let task = smol::spawn(client(path)); + future::try_zip( + async { + // Accept the client. + let (stream, _) = listener.read_with(|l| l.accept()).await?; + println!("Accepted a client"); - // Accept the client. - let (stream, _) = listener.read_with(|l| l.accept()).await?; - println!("Accepted a client"); - - // Send a message, drop the stream, and wait for the client. - Async::new(stream)?.write_all(b"Hello!\n").await?; - task.await?; + // Send a message, drop the stream, and wait for the client. + Async::new(UnixStream::from(stream))? + .write_all(b"Hello!\n") + .await?; + Ok(()) + }, + client(path), + ) + .await?; Ok(()) })