From aea26b322c9493f732894265810837bf17a330ca Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Jan 2022 10:53:45 -0800 Subject: [PATCH] Revert "Update mio to 0.8 (#4270)" and dependent changes (#4392) This reverts commits: * ee0e811a362e4aeb8f47cb530cace2d352fb4b8a * 49a9dc6743a8d90c46a51a42706943acf39a5d85 * 0190831ec1922047751b6d40554cc4a11cf2a82c * 43cdb2cb5004a68d28c4394664b9f9964f3d59e2 * 96370ba4ce9ea5564f094354579d5539af8bbc9d * a9d9bde0688cb88149272d78f8239a89b357974e --- tokio/Cargo.toml | 19 ++--- tokio/src/net/tcp/socket.rs | 126 ++++++------------------------ tokio/src/net/tcp/stream.rs | 26 +++++-- tokio/src/net/udp.rs | 150 ------------------------------------ tokio/tests/udp.rs | 10 --- 5 files changed, 52 insertions(+), 279 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 3945456520b..2a88b766c0a 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -49,19 +49,20 @@ macros = ["tokio-macros"] stats = [] net = [ "libc", - "mio/net", - "mio/os-ext", "mio/os-poll", - "socket2/all", + "mio/os-util", + "mio/tcp", + "mio/udp", + "mio/uds", "winapi/namedpipeapi", ] process = [ "bytes", "once_cell", "libc", - "mio/net", - "mio/os-ext", "mio/os-poll", + "mio/os-util", + "mio/uds", "signal-hook-registry", "winapi/threadpoollegacyapiset", ] @@ -74,9 +75,9 @@ rt-multi-thread = [ signal = [ "once_cell", "libc", - "mio/net", - "mio/os-ext", "mio/os-poll", + "mio/uds", + "mio/os-util", "signal-hook-registry", "winapi/consoleapi", ] @@ -93,10 +94,9 @@ pin-project-lite = "0.2.0" bytes = { version = "1.0.0", optional = true } once_cell = { version = "1.5.2", optional = true } memchr = { version = "2.2", optional = true } -mio = { version = "0.8.0", optional = true } +mio = { version = "0.7.6", optional = true } num_cpus = { version = "1.8.0", optional = true } parking_lot = { version = "0.11.0", optional = true } -socket2 = { version = "0.4.2", optional = true } # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. @@ -128,6 +128,7 @@ proptest = "1" rand = "0.8.0" tempfile = "3.1.0" async-stream = "0.3" +socket2 = "0.4" [target.'cfg(target_os = "freebsd")'.dev-dependencies] mio-aio = { version = "0.6.0", features = ["tokio"] } diff --git a/tokio/src/net/tcp/socket.rs b/tokio/src/net/tcp/socket.rs index ee9633611a1..3c6870221c2 100644 --- a/tokio/src/net/tcp/socket.rs +++ b/tokio/src/net/tcp/socket.rs @@ -1,6 +1,5 @@ use crate::net::{TcpListener, TcpStream}; -use std::convert::TryInto; use std::fmt; use std::io; use std::net::SocketAddr; @@ -85,7 +84,7 @@ cfg_net! { /// [`socket2`]: https://docs.rs/socket2/ #[cfg_attr(docsrs, doc(alias = "connect_std"))] pub struct TcpSocket { - inner: socket2::Socket, + inner: mio::net::TcpSocket, } } @@ -120,11 +119,7 @@ impl TcpSocket { /// } /// ``` pub fn new_v4() -> io::Result { - let inner = socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::STREAM, - Some(socket2::Protocol::TCP), - )?; + let inner = mio::net::TcpSocket::new_v4()?; Ok(TcpSocket { inner }) } @@ -158,11 +153,7 @@ impl TcpSocket { /// } /// ``` pub fn new_v6() -> io::Result { - let inner = socket2::Socket::new( - socket2::Domain::IPV6, - socket2::Type::STREAM, - Some(socket2::Protocol::TCP), - )?; + let inner = mio::net::TcpSocket::new_v6()?; Ok(TcpSocket { inner }) } @@ -193,7 +184,7 @@ impl TcpSocket { /// } /// ``` pub fn set_reuseaddr(&self, reuseaddr: bool) -> io::Result<()> { - self.inner.set_reuse_address(reuseaddr) + self.inner.set_reuseaddr(reuseaddr) } /// Retrieves the value set for `SO_REUSEADDR` on this socket. @@ -219,7 +210,7 @@ impl TcpSocket { /// } /// ``` pub fn reuseaddr(&self) -> io::Result { - self.inner.reuse_address() + self.inner.get_reuseaddr() } /// Allows the socket to bind to an in-use port. Only available for unix systems @@ -253,7 +244,7 @@ impl TcpSocket { doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))) )] pub fn set_reuseport(&self, reuseport: bool) -> io::Result<()> { - self.inner.set_reuse_port(reuseport) + self.inner.set_reuseport(reuseport) } /// Allows the socket to bind to an in-use port. Only available for unix systems @@ -288,14 +279,14 @@ impl TcpSocket { doc(cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))) )] pub fn reuseport(&self) -> io::Result { - self.inner.reuse_port() + self.inner.get_reuseport() } /// Sets the size of the TCP send buffer on this socket. /// /// On most operating systems, this sets the `SO_SNDBUF` socket option. pub fn set_send_buffer_size(&self, size: u32) -> io::Result<()> { - self.inner.set_send_buffer_size(size as usize) + self.inner.set_send_buffer_size(size) } /// Returns the size of the TCP send buffer for this socket. @@ -322,14 +313,14 @@ impl TcpSocket { /// /// [`set_send_buffer_size`]: #method.set_send_buffer_size pub fn send_buffer_size(&self) -> io::Result { - self.inner.send_buffer_size().map(|n| n as u32) + self.inner.get_send_buffer_size() } /// Sets the size of the TCP receive buffer on this socket. /// /// On most operating systems, this sets the `SO_RCVBUF` socket option. pub fn set_recv_buffer_size(&self, size: u32) -> io::Result<()> { - self.inner.set_recv_buffer_size(size as usize) + self.inner.set_recv_buffer_size(size) } /// Returns the size of the TCP receive buffer for this socket. @@ -356,7 +347,7 @@ impl TcpSocket { /// /// [`set_recv_buffer_size`]: #method.set_recv_buffer_size pub fn recv_buffer_size(&self) -> io::Result { - self.inner.recv_buffer_size().map(|n| n as u32) + self.inner.get_recv_buffer_size() } /// Sets the linger duration of this socket by setting the SO_LINGER option. @@ -378,62 +369,7 @@ impl TcpSocket { /// /// [`set_linger`]: TcpSocket::set_linger pub fn linger(&self) -> io::Result> { - self.inner.linger() - } - - /// Gets the value of the `IP_TOS` option for this socket. - /// - /// For more information about this option, see [`set_tos`]. - /// - /// **NOTE:** On Windows, `IP_TOS` is only supported on [Windows 8+ or - /// Windows Server 2012+.](https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options) - /// - /// [`set_tos`]: Self::set_tos - // https://docs.rs/socket2/0.4.2/src/socket2/socket.rs.html#1178 - #[cfg(not(any( - target_os = "fuchsia", - target_os = "redox", - target_os = "solaris", - target_os = "illumos", - )))] - #[cfg_attr( - docsrs, - doc(cfg(not(any( - target_os = "fuchsia", - target_os = "redox", - target_os = "solaris", - target_os = "illumos", - )))) - )] - pub fn tos(&self) -> io::Result { - self.inner.tos() - } - - /// Sets the value for the `IP_TOS` option on this socket. - /// - /// This value sets the time-to-live field that is used in every packet sent - /// from this socket. - /// - /// **NOTE:** On Windows, `IP_TOS` is only supported on [Windows 8+ or - /// Windows Server 2012+.](https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options) - // https://docs.rs/socket2/0.4.2/src/socket2/socket.rs.html#1178 - #[cfg(not(any( - target_os = "fuchsia", - target_os = "redox", - target_os = "solaris", - target_os = "illumos", - )))] - #[cfg_attr( - docsrs, - doc(cfg(not(any( - target_os = "fuchsia", - target_os = "redox", - target_os = "solaris", - target_os = "illumos", - )))) - )] - pub fn set_tos(&self, tos: u32) -> io::Result<()> { - self.inner.set_tos(tos) + self.inner.get_linger() } /// Gets the local address of this socket. @@ -459,14 +395,7 @@ impl TcpSocket { /// } /// ``` pub fn local_addr(&self) -> io::Result { - self.inner - .local_addr() - .map(|addr| addr.as_socket().unwrap()) - } - - /// Returns the value of the `SO_ERROR` option. - pub fn take_error(&self) -> io::Result> { - self.inner.take_error() + self.inner.get_localaddr() } /// Binds the socket to the given address. @@ -498,7 +427,7 @@ impl TcpSocket { /// } /// ``` pub fn bind(&self, addr: SocketAddr) -> io::Result<()> { - self.inner.bind(&addr.into()) + self.inner.bind(addr) } /// Establishes a TCP connection with a peer at the specified socket address. @@ -534,13 +463,7 @@ impl TcpSocket { /// } /// ``` pub async fn connect(self, addr: SocketAddr) -> io::Result { - self.inner.connect(&addr.into())?; - - #[cfg(windows)] - let mio = unsafe { mio::net::TcpStream::from_raw_socket(self.inner.into_raw_socket()) }; - #[cfg(unix)] - let mio = unsafe { mio::net::TcpStream::from_raw_fd(self.inner.into_raw_fd()) }; - + let mio = self.inner.connect(addr)?; TcpStream::connect_mio(mio).await } @@ -580,14 +503,7 @@ impl TcpSocket { /// } /// ``` pub fn listen(self, backlog: u32) -> io::Result { - let backlog = backlog.try_into().unwrap_or(i32::MAX); - self.inner.listen(backlog)?; - - #[cfg(windows)] - let mio = unsafe { mio::net::TcpListener::from_raw_socket(self.inner.into_raw_socket()) }; - #[cfg(unix)] - let mio = unsafe { mio::net::TcpListener::from_raw_fd(self.inner.into_raw_fd()) }; - + let mio = self.inner.listen(backlog)?; TcpListener::new(mio) } @@ -607,7 +523,7 @@ impl TcpSocket { /// /// #[tokio::main] /// async fn main() -> std::io::Result<()> { - /// + /// /// let socket2_socket = Socket::new(Domain::IPV4, Type::STREAM, None)?; /// /// let socket = TcpSocket::from_std_stream(socket2_socket.into()); @@ -618,12 +534,16 @@ impl TcpSocket { pub fn from_std_stream(std_stream: std::net::TcpStream) -> TcpSocket { #[cfg(unix)] { + use std::os::unix::io::{FromRawFd, IntoRawFd}; + let raw_fd = std_stream.into_raw_fd(); unsafe { TcpSocket::from_raw_fd(raw_fd) } } #[cfg(windows)] { + use std::os::windows::io::{FromRawSocket, IntoRawSocket}; + let raw_socket = std_stream.into_raw_socket(); unsafe { TcpSocket::from_raw_socket(raw_socket) } } @@ -652,7 +572,7 @@ impl FromRawFd for TcpSocket { /// The caller is responsible for ensuring that the socket is in /// non-blocking mode. unsafe fn from_raw_fd(fd: RawFd) -> TcpSocket { - let inner = socket2::Socket::from_raw_fd(fd); + let inner = mio::net::TcpSocket::from_raw_fd(fd); TcpSocket { inner } } } @@ -687,7 +607,7 @@ impl FromRawSocket for TcpSocket { /// The caller is responsible for ensuring that the socket is in /// non-blocking mode. unsafe fn from_raw_socket(socket: RawSocket) -> TcpSocket { - let inner = socket2::Socket::from_raw_socket(socket); + let inner = mio::net::TcpSocket::from_raw_socket(socket); TcpSocket { inner } } } diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index abfc3c6612b..60d20fd74b2 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -387,7 +387,7 @@ impl TcpStream { /// // if the readiness event is a false positive. /// match stream.try_read(&mut data) { /// Ok(n) => { - /// println!("read {} bytes", n); + /// println!("read {} bytes", n); /// } /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { /// continue; @@ -1090,8 +1090,9 @@ impl TcpStream { /// # } /// ``` pub fn linger(&self) -> io::Result> { - let socket = self.as_socket(); - socket.linger() + let mio_socket = std::mem::ManuallyDrop::new(self.to_mio()); + + mio_socket.get_linger() } /// Sets the linger duration of this socket by setting the SO_LINGER option. @@ -1116,12 +1117,23 @@ impl TcpStream { /// # } /// ``` pub fn set_linger(&self, dur: Option) -> io::Result<()> { - let socket = self.as_socket(); - socket.set_linger(dur) + let mio_socket = std::mem::ManuallyDrop::new(self.to_mio()); + + mio_socket.set_linger(dur) } - fn as_socket(&self) -> socket2::SockRef<'_> { - socket2::SockRef::from(self) + fn to_mio(&self) -> mio::net::TcpSocket { + #[cfg(windows)] + { + use std::os::windows::io::{AsRawSocket, FromRawSocket}; + unsafe { mio::net::TcpSocket::from_raw_socket(self.as_raw_socket()) } + } + + #[cfg(unix)] + { + use std::os::unix::io::{AsRawFd, FromRawFd}; + unsafe { mio::net::TcpSocket::from_raw_fd(self.as_raw_fd()) } + } } /// Gets the value of the `IP_TTL` option for this socket. diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs index a6d80c6f760..12af5152c28 100644 --- a/tokio/src/net/udp.rs +++ b/tokio/src/net/udp.rs @@ -257,78 +257,6 @@ impl UdpSocket { } } - /// Sets the size of the UDP send buffer on this socket. - /// - /// On most operating systems, this sets the `SO_SNDBUF` socket option. - pub fn set_send_buffer_size(&self, size: u32) -> io::Result<()> { - self.as_socket().set_send_buffer_size(size as usize) - } - - /// Returns the size of the UDP send buffer for this socket. - /// - /// On most operating systems, this is the value of the `SO_SNDBUF` socket - /// option. - /// - /// Note that if [`set_send_buffer_size`] has been called on this socket - /// previously, the value returned by this function may not be the same as - /// the argument provided to `set_send_buffer_size`. This is for the - /// following reasons: - /// - /// * Most operating systems have minimum and maximum allowed sizes for the - /// send buffer, and will clamp the provided value if it is below the - /// minimum or above the maximum. The minimum and maximum buffer sizes are - /// OS-dependent. - /// * Linux will double the buffer size to account for internal bookkeeping - /// data, and returns the doubled value from `getsockopt(2)`. As per `man - /// 7 socket`: - /// > Sets or gets the maximum socket send buffer in bytes. The - /// > kernel doubles this value (to allow space for bookkeeping - /// > overhead) when it is set using `setsockopt(2)`, and this doubled - /// > value is returned by `getsockopt(2)`. - /// - /// [`set_send_buffer_size`]: Self::set_send_buffer_size - pub fn send_buffer_size(&self) -> io::Result { - self.as_socket().send_buffer_size().map(|n| n as u32) - } - - /// Sets the size of the UDP receive buffer on this socket. - /// - /// On most operating systems, this sets the `SO_RCVBUF` socket option. - pub fn set_recv_buffer_size(&self, size: u32) -> io::Result<()> { - self.as_socket().set_recv_buffer_size(size as usize) - } - - /// Returns the size of the UDP receive buffer for this socket. - /// - /// On most operating systems, this is the value of the `SO_RCVBUF` socket - /// option. - /// - /// Note that if [`set_recv_buffer_size`] has been called on this socket - /// previously, the value returned by this function may not be the same as - /// the argument provided to `set_send_buffer_size`. This is for the - /// following reasons: - /// - /// * Most operating systems have minimum and maximum allowed sizes for the - /// receive buffer, and will clamp the provided value if it is below the - /// minimum or above the maximum. The minimum and maximum buffer sizes are - /// OS-dependent. - /// * Linux will double the buffer size to account for internal bookkeeping - /// data, and returns the doubled value from `getsockopt(2)`. As per `man - /// 7 socket`: - /// > Sets or gets the maximum socket send buffer in bytes. The - /// > kernel doubles this value (to allow space for bookkeeping - /// > overhead) when it is set using `setsockopt(2)`, and this doubled - /// > value is returned by `getsockopt(2)`. - /// - /// [`set_recv_buffer_size`]: Self::set_recv_buffer_size - pub fn recv_buffer_size(&self) -> io::Result { - self.as_socket().recv_buffer_size().map(|n| n as u32) - } - - fn as_socket(&self) -> socket2::SockRef<'_> { - socket2::SockRef::from(self) - } - /// Returns the local address that this socket is bound to. /// /// # Example @@ -350,29 +278,6 @@ impl UdpSocket { self.io.local_addr() } - /// Returns the socket address of the remote peer this socket was connected - /// to. - /// - /// # Example - /// - /// ``` - /// use tokio::net::UdpSocket; - /// # use std::{io, net::SocketAddr}; - /// - /// # #[tokio::main] - /// # async fn main() -> io::Result<()> { - /// let addr = "127.0.0.1:0".parse::().unwrap(); - /// let peer_addr = "127.0.0.1:11100".parse::().unwrap(); - /// let sock = UdpSocket::bind(addr).await?; - /// sock.connect(peer_addr).await?; - /// assert_eq!(sock.peer_addr()?.ip(), peer_addr.ip()); - /// # Ok(()) - /// # } - /// ``` - pub fn peer_addr(&self) -> io::Result { - self.io.peer_addr() - } - /// Connects the UDP socket setting the default destination for send() and /// limiting packets that are read via recv from the address specified in /// `addr`. @@ -1579,61 +1484,6 @@ impl UdpSocket { self.io.set_ttl(ttl) } - /// Gets the value of the `IP_TOS` option for this socket. - /// - /// For more information about this option, see [`set_tos`]. - /// - /// **NOTE:** On Windows, `IP_TOS` is only supported on [Windows 8+ or - /// Windows Server 2012+.](https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options) - /// - /// [`set_tos`]: Self::set_tos - // https://docs.rs/socket2/0.4.2/src/socket2/socket.rs.html#1178 - #[cfg(not(any( - target_os = "fuchsia", - target_os = "redox", - target_os = "solaris", - target_os = "illumos", - )))] - #[cfg_attr( - docsrs, - doc(cfg(not(any( - target_os = "fuchsia", - target_os = "redox", - target_os = "solaris", - target_os = "illumos", - )))) - )] - pub fn tos(&self) -> io::Result { - self.as_socket().tos() - } - - /// Sets the value for the `IP_TOS` option on this socket. - /// - /// This value sets the time-to-live field that is used in every packet sent - /// from this socket. - /// - /// **NOTE:** On Windows, `IP_TOS` is only supported on [Windows 8+ or - /// Windows Server 2012+.](https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options) - // https://docs.rs/socket2/0.4.2/src/socket2/socket.rs.html#1178 - #[cfg(not(any( - target_os = "fuchsia", - target_os = "redox", - target_os = "solaris", - target_os = "illumos", - )))] - #[cfg_attr( - docsrs, - doc(cfg(not(any( - target_os = "fuchsia", - target_os = "redox", - target_os = "solaris", - target_os = "illumos", - )))) - )] - pub fn set_tos(&self, tos: u32) -> io::Result<()> { - self.as_socket().set_tos(tos) - } - /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. /// /// This function specifies a new multicast group for this socket to join. diff --git a/tokio/tests/udp.rs b/tokio/tests/udp.rs index 11a97276c1f..ec2a1e96104 100644 --- a/tokio/tests/udp.rs +++ b/tokio/tests/udp.rs @@ -3,7 +3,6 @@ use futures::future::poll_fn; use std::io; -use std::net::SocketAddr; use std::sync::Arc; use tokio::{io::ReadBuf, net::UdpSocket}; use tokio_test::assert_ok; @@ -485,12 +484,3 @@ async fn poll_ready() { } } } - -#[tokio::test] -async fn peer_addr() { - let addr = "127.0.0.1:0".parse::().unwrap(); - let peer_addr = "127.0.0.1:11100".parse::().unwrap(); - let sock = UdpSocket::bind(addr).await.unwrap(); - sock.connect(peer_addr).await.unwrap(); - assert_eq!(sock.peer_addr().unwrap().ip(), peer_addr.ip()); -}