From ca3dfc511b28767e0f2da9c09b976ce1a91b94ac Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sat, 4 Jul 2020 10:39:40 +0200 Subject: [PATCH] Initial commit --- .github/FUNDING.yml | 1 + .github/workflows/build-and-test.yaml | 51 +++ .github/workflows/lint.yaml | 26 ++ .github/workflows/security.yaml | 20 + .gitignore | 2 + CHANGELOG.md | 33 ++ Cargo.toml | 22 + LICENSE-APACHE | 201 +++++++++ LICENSE-MIT | 23 + README.md | 57 +++ src/addr.rs | 218 ++++++++++ src/lib.rs | 39 ++ src/tcp.rs | 579 ++++++++++++++++++++++++ src/udp.rs | 596 +++++++++++++++++++++++++ src/unix.rs | 605 ++++++++++++++++++++++++++ 15 files changed, 2473 insertions(+) create mode 100644 .github/FUNDING.yml create mode 100644 .github/workflows/build-and-test.yaml create mode 100644 .github/workflows/lint.yaml create mode 100644 .github/workflows/security.yaml create mode 100644 .gitignore create mode 100644 CHANGELOG.md create mode 100644 Cargo.toml create mode 100644 LICENSE-APACHE create mode 100644 LICENSE-MIT create mode 100644 README.md create mode 100644 src/addr.rs create mode 100644 src/lib.rs create mode 100644 src/tcp.rs create mode 100644 src/udp.rs create mode 100644 src/unix.rs diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml new file mode 100644 index 0000000..e1f75e7 --- /dev/null +++ b/.github/FUNDING.yml @@ -0,0 +1 @@ +github: stjepang diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml new file mode 100644 index 0000000..19d1ebb --- /dev/null +++ b/.github/workflows/build-and-test.yaml @@ -0,0 +1,51 @@ +name: Build and test + +on: + push: + branches: + - master + pull_request: + +jobs: + build_and_test: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest] + rust: [nightly, beta, stable] + steps: + - uses: actions/checkout@v2 + + - name: Set current week of the year in environnement + if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macOS') + run: echo "::set-env name=CURRENT_WEEK::$(date +%V)" + + - name: Set current week of the year in environnement + if: startsWith(matrix.os, 'windows') + run: echo "::set-env name=CURRENT_WEEK::$(Get-Date -UFormat %V)" + + - name: Install latest ${{ matrix.rust }} + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.rust }} + profile: minimal + override: true + + - name: Run cargo check + uses: actions-rs/cargo@v1 + with: + command: check + args: --all --bins --examples --tests --all-features + + - name: Run cargo check (without dev-dependencies to catch missing feature flags) + if: startsWith(matrix.rust, 'nightly') + uses: actions-rs/cargo@v1 + with: + command: check + args: -Z features=dev_dep + + - name: Run cargo test + uses: actions-rs/cargo@v1 + with: + command: test diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml new file mode 100644 index 0000000..7e9bd98 --- /dev/null +++ b/.github/workflows/lint.yaml @@ -0,0 +1,26 @@ +name: Lint + +on: + push: + branches: + - master + pull_request: + +jobs: + clippy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set current week of the year in environnement + run: echo "::set-env name=CURRENT_WEEK::$(date +%V)" + + - uses: actions-rs/toolchain@v1 + with: + toolchain: stable + profile: minimal + components: clippy + - uses: actions-rs/clippy-check@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} + args: --all-features -- -W clippy::all diff --git a/.github/workflows/security.yaml b/.github/workflows/security.yaml new file mode 100644 index 0000000..8f722e7 --- /dev/null +++ b/.github/workflows/security.yaml @@ -0,0 +1,20 @@ +name: Security audit + +on: + push: + branches: + - master + pull_request: + +jobs: + security_audit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Set current week of the year in environnement + run: echo "::set-env name=CURRENT_WEEK::$(date +%V)" + + - uses: actions-rs/audit-check@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..b691165 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,33 @@ +# Version 1.1.5 + +- Replace `usize::MAX` with `std::usize::MAX`. + +# Version 1.1.4 + +- Update dependencies. + +# Version 1.1.3 + +- Fix a deadlock. + +# Version 1.1.2 + +- Remove confusing wording in docs. + +# Version 1.1.1 + +- More elaborate docs. + +# Version 1.1.0 + +- Make locking fair. +- Add `MutexGuard::source()`. + +# Version 1.0.1 + +- Bump the `event-listener` version. +- Add tests. + +# Version 1.0.0 + +- Initial version diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..ab2b08a --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "async-net" +version = "0.1.0" +authors = ["Stjepan Glavina "] +edition = "2018" +description = "Async networking primitives for TCP/UDP/Unix communication" +license = "Apache-2.0 OR MIT" +repository = "https://github.com/stjepang/async-net" +homepage = "https://github.com/stjepang/async-net" +documentation = "https://docs.rs/async-net" +keywords = ["networking", "uds", "mio", "reactor", "std"] +categories = ["asynchronous", "network-programming", "os"] +readme = "README.md" + +[dependencies] +async-io = "0.1.1" +blocking = "0.4.6" +futures-io = { version = "0.3.5", default-features = false, features = ["std"] } +futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] } + +[dev-dependencies] +futures = { version = "0.3.5", default-features = false, features = ["std"] } diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..20dab72 --- /dev/null +++ b/README.md @@ -0,0 +1,57 @@ +# async-net + +[![Build](https://github.com/stjepang/async-net/workflows/Build%20and%20test/badge.svg)]( +https://github.com/stjepang/async-net/actions) +[![License](https://img.shields.io/badge/license-MIT%2FApache--2.0-blue.svg)]( +https://github.com/stjepang/async-net) +[![Cargo](https://img.shields.io/crates/v/async-net.svg)]( +https://crates.io/crates/async-net) +[![Documentation](https://docs.rs/async-net/badge.svg)]( +https://docs.rs/async-net) + +Async networking primitives for TCP/UDP/Unix communication. + +This crate is an async version of [`std::net`] and [`std::os::unix::net`]. + +[`std::net`]: https://doc.rust-lang.org/std/net/index.html +[`std::os::unix::net`]: https://doc.rust-lang.org/std/os/unix/net/index.html + +## Implementation + +This crate uses [`async-io`] for async I/O and [`blocking`] for DNS lookups. + +[`async-io`]: https://docs.rs/async-io +[`blocking`]: https://docs.rs/blocking + +## Examples + +A simple UDP server that echoes messages back to the sender: + +```rust +use async_net::UdpSocket; + +# blocking::block_on(async { +let socket = UdpSocket::bind("127.0.0.1:8080").await?; +let mut buf = vec![0u8; 1024]; + +loop { + let (n, peer) = socket.recv_from(&mut buf).await?; + socket.send_to(&buf[..n], &peer).await?; +} +# std::io::Result::Ok(()) }); +``` + +## License + +Licensed under either of + + * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +#### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/src/addr.rs b/src/addr.rs new file mode 100644 index 0000000..486cd33 --- /dev/null +++ b/src/addr.rs @@ -0,0 +1,218 @@ +use std::future::Future; +use std::io; +use std::mem; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Converts or resolves addresses to [`SocketAddr`] values. +/// +/// This trait currently only appears in function signatures and cannot be used directly. However, +/// you can still use the [`ToSocketAddrs`] trait from the standard library. +/// +/// # Examples +/// +/// To perform a DNS lookup for an address, make [`ToSocketAddrs`] async by wrapping it with the +/// [`blocking`] crate: +/// +/// [`blocking`]: https://docs.rs/blocking +/// +/// ```no_run +/// use blocking::unblock; +/// use std::net::ToSocketAddrs; +/// +/// # blocking::block_on(async { +/// let addrs = unblock(|| "google.com".to_socket_addrs()).await?; +/// # std::io::Result::Ok(()) }); +/// ``` +pub trait AsyncToSocketAddrs: Sealed {} + +pub trait Sealed { + /// Returned iterator over socket addresses which this type may correspond to. + type Iter: Iterator + Unpin; + + /// Converts this object to an iterator of resolved `SocketAddr`s. + /// + /// The returned iterator may not actually yield any values depending on the outcome of any + /// resolution performed. + /// + /// Note that this function may block a backend thread while resolution is performed. + fn to_socket_addrs(&self) -> ToSocketAddrsFuture; +} + +pub enum ToSocketAddrsFuture { + Resolving(Pin> + Send>>), + Ready(io::Result), + Done, +} + +impl + Unpin> Future for ToSocketAddrsFuture { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let state = mem::replace(&mut *self, ToSocketAddrsFuture::Done); + + match state { + ToSocketAddrsFuture::Resolving(mut task) => { + let poll = Pin::new(&mut task).poll(cx); + if poll.is_pending() { + *self = ToSocketAddrsFuture::Resolving(task); + } + poll + } + ToSocketAddrsFuture::Ready(res) => Poll::Ready(res), + ToSocketAddrsFuture::Done => panic!("polled a completed future"), + } + } +} + +impl AsyncToSocketAddrs for SocketAddr {} + +impl Sealed for SocketAddr { + type Iter = std::option::IntoIter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + ToSocketAddrsFuture::Ready(Ok(Some(*self).into_iter())) + } +} + +impl AsyncToSocketAddrs for SocketAddrV4 {} + +impl Sealed for SocketAddrV4 { + type Iter = std::option::IntoIter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + Sealed::to_socket_addrs(&SocketAddr::V4(*self)) + } +} + +impl AsyncToSocketAddrs for SocketAddrV6 {} + +impl Sealed for SocketAddrV6 { + type Iter = std::option::IntoIter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + Sealed::to_socket_addrs(&SocketAddr::V6(*self)) + } +} + +impl AsyncToSocketAddrs for (IpAddr, u16) {} + +impl Sealed for (IpAddr, u16) { + type Iter = std::option::IntoIter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + let (ip, port) = *self; + match ip { + IpAddr::V4(a) => Sealed::to_socket_addrs(&(a, port)), + IpAddr::V6(a) => Sealed::to_socket_addrs(&(a, port)), + } + } +} + +impl AsyncToSocketAddrs for (Ipv4Addr, u16) {} + +impl Sealed for (Ipv4Addr, u16) { + type Iter = std::option::IntoIter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + let (ip, port) = *self; + Sealed::to_socket_addrs(&SocketAddrV4::new(ip, port)) + } +} + +impl AsyncToSocketAddrs for (Ipv6Addr, u16) {} + +impl Sealed for (Ipv6Addr, u16) { + type Iter = std::option::IntoIter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + let (ip, port) = *self; + Sealed::to_socket_addrs(&SocketAddrV6::new(ip, port, 0, 0)) + } +} + +impl AsyncToSocketAddrs for (&str, u16) {} + +impl Sealed for (&str, u16) { + type Iter = std::vec::IntoIter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + let (host, port) = *self; + + if let Ok(addr) = host.parse::() { + let addr = SocketAddrV4::new(addr, port); + return ToSocketAddrsFuture::Ready(Ok(vec![SocketAddr::V4(addr)].into_iter())); + } + + if let Ok(addr) = host.parse::() { + let addr = SocketAddrV6::new(addr, port, 0, 0); + return ToSocketAddrsFuture::Ready(Ok(vec![SocketAddr::V6(addr)].into_iter())); + } + + let host = host.to_string(); + let task = blocking::unblock(move || { + let addr = (host.as_str(), port); + ToSocketAddrs::to_socket_addrs(&addr) + }); + ToSocketAddrsFuture::Resolving(Box::pin(task)) + } +} + +impl AsyncToSocketAddrs for (String, u16) {} + +impl Sealed for (String, u16) { + type Iter = std::vec::IntoIter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + Sealed::to_socket_addrs(&(&*self.0, self.1)) + } +} + +impl AsyncToSocketAddrs for str {} + +impl Sealed for str { + type Iter = std::vec::IntoIter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + if let Ok(addr) = self.parse() { + return ToSocketAddrsFuture::Ready(Ok(vec![addr].into_iter())); + } + + let addr = self.to_string(); + let task = + blocking::unblock(move || std::net::ToSocketAddrs::to_socket_addrs(addr.as_str())); + ToSocketAddrsFuture::Resolving(Box::pin(task)) + } +} + +impl AsyncToSocketAddrs for &[SocketAddr] {} + +impl<'a> Sealed for &'a [SocketAddr] { + type Iter = std::iter::Cloned>; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + ToSocketAddrsFuture::Ready(Ok(self.iter().cloned())) + } +} + +impl AsyncToSocketAddrs for &T {} + +impl Sealed for &T { + type Iter = T::Iter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + Sealed::to_socket_addrs(&**self) + } +} + +impl AsyncToSocketAddrs for String {} + +impl Sealed for String { + type Iter = std::vec::IntoIter; + + fn to_socket_addrs(&self) -> ToSocketAddrsFuture { + Sealed::to_socket_addrs(&**self) + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..f62fd42 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,39 @@ +//! Async networking primitives for TCP/UDP/Unix communication. +//! +//! This crate is an async version of [`std::net`] and [`std::os::unix::net`]. +//! +//! # Implementation +//! +//! This crate uses [`async-io`] for async I/O and [`blocking`] for DNS lookups. +//! +//! [`async-io`]: https://docs.rs/async-io +//! [`blocking`]: https://docs.rs/blocking +//! +//! # Examples +//! +//! A simple UDP server that echoes messages back to the sender: +//! +//! ```no_run +//! use async_net::UdpSocket; +//! +//! # blocking::block_on(async { +//! let socket = UdpSocket::bind("127.0.0.1:8080").await?; +//! let mut buf = vec![0u8; 1024]; +//! +//! loop { +//! let (n, peer) = socket.recv_from(&mut buf).await?; +//! socket.send_to(&buf[..n], &peer).await?; +//! } +//! # std::io::Result::Ok(()) }); +//! ``` + +#[cfg(unix)] +pub mod unix; + +mod addr; +mod tcp; +mod udp; + +pub use tcp::{Incoming, TcpListener, TcpStream}; +pub use udp::UdpSocket; +pub use addr::AsyncToSocketAddrs; diff --git a/src/tcp.rs b/src/tcp.rs new file mode 100644 index 0000000..f629fac --- /dev/null +++ b/src/tcp.rs @@ -0,0 +1,579 @@ +use std::future::Future; +use std::io::{self, IoSlice, IoSliceMut}; +use std::net::SocketAddr; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsRawSocket, RawSocket}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use async_io::Async; +use futures_util::io::{AsyncRead, AsyncWrite}; +use futures_util::stream::Stream; + +use crate::addr::AsyncToSocketAddrs; + +/// A TCP server, listening for connections. +/// +/// After creating a [`TcpListener`] by [`bind`][`TcpListener::bind()`]ing it to an address, it +/// listens for incoming TCP connections. These can be accepted by calling +/// [`accept()`][`TcpListener::accept()`] or by awaiting items from the stream of +/// [`incoming`][`TcpListener::incoming()`] connections. +/// +/// Cloning a [`TcpListener`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. +/// +/// The Transmission Control Protocol is specified in [IETF RFC 793]. +/// +/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793 +/// +/// # Examples +/// +/// ```no_run +/// use async_net::TcpListener; +/// use futures::prelude::*; +/// +/// # blocking::block_on(async { +/// let listener = TcpListener::bind("127.0.0.1:8080").await?; +/// let mut incoming = listener.incoming(); +/// +/// while let Some(stream) = incoming.next().await { +/// let mut stream = stream?; +/// stream.write_all(b"hello").await?; +/// } +/// # std::io::Result::Ok(()) }); +/// ``` +#[derive(Clone, Debug)] +pub struct TcpListener(Arc>); + +impl TcpListener { + /// Creates a new [`TcpListener`] bound to the given address. + /// + /// Binding with a port number of 0 will request that the operating system assigns an available + /// port to this listener. The assigned port can be queried via the + /// [`local_addr()`][`TcpListener::local_addr()`] method. + /// + /// If `addr` yields multiple addresses, binding will be attempted with each of the addresses + /// until one succeeds and returns the listener. If none of the addresses succeed in creating a + /// listener, the error from the last attempt is returned. + /// + /// # Examples + /// + /// Create a TCP listener bound to `127.0.0.1:80`: + /// + /// ```no_run + /// use async_net::TcpListener; + /// + /// # blocking::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:80").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + /// + /// Create a TCP listener bound to `127.0.0.1:80`. If that address is unavailable, then try + /// binding to `127.0.0.1:443`: + /// + /// ```no_run + /// use async_net::TcpListener; + /// use std::net::SocketAddr; + /// + /// # blocking::block_on(async { + /// let addrs = [ + /// SocketAddr::from(([127, 0, 0, 1], 80)), + /// SocketAddr::from(([127, 0, 0, 1], 443)), + /// ]; + /// let listener = TcpListener::bind(&addrs[..]).await.unwrap(); + /// # std::io::Result::Ok(()) }); + pub async fn bind(addr: A) -> io::Result { + let mut last_err = None; + + for addr in addr.to_socket_addrs().await? { + match Async::::bind(addr) { + Ok(listener) => return Ok(TcpListener(Arc::new(listener))), + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any of the addresses", + ) + })) + } + + /// Returns the local address this listener is bound to. + /// + /// # Examples + /// + /// Bind to port 0 and then see which port was assigned by the operating system: + /// + /// ```no_run + /// use async_net::TcpListener; + /// use std::net::SocketAddr; + /// + /// # blocking::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// println!("Listening on {}", listener.local_addr()?); + /// # std::io::Result::Ok(()) }); + pub fn local_addr(&self) -> io::Result { + self.0.get_ref().local_addr() + } + + /// Accepts a new incoming connection. + /// + /// Returns a TCP stream and the address it is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpListener; + /// + /// # blocking::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; + /// let (stream, addr) = listener.accept().await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { + let (stream, addr) = self.0.accept().await?; + let stream = TcpStream(Arc::new(stream)); + Ok((stream, addr)) + } + + /// Returns a stream of incoming connections. + /// + /// Iterating over this stream is equivalent to calling [`accept()`][`TcpListener::accept()`] + /// in a loop. The stream of connections is infinite, i.e awaiting the next connection will + /// never result in [`None`]. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpListener; + /// use futures::prelude::*; + /// + /// # blocking::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// let mut incoming = listener.incoming(); + /// + /// while let Some(stream) = incoming.next().await { + /// let mut stream = stream?; + /// stream.write_all(b"hello").await?; + /// } + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn incoming(&self) -> Incoming<'_> { + Incoming(self) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpListener; + /// + /// # blocking::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:80").await?; + /// listener.set_ttl(100)?; + /// assert_eq!(listener.ttl()?, 100); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn ttl(&self) -> io::Result { + self.0.get_ref().ttl() + } + + /// Sets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpListener; + /// + /// # blocking::block_on(async { + /// let listener = TcpListener::bind("127.0.0.1:80").await?; + /// listener.set_ttl(100)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.0.get_ref().set_ttl(ttl) + } +} + +#[cfg(unix)] +impl AsRawFd for TcpListener { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +#[cfg(windows)] +impl AsRawSocket for TcpListener { + fn as_raw_socket(&self) -> RawSocket { + self.0.as_raw_socket() + } +} + +/// A stream of incoming TCP connections. +/// +/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is +/// created by the [`TcpListener::incoming()`] method. +#[derive(Debug)] +pub struct Incoming<'a>(&'a TcpListener); + +impl<'a> Stream for Incoming<'a> { + type Item = io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let future = self.0.accept(); + futures_util::pin_mut!(future); + + let (socket, _) = futures_util::ready!(future.poll(cx))?; + Poll::Ready(Some(Ok(socket))) + } +} + +/// A TCP connection. +/// +/// A [`TcpStream`] can be created by [`connect`][`TcpStream::connect()`]ing to an endpoint or by +/// [`accept`][`TcpListener::accept()`]ing an incoming connection. +/// +/// [`TcpStream`] is a bidirectional stream that implements traits [`AsyncRead`] and +/// [`AsyncWrite`]. +/// +/// Cloning a [`TcpStream`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. The reading and writing portions of the connection can also +/// be shut down individually with the [`shutdown()`][`TcpStream::shutdown()`] method. +/// +/// The Transmission Control Protocol is specified in [IETF RFC 793]. +/// +/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793 +/// +/// # Examples +/// +/// ```no_run +/// use async_net::TcpStream; +/// use futures::prelude::*; +/// +/// # blocking::block_on(async { +/// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; +/// stream.write_all(b"hello").await?; +/// +/// let mut buf = vec![0u8; 1024]; +/// let n = stream.read(&mut buf).await?; +/// # std::io::Result::Ok(()) }); +/// ``` +#[derive(Clone, Debug)] +pub struct TcpStream(Arc>); + +impl TcpStream { + /// Creates a TCP connection to the specified address. + /// + /// This method will create a new TCP socket and attempt to connect it to the provided `addr`, + /// + /// If `addr` yields multiple addresses, connecting will be attempted with each of the + /// addresses until connecting to one succeeds. If none of the addresses result in a successful + /// connection, the error from the last connect attempt is returned. + /// + /// # Examples + /// + /// Connect to `example.com:80`: + /// + /// ``` + /// use async_net::TcpStream; + /// + /// # blocking::block_on(async { + /// let stream = TcpStream::connect("example.com:80").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + /// + /// Connect to `127.0.0.1:8080`. If that fails, then try connecting to `127.0.0.1:8081`: + /// + /// ```no_run + /// use async_net::TcpStream; + /// use std::net::SocketAddr; + /// + /// # blocking::block_on(async { + /// let addrs = [ + /// SocketAddr::from(([127, 0, 0, 1], 8080)), + /// SocketAddr::from(([127, 0, 0, 1], 8081)), + /// ]; + /// let stream = TcpStream::connect(&addrs[..]).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn connect(addr: A) -> io::Result { + let mut last_err = None; + + for addr in addr.to_socket_addrs().await? { + match Async::::connect(addr).await { + Ok(stream) => return Ok(TcpStream(Arc::new(stream))), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not connect to any of the addresses", + ) + })) + } + + /// Returns the local address this stream is bound to. + /// + /// # Examples + /// + /// ``` + /// use async_net::TcpStream; + /// + /// # blocking::block_on(async { + /// let stream = TcpStream::connect("example.com:80").await?; + /// println!("Local address is {}", stream.local_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn local_addr(&self) -> io::Result { + self.0.get_ref().local_addr() + } + + /// Returns the remote address this stream is connected to. + /// + /// # Examples + /// + /// ``` + /// use async_net::TcpStream; + /// + /// # blocking::block_on(async { + /// let stream = TcpStream::connect("example.com:80").await?; + /// println!("Connected to {}", stream.peer_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn peer_addr(&self) -> io::Result { + self.0.get_ref().peer_addr() + } + + /// Shuts down the read half, write half, or both halves of this connection. + /// + /// This method will cause all pending and future I/O in the given directions to return + /// immediately with an appropriate value (see the documentation of [`Shutdown`]). + /// + /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpStream; + /// use std::net::Shutdown; + /// + /// # blocking::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// stream.shutdown(Shutdown::Both)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> { + self.0.get_ref().shutdown(how) + } + + /// Receives data without removing it from the queue. + /// + /// On success, returns the number of bytes peeked. + /// + /// Successive calls return the same data. This is accomplished by passing `MSG_PEEK` as a flag + /// to the underlying `recv` system call. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpStream; + /// + /// # blocking::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// + /// let mut buf = vec![0; 1024]; + /// let n = stream.peek(&mut buf).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn peek(&self, buf: &mut [u8]) -> io::Result { + self.0.peek(buf).await + } + + /// Gets the value of the `TCP_NODELAY` option for this socket. + /// + /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that + /// written data is always sent as soon as possible, even if there is only a small amount of + /// it. + /// + /// When set to `false`, written data is buffered until there is a certain amount to send out, + /// thereby avoiding the frequent sending of small packets. + /// + /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpStream; + /// + /// # blocking::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// println!("TCP_NODELAY is set to {}", stream.nodelay()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn nodelay(&self) -> io::Result { + self.0.get_ref().nodelay() + } + + /// Sets the value of the `TCP_NODELAY` option for this socket. + /// + /// If set to `true`, this option disables the [Nagle algorithm][nagle-wiki]. This means that + /// written data is always sent as soon as possible, even if there is only a small amount of + /// it. + /// + /// When set to `false`, written data is buffered until there is a certain amount to send out, + /// thereby avoiding the frequent sending of small packets. + /// + /// [nagle-wiki]: https://en.wikipedia.org/wiki/Nagle%27s_algorithm + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpStream; + /// + /// # blocking::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// stream.set_nodelay(false)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { + self.0.get_ref().set_nodelay(nodelay) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpStream; + /// + /// # blocking::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// println!("IP_TTL is set to {}", stream.ttl()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn ttl(&self) -> io::Result { + self.0.get_ref().ttl() + } + + /// Sets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::TcpStream; + /// + /// # blocking::block_on(async { + /// let stream = TcpStream::connect("127.0.0.1:8080").await?; + /// stream.set_ttl(100)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.0.get_ref().set_ttl(ttl) + } +} + +#[cfg(unix)] +impl AsRawFd for TcpStream { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +#[cfg(windows)] +impl AsRawSocket for TcpStream { + fn as_raw_socket(&self) -> RawSocket { + self.0.as_raw_socket() + } +} + +impl AsyncRead for TcpStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut &*self).poll_read(cx, buf) + } + + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + Pin::new(&mut &*self).poll_read_vectored(cx, bufs) + } +} + +impl AsyncRead for &TcpStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut &*self.0).poll_read(cx, buf) + } +} + +impl AsyncWrite for TcpStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut &*self).poll_write(cx, buf) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut &*self).poll_write_vectored(cx, bufs) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &*self).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &*self).poll_close(cx) + } +} + +impl AsyncWrite for &TcpStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut &*self.0).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &*self.0).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &*self.0).poll_close(cx) + } +} diff --git a/src/udp.rs b/src/udp.rs new file mode 100644 index 0000000..ac66741 --- /dev/null +++ b/src/udp.rs @@ -0,0 +1,596 @@ +use std::io; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr}; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsRawSocket, RawSocket}; +use std::sync::Arc; + +use async_io::Async; + +use crate::addr::AsyncToSocketAddrs; + +/// A UDP socket. +/// +/// After creating a [`UdpSocket`] by [`bind`][`UdpSocket::bind()`]ing it to a socket address, data +/// can be [sent to] and [received from] any other socket address. +/// +/// Cloning a [`UdpSocket`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. +/// +/// Although UDP is a connectionless protocol, this implementation provides an interface to set an +/// address where data should be sent and received from. After setting a remote address with +/// [`connect()`][`UdpSocket::connect()`], data can be sent to and received from that address with +/// [`send()`][`UdpSocket::send()`] and [`recv()`][`UdpSocket::recv()`]. +/// +/// As stated in the User Datagram Protocol's specification in [IETF RFC 768], UDP is an unordered, +/// unreliable protocol. Refer to [`TcpListener`][`crate::TcpListener`] and +/// [`TcpStream`][`crate::TcpStream`] for TCP primitives. +/// +/// [received from]: UdpSocket::recv_from() +/// [sent to]: UdpSocket::send_to() +/// [IETF RFC 768]: https://tools.ietf.org/html/rfc768 +/// +/// # Examples +/// +/// ```no_run +/// use async_net::UdpSocket; +/// +/// # blocking::block_on(async { +/// let socket = UdpSocket::bind("127.0.0.1:8080").await?; +/// let mut buf = vec![0u8; 20]; +/// +/// loop { +/// // Receive a single datagram message. +/// // If `buf` is too small to hold the entire message, it will be cut off. +/// let (n, addr) = socket.recv_from(&mut buf).await?; +/// +/// // Send the message back to the same address that has sent it. +/// socket.send_to(&buf[..n], &addr).await?; +/// } +/// # std::io::Result::Ok(()) }); +/// ``` +#[derive(Clone, Debug)] +pub struct UdpSocket(Arc>); + +impl UdpSocket { + /// Creates a new [`UdpSocket`] bound to the given address. + /// + /// Binding with a port number of 0 will request that the operating system assigns an available + /// port to this socket. The assigned port can be queried via the + /// [`local_addr()`][`UdpSocket::local_addr()`] method. + /// + /// If `addr` yields multiple addresses, binding will be attempted with each of the addresses + /// until one succeeds and returns the socket. If none of the addresses succeed in creating a + /// socket, the error from the last attempt is returned. + /// + /// # Examples + /// + /// Create a UDP socket bound to `127.0.0.1:3400`: + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:3400").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + /// + /// Create a UDP socket bound to `127.0.0.1:3400`. If that address is unavailable, then try + /// binding to `127.0.0.1:3401`: + /// + /// ```no_run + /// use async_net::UdpSocket; + /// use std::net::SocketAddr; + /// + /// # blocking::block_on(async { + /// let addrs = [ + /// SocketAddr::from(([127, 0, 0, 1], 3400)), + /// SocketAddr::from(([127, 0, 0, 1], 3401)), + /// ]; + /// let socket = UdpSocket::bind(&addrs[..]).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn bind(addr: A) -> io::Result { + let mut last_err = None; + + for addr in addr.to_socket_addrs().await? { + match Async::::bind(addr) { + Ok(socket) => return Ok(UdpSocket(Arc::new(socket))), + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not bind to any of the addresses", + ) + })) + } + + /// Returns the local address this socket is bound to. + /// + /// This can be useful, for example, when binding to port 0 to figure out which port was + /// actually bound. + /// + /// # Examples + /// + /// Bind to port 0 and then see which port was assigned by the operating system: + /// + /// ```no_run + /// use async_net::UdpSocket; + /// use std::net::SocketAddr; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:0").await?; + /// println!("Bound to {}", socket.local_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn local_addr(&self) -> io::Result { + self.0.get_ref().local_addr() + } + + /// Returns the remote address this socket is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.connect("192.168.0.1:41203").await?; + /// println!("Connected to {}", socket.peer_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn peer_addr(&self) -> io::Result { + self.0.get_ref().peer_addr() + } + + /// Connects the UDP socket to an address. + /// + /// When connected, methods [`send()`][`UdpSocket::send()`] and [`recv()`][`UdpSocket::recv()`] + /// will use the specified address for sending and receiving messages. Additionally, a filter + /// will be applied to [`recv_from()`][`UdpSocket::recv_from()`] so that it only receives + /// messages from that same address. + /// + /// If `addr` yields multiple addresses, connecting will be attempted with each of the + /// addresses until the operating system accepts one. If none of the addresses are accepted, + /// the error from the last attempt is returned. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:3400").await?; + /// socket.connect("127.0.0.1:8080").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn connect(&self, addr: A) -> io::Result<()> { + let mut last_err = None; + + for addr in addr.to_socket_addrs().await? { + match self.0.get_ref().connect(addr) { + Ok(()) => return Ok(()), + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not connect to any of the addresses", + ) + })) + } + + /// Receives a single datagram message. + /// + /// On success, returns the number of bytes received and the address message came from. + /// + /// This method must be called with a valid byte buffer of sufficient size to hold a message. + /// If the received message is too long to fit into the buffer, it may be truncated. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// + /// let mut buf = vec![0u8; 1024]; + /// let (n, addr) = socket.recv_from(&mut buf).await?; + /// println!("Received {} bytes from {}", n, addr); + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.0.recv_from(buf).await + } + + /// Receives a single datagram message without removing it from the queue. + /// + /// On success, returns the number of bytes peeked and the address message came from. + /// + /// This method must be called with a valid byte buffer of sufficient size to hold a message. + /// If the received message is too long to fit into the buffer, it may be truncated. + /// + /// Successive calls return the same message. This is accomplished by passing `MSG_PEEK` as a + /// flag to the underlying `recvfrom` system call. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// + /// let mut buf = vec![0u8; 1024]; + /// let (n, addr) = socket.peek_from(&mut buf).await?; + /// println!("Peeked {} bytes from {}", n, addr); + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.0.get_ref().peek_from(buf) + } + + /// Sends data to the given address. + /// + /// On success, returns the number of bytes sent. + /// + /// If `addr` yields multiple addresses, the message will only be sent to the first address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.send_to(b"hello", "127.0.0.1:4242").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn send_to(&self, buf: &[u8], addr: A) -> io::Result { + let addr = match addr.to_socket_addrs().await?.next() { + Some(addr) => addr, + None => { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "no addresses to send data to", + )) + } + }; + + self.0.send_to(buf, addr).await + } + + /// Receives a single datagram message from the connected address. + /// + /// On success, returns the number of bytes received. + /// + /// This method must be called with a valid byte buffer of sufficient size to hold a message. + /// If the received message is too long to fit into the buffer, it may be truncated. + /// + /// The [`connect()`][`UdpSocket::connect()`] method connects this socket to an address. This + /// method will fail if the socket is not connected. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.connect("127.0.0.1:8080").await?; + /// + /// let mut buf = vec![0u8; 1024]; + /// let n = socket.recv(&mut buf).await?; + /// println!("Received {} bytes", n); + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn recv(&self, buf: &mut [u8]) -> io::Result { + self.0.recv(buf).await + } + + /// Receives a single datagram from the connected address without removing it from the queue. + /// + /// On success, returns the number of bytes peeked. + /// + /// This method must be called with a valid byte buffer of sufficient size to hold a message. + /// If the received message is too long to fit into the buffer, it may be truncated. + /// + /// Successive calls return the same message. This is accomplished by passing `MSG_PEEK` as a + /// flag to the underlying `recv` system call. + /// + /// The [`connect()`][`UdpSocket::connect()`] method connects this socket to an address. This + /// method will fail if the socket is not connected. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.connect("127.0.0.1:8080").await?; + /// + /// let mut buf = vec![0u8; 1024]; + /// let n = socket.peek(&mut buf).await?; + /// println!("Peeked {} bytes", n); + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn peek(&self, buf: &mut [u8]) -> io::Result { + self.0.peek(buf).await + } + + /// Sends data to the connected address. + /// + /// The [`connect()`][`UdpSocket::connect()`] method connects this socket to an address. This + /// method will fail if the socket is not connected. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.connect("127.0.0.1:8080").await?; + /// socket.send(b"hello").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn send(&self, buf: &[u8]) -> io::Result { + self.0.send(buf).await + } + + /// Gets the value of the `SO_BROADCAST` option for this socket. + /// + /// If set to `true`, this socket is allowed to send packets to a broadcast address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// println!("SO_BROADCAST is set to {}", socket.broadcast()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn broadcast(&self) -> io::Result { + self.0.get_ref().broadcast() + } + + /// Sets the value of the `SO_BROADCAST` option for this socket. + /// + /// If set to `true`, this socket is allowed to send packets to a broadcast address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.set_broadcast(true)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_broadcast(&self, broadcast: bool) -> io::Result<()> { + self.0.get_ref().set_broadcast(broadcast) + } + + /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. + /// + /// If set to `true`, multicast packets will be looped back to the local socket. + /// + /// Note that this option may not have any affect on IPv6 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// println!("IP_MULTICAST_LOOP is set to {}", socket.multicast_loop_v4()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn multicast_loop_v4(&self) -> io::Result { + self.0.get_ref().multicast_loop_v4() + } + + /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. + /// + /// If set to `true`, multicast packets will be looped back to the local socket. + /// + /// Note that this option may not have any affect on IPv6 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.set_multicast_loop_v4(true)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_multicast_loop_v4(&self, multicast_loop_v4: bool) -> io::Result<()> { + self.0.get_ref().set_multicast_loop_v4(multicast_loop_v4) + } + + /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. + /// + /// Indicates the time-to-live value of outgoing multicast packets for this socket. The default + /// value is 1, which means that multicast packets don't leave the local network unless + /// explicitly requested. + /// + /// Note that this option may not have any effect on IPv6 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// println!("IP_MULTICAST_TTL is set to {}", socket.multicast_loop_v4()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn multicast_ttl_v4(&self) -> io::Result { + self.0.get_ref().multicast_ttl_v4() + } + + /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. + /// + /// Indicates the time-to-live value of outgoing multicast packets for this socket. The default + /// value is 1, which means that multicast packets don't leave the local network unless + /// explicitly requested. + /// + /// Note that this option may not have any effect on IPv6 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.set_multicast_ttl_v4(10)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { + self.0.get_ref().set_multicast_ttl_v4(ttl) + } + + /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + /// + /// Controls whether this socket sees the multicast packets it sends itself. + /// + /// Note that this option may not have any effect on IPv4 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// println!("IPV6_MULTICAST_LOOP is set to {}", socket.multicast_loop_v6()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn multicast_loop_v6(&self) -> io::Result { + self.0.get_ref().multicast_loop_v6() + } + + /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. + /// + /// Controls whether this socket sees the multicast packets it sends itself. + /// + /// Note that this option may not have any effect on IPv4 sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.set_multicast_loop_v6(true)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_multicast_loop_v6(&self, multicast_loop_v6: bool) -> io::Result<()> { + self.0.get_ref().set_multicast_loop_v6(multicast_loop_v6) + } + + /// Gets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// println!("IP_TTL is set to {}", socket.ttl()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn ttl(&self) -> io::Result { + self.0.get_ref().ttl() + } + + /// Sets the value of the `IP_TTL` option for this socket. + /// + /// This option configures the time-to-live field that is used in every packet sent from this + /// socket. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::UdpSocket; + /// + /// # blocking::block_on(async { + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.set_ttl(100)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { + self.0.get_ref().set_ttl(ttl) + } + + /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. + /// + /// This method specifies a new multicast group for this socket to join. Argument `multiaddr` + /// must be a valid multicast address, and `interface` is the address of the local interface + /// with which the system should join the multicast group. If it's equal to `INADDR_ANY` then + /// an appropriate interface is chosen by the system. + pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { + self.0.get_ref().join_multicast_v4(&multiaddr, &interface) + } + + /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. + /// + /// This method leaves a multicast group. Argument `multiaddr` must be a valid multicast + /// address, and `interface` is the index of the interface to leave. + pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { + self.0.get_ref().leave_multicast_v4(&multiaddr, &interface) + } + + /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. + /// + /// This method specifies a new multicast group for this socket to join. Argument `multiaddr` + /// must be a valid multicast address, and `interface` is the index of the interface to join + /// (or 0 to indicate any interface). + pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { + self.0.get_ref().join_multicast_v6(multiaddr, interface) + } + + /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. + /// + /// This method leaves a multicast group. Argument `multiaddr` must be a valid multicast + /// address, and `interface` is the index of the interface to leave. + pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { + self.0.get_ref().leave_multicast_v6(multiaddr, interface) + } +} + +#[cfg(unix)] +impl AsRawFd for UdpSocket { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +#[cfg(windows)] +impl AsRawSocket for UdpSocket { + fn as_raw_socket(&self) -> RawSocket { + self.0.as_raw_socket() + } +} diff --git a/src/unix.rs b/src/unix.rs new file mode 100644 index 0000000..f92cb5f --- /dev/null +++ b/src/unix.rs @@ -0,0 +1,605 @@ +//! Unix domain sockets. +//! +//! This module is an async version of [`std::os::unix::net`]. + +use std::future::Future; +use std::io; +use std::net::Shutdown; +#[cfg(unix)] +use std::os::unix::io::{AsRawFd, RawFd}; +use std::os::unix::net::SocketAddr; +#[cfg(windows)] +use std::os::windows::io::{AsRawSocket, RawSocket}; +use std::path::Path; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use async_io::Async; +use futures_util::io::{AsyncRead, AsyncWrite}; +use futures_util::stream::Stream; + +/// A Unix server, listening for connections. +/// +/// After creating a [`UnixListener`] by [`bind`][`UnixListener::bind()`]ing it to an address, it +/// listens for incoming connections. These can be accepted by calling +/// [`accept()`][`UnixListener::accept()`] or by awaiting items from the async stream of +/// [`incoming`][`UnixListener::incoming()`] connections. +/// +/// Cloning a [`UnixListener`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. +/// +/// # Examples +/// +/// ```no_run +/// use async_net::unix::UnixListener; +/// use futures::prelude::*; +/// +/// # blocking::block_on(async { +/// let listener = UnixListener::bind("/tmp/socket")?; +/// let mut incoming = listener.incoming(); +/// +/// while let Some(stream) = incoming.next().await { +/// let mut stream = stream?; +/// stream.write_all(b"hello").await?; +/// } +/// # std::io::Result::Ok(()) }); +/// ``` +#[derive(Clone, Debug)] +pub struct UnixListener(Arc>); + +impl UnixListener { + /// Creates a new [`UnixListener`] bound to the given path. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixListener; + /// use futures::prelude::*; + /// + /// # blocking::block_on(async { + /// let listener = UnixListener::bind("/tmp/socket")?; + /// let mut incoming = listener.incoming(); + /// + /// while let Some(stream) = incoming.next().await { + /// let mut stream = stream?; + /// stream.write_all(b"hello").await?; + /// } + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn bind>(path: P) -> io::Result { + let path = path.as_ref().to_owned(); + let listener = Async::::bind(path)?; + Ok(UnixListener(Arc::new(listener))) + } + + /// Accepts a new incoming connection. + /// + /// Returns a TCP stream and the address it is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixListener; + /// + /// # blocking::block_on(async { + /// let listener = UnixListener::bind("/tmp/socket")?; + /// let (stream, addr) = listener.accept().await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { + let (stream, addr) = self.0.accept().await?; + Ok((UnixStream(Arc::new(stream)), addr)) + } + + /// Returns a stream of incoming connections. + /// + /// Iterating over this stream is equivalent to calling [`accept()`][`UnixListener::accept()`] + /// in a loop. The stream of connections is infinite, i.e awaiting the next connection will + /// never result in [`None`]. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixListener; + /// use futures::prelude::*; + /// + /// # blocking::block_on(async { + /// let listener = UnixListener::bind("/tmp/socket")?; + /// let mut incoming = listener.incoming(); + /// + /// while let Some(stream) = incoming.next().await { + /// let mut stream = stream?; + /// stream.write_all(b"hello").await?; + /// } + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn incoming(&self) -> Incoming<'_> { + Incoming(self) + } + + /// Returns the local address this listener is bound to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixListener; + /// + /// # blocking::block_on(async { + /// let listener = UnixListener::bind("/tmp/socket")?; + /// println!("Local address is {:?}", listener.local_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn local_addr(&self) -> io::Result { + self.0.get_ref().local_addr() + } +} + +#[cfg(unix)] +impl AsRawFd for UnixListener { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +#[cfg(windows)] +impl AsRawSocket for UnixListener { + fn as_raw_socket(&self) -> RawSocket { + self.0.as_raw_socket() + } +} + +/// A stream of incoming Unix connections. +/// +/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is +/// created by the [`UnixListener::incoming()`] method. +#[derive(Debug)] +pub struct Incoming<'a>(&'a UnixListener); + +impl Stream for Incoming<'_> { + type Item = io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let future = self.0.accept(); + futures_util::pin_mut!(future); + + let (socket, _) = futures_util::ready!(future.poll(cx))?; + Poll::Ready(Some(Ok(socket))) + } +} + +/// A Unix connection. +/// +/// A [`UnixStream`] can be created by [`connect`][`UnixStream::connect()`]ing to an endpoint or by +/// [`accept`][`UnixListener::accept()`]ing an incoming connection. +/// +/// [`UnixStream`] is a bidirectional stream that implements traits [`AsyncRead`] and +/// [`AsyncWrite`]. +/// +/// Cloning a [`UnixStream`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. The reading and writing portions of the connection can also +/// be shut down individually with the [`shutdown()`][`UnixStream::shutdown()`] method. +/// +/// # Examples +/// +/// ```no_run +/// use async_net::unix::UnixStream; +/// use futures::prelude::*; +/// +/// # blocking::block_on(async { +/// let mut stream = UnixStream::connect("/tmp/socket").await?; +/// stream.write_all(b"hello").await?; +/// +/// let mut buf = vec![0u8; 1024]; +/// let n = stream.read(&mut buf).await?; +/// # std::io::Result::Ok(()) }); +/// ``` +#[derive(Clone, Debug)] +pub struct UnixStream(Arc>); + +impl UnixStream { + /// Creates a Unix connection to given path. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixStream; + /// + /// # blocking::block_on(async { + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn connect>(path: P) -> io::Result { + let path = path.as_ref().to_owned(); + let stream = Arc::new(Async::::connect(path).await?); + Ok(UnixStream(stream)) + } + + /// Creates a pair of connected Unix sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixStream; + /// + /// # blocking::block_on(async { + /// let (stream1, stream2) = UnixStream::pair()?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn pair() -> io::Result<(UnixStream, UnixStream)> { + let (a, b) = Async::::pair()?; + let a = UnixStream(Arc::new(a)); + let b = UnixStream(Arc::new(b)); + Ok((a, b)) + } + + /// Returns the local address this socket is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixStream; + /// + /// # blocking::block_on(async { + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// println!("Local address is {:?}", stream.local_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn local_addr(&self) -> io::Result { + self.0.get_ref().local_addr() + } + + /// Returns the remote address this socket is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixStream; + /// + /// # blocking::block_on(async { + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// println!("Connected to {:?}", stream.peer_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn peer_addr(&self) -> io::Result { + self.0.get_ref().peer_addr() + } + + /// Shuts down the read half, write half, or both halves of this connection. + /// + /// This method will cause all pending and future I/O in the given directions to return + /// immediately with an appropriate value (see the documentation of [`Shutdown`]). + /// + /// ```no_run + /// use async_net::unix::UnixStream; + /// use std::net::Shutdown; + /// + /// # blocking::block_on(async { + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// stream.shutdown(Shutdown::Both)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.0.get_ref().shutdown(how) + } +} + +#[cfg(unix)] +impl AsRawFd for UnixStream { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +#[cfg(windows)] +impl AsRawSocket for UnixStream { + fn as_raw_socket(&self) -> RawSocket { + self.0.as_raw_socket() + } +} + +impl AsyncRead for UnixStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut &*self).poll_read(cx, buf) + } +} + +impl AsyncRead for &UnixStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut &*self.0).poll_read(cx, buf) + } +} + +impl AsyncWrite for UnixStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut &*self).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &*self).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &*self).poll_close(cx) + } +} + +impl AsyncWrite for &UnixStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut &*self.0).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &*self.0).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &*self.0).poll_close(cx) + } +} + +/// A Unix datagram socket. +/// +/// After creating a [`UnixDatagram`] by [`bind`][`UnixDatagram::bind()`]ing it to a path, data can +/// be [sent to] and [received from] any other socket address. +/// +/// Cloning a [`UnixDatagram`] creates another handle to the same socket. The socket will be closed +/// when all handles to it are dropped. The reading and writing portions of the socket can also be +/// shut down individually with the [`shutdown()`][`UnixStream::shutdown()`] method. +/// +/// [received from]: UnixDatagram::recv_from() +/// [sent to]: UnixDatagram::send_to() +/// +/// # Examples +/// +/// ```no_run +/// use async_net::unix::UnixDatagram; +/// +/// # blocking::block_on(async { +/// let socket = UnixDatagram::bind("/tmp/socket1")?; +/// socket.send_to(b"hello", "/tmp/socket2").await?; +/// +/// let mut buf = vec![0u8; 1024]; +/// let (n, addr) = socket.recv_from(&mut buf).await?; +/// # std::io::Result::Ok(()) }); +/// ``` +#[derive(Clone, Debug)] +pub struct UnixDatagram(Arc>); + +impl UnixDatagram { + /// Creates a new [`UnixDatagram`] bound to the given address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # blocking::block_on(async { + /// let socket = UnixDatagram::bind("/tmp/socket")?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn bind>(path: P) -> io::Result { + let path = path.as_ref().to_owned(); + let socket = Async::::bind(path)?; + Ok(UnixDatagram(Arc::new(socket))) + } + + /// Creates a Unix datagram socket not bound to any address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # blocking::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn unbound() -> io::Result { + let socket = std::os::unix::net::UnixDatagram::unbound()?; + Ok(UnixDatagram(Arc::new(Async::new(socket)?))) + } + + /// Creates a pair of connected Unix datagram sockets. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # blocking::block_on(async { + /// let (socket1, socket2) = UnixDatagram::pair()?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { + let (a, b) = std::os::unix::net::UnixDatagram::pair()?; + let a = UnixDatagram(Arc::new(Async::new(a)?)); + let b = UnixDatagram(Arc::new(Async::new(b)?)); + Ok((a, b)) + } + + /// Connects the Unix datagram socket to the given address. + /// + /// When connected, methods [`send()`][`UnixDatagram::send()`] and + /// [`recv()`][`UnixDatagram::recv()`] will use the specified address for sending and receiving + /// messages. Additionally, a filter will be applied to + /// [`recv_from()`][`UnixDatagram::recv_from()`] so that it only receives messages from that + /// same address. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # blocking::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.connect("/tmp/socket")?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn connect>(&self, path: P) -> io::Result<()> { + let p = path.as_ref(); + self.0.get_ref().connect(p) + } + + /// Returns the local address this socket is bound to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # blocking::block_on(async { + /// let socket = UnixDatagram::bind("/tmp/socket")?; + /// println!("Bound to {:?}", socket.local_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn local_addr(&self) -> io::Result { + self.0.get_ref().local_addr() + } + + /// Returns the remote address this socket is connected to. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # blocking::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.connect("/tmp/socket")?; + /// println!("Connected to {:?}", socket.peer_addr()?); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn peer_addr(&self) -> io::Result { + self.0.get_ref().peer_addr() + } + + /// Receives data from an address. + /// + /// On success, returns the number of bytes received and the address data came from. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # blocking::block_on(async { + /// let socket = UnixDatagram::bind("/tmp/socket")?; + /// + /// let mut buf = vec![0; 1024]; + /// let (n, addr) = socket.recv_from(&mut buf).await?; + /// println!("Received {} bytes from {:?}", n, addr); + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + self.0.recv_from(buf).await + } + + /// Sends data to the given address. + /// + /// On success, returns the number of bytes sent. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # blocking::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.send_to(b"hello", "/tmp/socket").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn send_to>(&self, buf: &[u8], path: P) -> io::Result { + self.0.send_to(buf, path.as_ref()).await + } + + /// Receives data from the connected address. + /// + /// On success, returns the number of bytes received. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # blocking::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.connect("/tmp/socket")?; + /// + /// let mut buf = vec![0; 1024]; + /// let n = socket.recv(&mut buf).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn recv(&self, buf: &mut [u8]) -> io::Result { + self.0.recv(buf).await + } + + /// Sends data to the connected address. + /// + /// On success, returns the number of bytes sent. + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// + /// # blocking::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.connect("/tmp/socket")?; + /// socket.send(b"hello").await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn send(&self, buf: &[u8]) -> io::Result { + self.0.send(buf).await + } + + /// Shuts down the read half, write half, or both halves of this socket. + /// + /// This method will cause all pending and future I/O in the given directions to return + /// immediately with an appropriate value (see the documentation of [`Shutdown`]). + /// + /// # Examples + /// + /// ```no_run + /// use async_net::unix::UnixDatagram; + /// use std::net::Shutdown; + /// + /// # blocking::block_on(async { + /// let socket = UnixDatagram::unbound()?; + /// socket.shutdown(Shutdown::Both)?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.0.get_ref().shutdown(how) + } +} + +#[cfg(unix)] +impl AsRawFd for UnixDatagram { + fn as_raw_fd(&self) -> RawFd { + self.0.as_raw_fd() + } +} + +#[cfg(windows)] +impl AsRawSocket for UnixDatagram { + fn as_raw_socket(&self) -> RawSocket { + self.0.as_raw_socket() + } +}