Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add rustfmt.toml #5694

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
chore: add rustfmt.toml
kamuik16 committed Nov 27, 2024
commit 464e31b1c986aa401918b55baa6131c7a454c350
22 changes: 13 additions & 9 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
@@ -83,12 +83,14 @@ pub enum ConnectedPoint {
/// connection as a dialer and one peer dial the other and upgrade the
/// connection _as a listener_ overriding its role.
role_override: Endpoint,
/// Whether the port for the outgoing connection was reused from a listener
/// or a new port was allocated. This is useful for address translation.
/// Whether the port for the outgoing connection was reused from a
/// listener or a new port was allocated. This is useful for
/// address translation.
///
/// The port use is implemented on a best-effort basis. It is not guaranteed
/// that [`PortUse::Reuse`] actually reused a port. A good example is the case
/// where there is no listener available to reuse a port from.
/// The port use is implemented on a best-effort basis. It is not
/// guaranteed that [`PortUse::Reuse`] actually reused a port. A
/// good example is the case where there is no listener
/// available to reuse a port from.
port_use: PortUse,
},
/// We received the node.
@@ -153,10 +155,11 @@ impl ConnectedPoint {

/// Returns the address of the remote stored in this struct.
///
/// For `Dialer`, this returns `address`. For `Listener`, this returns `send_back_addr`.
/// For `Dialer`, this returns `address`. For `Listener`, this returns
/// `send_back_addr`.
///
/// Note that the remote node might not be listening on this address and hence the address might
/// not be usable to establish new connections.
/// Note that the remote node might not be listening on this address and
/// hence the address might not be usable to establish new connections.
pub fn get_remote_address(&self) -> &Multiaddr {
match self {
ConnectedPoint::Dialer { address, .. } => address,
@@ -166,7 +169,8 @@ impl ConnectedPoint {

/// Modifies the address of the remote stored in this struct.
///
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`.
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies
/// `send_back_addr`.
pub fn set_remote_address(&mut self, new_address: Multiaddr) {
match self {
ConnectedPoint::Dialer { address, .. } => *address = new_address,
20 changes: 12 additions & 8 deletions core/src/either.rs
Original file line number Diff line number Diff line change
@@ -18,17 +18,20 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::muxing::StreamMuxerEvent;
use crate::transport::DialOpts;
use crate::{
muxing::StreamMuxer,
transport::{ListenerId, Transport, TransportError, TransportEvent},
Multiaddr,
use std::{
pin::Pin,
task::{Context, Poll},
};

use either::Either;
use futures::prelude::*;
use pin_project::pin_project;
use std::{pin::Pin, task::Context, task::Poll};

use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent},
Multiaddr,
};

impl<A, B> StreamMuxer for future::Either<A, B>
where
@@ -88,7 +91,8 @@ where
}
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
/// Implements `Future` and dispatches all method calls to either `First` or
/// `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
7 changes: 4 additions & 3 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -28,16 +28,17 @@
//! to a remote and can subdivide this connection into multiple substreams.
//! See the [`muxing`] module.
//! - The [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] traits
//! define how to upgrade each individual substream to use a protocol.
//! See the `upgrade` module.
//! define how to upgrade each individual substream to use a protocol. See the
//! `upgrade` module.
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod proto {
#![allow(unreachable_pub)]
include!("generated/mod.rs");
pub use self::{
envelope_proto::*, peer_record_proto::mod_PeerRecord::*, peer_record_proto::PeerRecord,
envelope_proto::*,
peer_record_proto::{mod_PeerRecord::*, PeerRecord},
};
}

112 changes: 66 additions & 46 deletions core/src/muxing.rs
Original file line number Diff line number Diff line change
@@ -20,63 +20,76 @@

//! Muxing is the process of splitting a connection into multiple substreams.
//!
//! The main item of this module is the `StreamMuxer` trait. An implementation of `StreamMuxer`
//! has ownership of a connection, lets you open and close substreams.
//! The main item of this module is the `StreamMuxer` trait. An implementation
//! of `StreamMuxer` has ownership of a connection, lets you open and close
//! substreams.
//!
//! > **Note**: You normally don't need to use the methods of the `StreamMuxer` directly, as this
//! > is managed by the library's internals.
//! > **Note**: You normally don't need to use the methods of the `StreamMuxer`
//! > directly, as this
//! > is managed by the library's internals.
//!
//! Each substream of a connection is an isolated stream of data. All the substreams are muxed
//! together so that the data read from or written to each substream doesn't influence the other
//! substreams.
//! Each substream of a connection is an isolated stream of data. All the
//! substreams are muxed together so that the data read from or written to each
//! substream doesn't influence the other substreams.
//!
//! In the context of libp2p, each substream can use a different protocol. Contrary to opening a
//! connection, opening a substream is almost free in terms of resources. This means that you
//! shouldn't hesitate to rapidly open and close substreams, and to design protocols that don't
//! require maintaining long-lived channels of communication.
//! In the context of libp2p, each substream can use a different protocol.
//! Contrary to opening a connection, opening a substream is almost free in
//! terms of resources. This means that you shouldn't hesitate to rapidly open
//! and close substreams, and to design protocols that don't require maintaining
//! long-lived channels of communication.
//!
//! > **Example**: The Kademlia protocol opens a new substream for each request it wants to
//! > perform. Multiple requests can be performed simultaneously by opening multiple
//! > substreams, without having to worry about associating responses with the
//! > right request.
//! > **Example**: The Kademlia protocol opens a new substream for each request
//! > it wants to
//! > perform. Multiple requests can be performed simultaneously by opening
//! > multiple
//! > substreams, without having to worry about associating responses with the
//! > right request.
//!
//! # Implementing a muxing protocol
//!
//! In order to implement a muxing protocol, create an object that implements the `UpgradeInfo`,
//! `InboundUpgrade` and `OutboundUpgrade` traits. See the `upgrade` module for more information.
//! The `Output` associated type of the `InboundUpgrade` and `OutboundUpgrade` traits should be
//! identical, and should be an object that implements the `StreamMuxer` trait.
//! In order to implement a muxing protocol, create an object that implements
//! the `UpgradeInfo`, `InboundUpgrade` and `OutboundUpgrade` traits. See the
//! `upgrade` module for more information. The `Output` associated type of the
//! `InboundUpgrade` and `OutboundUpgrade` traits should be identical, and
//! should be an object that implements the `StreamMuxer` trait.
//!
//! The upgrade process will take ownership of the connection, which makes it possible for the
//! implementation of `StreamMuxer` to control everything that happens on the wire.
//! The upgrade process will take ownership of the connection, which makes it
//! possible for the implementation of `StreamMuxer` to control everything that
//! happens on the wire.
use std::{future::Future, pin::Pin};

use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite};
use futures::{
task::{Context, Poll},
AsyncRead,
AsyncWrite,
};
use multiaddr::Multiaddr;
use std::future::Future;
use std::pin::Pin;

pub use self::boxed::StreamMuxerBox;
pub use self::boxed::SubstreamBox;
pub use self::boxed::{StreamMuxerBox, SubstreamBox};

mod boxed;

/// Provides multiplexing for a connection by allowing users to open substreams.
///
/// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`].
/// The [`StreamMuxer`] itself is modelled closely after [`AsyncWrite`]. It features `poll`-style
/// functions that allow the implementation to make progress on various tasks.
/// A substream created by a [`StreamMuxer`] is a type that implements
/// [`AsyncRead`] and [`AsyncWrite`]. The [`StreamMuxer`] itself is modelled
/// closely after [`AsyncWrite`]. It features `poll`-style functions that allow
/// the implementation to make progress on various tasks.
pub trait StreamMuxer {
/// Type of the object that represents the raw substream where data can be read and written.
/// Type of the object that represents the raw substream where data can be
/// read and written.
type Substream: AsyncRead + AsyncWrite;

/// Error type of the muxer
type Error: std::error::Error;

/// Poll for new inbound substreams.
///
/// This function should be called whenever callers are ready to accept more inbound streams. In
/// other words, callers may exercise back-pressure on incoming streams by not calling this
/// function if a certain limit is hit.
/// This function should be called whenever callers are ready to accept more
/// inbound streams. In other words, callers may exercise back-pressure
/// on incoming streams by not calling this function if a certain limit
/// is hit.
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -90,20 +103,23 @@ pub trait StreamMuxer {

/// Poll to close this [`StreamMuxer`].
///
/// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless and may be safely
/// dropped.
/// After this has returned `Poll::Ready(Ok(()))`, the muxer has become
/// useless and may be safely dropped.
///
/// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so
/// > that the remote is properly informed of the shutdown. However, apart from
/// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer.
/// > **Note**: You are encouraged to call this method and wait for it to
/// > return `Ready`, so
/// > that the remote is properly informed of the shutdown. However, apart
/// > from
/// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

/// Poll to allow the underlying connection to make progress.
///
/// In contrast to all other `poll`-functions on [`StreamMuxer`], this function MUST be called
/// unconditionally. Because it will be called regardless, this function can be used by
/// implementations to return events about the underlying connection that the caller MUST deal
/// In contrast to all other `poll`-functions on [`StreamMuxer`], this
/// function MUST be called unconditionally. Because it will be called
/// regardless, this function can be used by implementations to return
/// events about the underlying connection that the caller MUST deal
/// with.
fn poll(
self: Pin<&mut Self>,
@@ -120,7 +136,8 @@ pub enum StreamMuxerEvent {

/// Extension trait for [`StreamMuxer`].
pub trait StreamMuxerExt: StreamMuxer + Sized {
/// Convenience function for calling [`StreamMuxer::poll_inbound`] for [`StreamMuxer`]s that are `Unpin`.
/// Convenience function for calling [`StreamMuxer::poll_inbound`] for
/// [`StreamMuxer`]s that are `Unpin`.
fn poll_inbound_unpin(
&mut self,
cx: &mut Context<'_>,
@@ -131,7 +148,8 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
Pin::new(self).poll_inbound(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_outbound`] for [`StreamMuxer`]s that are `Unpin`.
/// Convenience function for calling [`StreamMuxer::poll_outbound`] for
/// [`StreamMuxer`]s that are `Unpin`.
fn poll_outbound_unpin(
&mut self,
cx: &mut Context<'_>,
@@ -142,15 +160,17 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
Pin::new(self).poll_outbound(cx)
}

/// Convenience function for calling [`StreamMuxer::poll`] for [`StreamMuxer`]s that are `Unpin`.
/// Convenience function for calling [`StreamMuxer::poll`] for
/// [`StreamMuxer`]s that are `Unpin`.
fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`.
/// Convenience function for calling [`StreamMuxer::poll_close`] for
/// [`StreamMuxer`]s that are `Unpin`.
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where
Self: Unpin,
28 changes: 17 additions & 11 deletions core/src/muxing/boxed.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::muxing::{StreamMuxer, StreamMuxerEvent};
use std::{
error::Error,
fmt,
io,
io::{IoSlice, IoSliceMut},
pin::Pin,
task::{Context, Poll},
};

use futures::{AsyncRead, AsyncWrite};
use pin_project::pin_project;
use std::error::Error;
use std::fmt;
use std::io;
use std::io::{IoSlice, IoSliceMut};
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::muxing::{StreamMuxer, StreamMuxerEvent};

/// Abstract `StreamMuxer`.
pub struct StreamMuxerBox {
@@ -21,8 +25,8 @@ impl fmt::Debug for StreamMuxerBox {

/// Abstract type for asynchronous reading and writing.
///
/// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead`
/// and `AsyncWrite` capabilities.
/// A [`SubstreamBox`] erases the concrete type it is given and only retains its
/// `AsyncRead` and `AsyncWrite` capabilities.
pub struct SubstreamBox(Pin<Box<dyn AsyncReadWrite + Send>>);

#[pin_project]
@@ -139,7 +143,8 @@ impl StreamMuxer for StreamMuxerBox {
}

impl SubstreamBox {
/// Construct a new [`SubstreamBox`] from something that implements [`AsyncRead`] and [`AsyncWrite`].
/// Construct a new [`SubstreamBox`] from something that implements
/// [`AsyncRead`] and [`AsyncWrite`].
pub fn new<S: AsyncRead + AsyncWrite + Send + 'static>(stream: S) -> Self {
Self(Box::pin(stream))
}
@@ -155,7 +160,8 @@ impl fmt::Debug for SubstreamBox {
trait AsyncReadWrite: AsyncRead + AsyncWrite {
/// Helper function to capture the erased inner type.
///
/// Used to make the [`Debug`] implementation of [`SubstreamBox`] more useful.
/// Used to make the [`Debug`] implementation of [`SubstreamBox`] more
/// useful.
fn type_name(&self) -> &'static str;
}

25 changes: 14 additions & 11 deletions core/src/peer_record.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
use crate::signed_envelope::SignedEnvelope;
use crate::{proto, signed_envelope, DecodeError, Multiaddr};
use libp2p_identity::Keypair;
use libp2p_identity::PeerId;
use libp2p_identity::SigningError;
use libp2p_identity::{Keypair, PeerId, SigningError};
use quick_protobuf::{BytesReader, Writer};
use web_time::SystemTime;

use crate::{proto, signed_envelope, signed_envelope::SignedEnvelope, DecodeError, Multiaddr};

const PAYLOAD_TYPE: &str = "/libp2p/routing-state-record";
const DOMAIN_SEP: &str = "libp2p-routing-state";

/// Represents a peer routing record.
///
/// Peer records are designed to be distributable and carry a signature by being wrapped in a signed envelope.
/// For more information see RFC0003 of the libp2p specifications: <https://github.com/libp2p/specs/blob/master/RFC/0003-routing-records.md>
/// Peer records are designed to be distributable and carry a signature by being
/// wrapped in a signed envelope. For more information see RFC0003 of the libp2p specifications: <https://github.com/libp2p/specs/blob/master/RFC/0003-routing-records.md>
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct PeerRecord {
peer_id: PeerId,
@@ -21,14 +19,16 @@ pub struct PeerRecord {

/// A signed envelope representing this [`PeerRecord`].
///
/// If this [`PeerRecord`] was constructed from a [`SignedEnvelope`], this is the original instance.
/// If this [`PeerRecord`] was constructed from a [`SignedEnvelope`], this
/// is the original instance.
envelope: SignedEnvelope,
}

impl PeerRecord {
/// Attempt to re-construct a [`PeerRecord`] from a [`SignedEnvelope`].
///
/// If this function succeeds, the [`SignedEnvelope`] contained a peer record with a valid signature and can hence be considered authenticated.
/// If this function succeeds, the [`SignedEnvelope`] contained a peer
/// record with a valid signature and can hence be considered authenticated.
pub fn from_signed_envelope(envelope: SignedEnvelope) -> Result<Self, FromEnvelopeError> {
use quick_protobuf::MessageRead;

@@ -58,9 +58,12 @@ impl PeerRecord {
})
}

/// Construct a new [`PeerRecord`] by authenticating the provided addresses with the given key.
/// Construct a new [`PeerRecord`] by authenticating the provided addresses
/// with the given key.
///
/// This is the same key that is used for authenticating every libp2p connection of your application, i.e. what you use when setting up your [`crate::transport::Transport`].
/// This is the same key that is used for authenticating every libp2p
/// connection of your application, i.e. what you use when setting up your
/// [`crate::transport::Transport`].
pub fn new(key: &Keypair, addresses: Vec<Multiaddr>) -> Result<Self, SigningError> {
use quick_protobuf::MessageWrite;

39 changes: 25 additions & 14 deletions core/src/signed_envelope.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::{proto, DecodeError};
use libp2p_identity::SigningError;
use libp2p_identity::{Keypair, PublicKey};
use quick_protobuf::{BytesReader, Writer};
use std::fmt;

use libp2p_identity::{Keypair, PublicKey, SigningError};
use quick_protobuf::{BytesReader, Writer};
use unsigned_varint::encode::usize_buffer;

/// A signed envelope contains an arbitrary byte string payload, a signature of the payload, and the public key that can be used to verify the signature.
use crate::{proto, DecodeError};

/// A signed envelope contains an arbitrary byte string payload, a signature of
/// the payload, and the public key that can be used to verify the signature.
///
/// For more details see libp2p RFC0002: <https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md>
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -36,7 +38,8 @@ impl SignedEnvelope {
})
}

/// Verify this [`SignedEnvelope`] against the provided domain-separation string.
/// Verify this [`SignedEnvelope`] against the provided domain-separation
/// string.
#[must_use]
pub fn verify(&self, domain_separation: String) -> bool {
let buffer = signature_payload(domain_separation, &self.payload_type, &self.payload);
@@ -46,8 +49,10 @@ impl SignedEnvelope {

/// Extract the payload and signing key of this [`SignedEnvelope`].
///
/// You must provide the correct domain-separation string and expected payload type in order to get the payload.
/// This guards against accidental mis-use of the payload where the signature was created for a different purpose or payload type.
/// You must provide the correct domain-separation string and expected
/// payload type in order to get the payload. This guards against
/// accidental mis-use of the payload where the signature was created for a
/// different purpose or payload type.
///
/// It is the caller's responsibility to check that the signing key is what
/// is expected. For example, checking that the signing key is from a
@@ -71,7 +76,8 @@ impl SignedEnvelope {
Ok((&self.payload, &self.key))
}

/// Encode this [`SignedEnvelope`] using the protobuf encoding specified in the RFC.
/// Encode this [`SignedEnvelope`] using the protobuf encoding specified in
/// the RFC.
pub fn into_protobuf_encoding(self) -> Vec<u8> {
use quick_protobuf::MessageWrite;

@@ -92,7 +98,8 @@ impl SignedEnvelope {
buf
}

/// Decode a [`SignedEnvelope`] using the protobuf encoding specified in the RFC.
/// Decode a [`SignedEnvelope`] using the protobuf encoding specified in the
/// RFC.
pub fn from_protobuf_encoding(bytes: &[u8]) -> Result<Self, DecodingError> {
use quick_protobuf::MessageRead;

@@ -139,24 +146,28 @@ fn signature_payload(domain_separation: String, payload_type: &[u8], payload: &[
buffer
}

/// Errors that occur whilst decoding a [`SignedEnvelope`] from its byte representation.
/// Errors that occur whilst decoding a [`SignedEnvelope`] from its byte
/// representation.
#[derive(thiserror::Error, Debug)]
pub enum DecodingError {
/// Decoding the provided bytes as a signed envelope failed.
#[error("Failed to decode envelope")]
InvalidEnvelope(#[from] DecodeError),
/// The public key in the envelope could not be converted to our internal public key type.
/// The public key in the envelope could not be converted to our internal
/// public key type.
#[error("Failed to convert public key")]
InvalidPublicKey(#[from] libp2p_identity::DecodingError),
/// The public key in the envelope could not be converted to our internal public key type.
/// The public key in the envelope could not be converted to our internal
/// public key type.
#[error("Public key is missing from protobuf struct")]
MissingPublicKey,
}

/// Errors that occur whilst extracting the payload of a [`SignedEnvelope`].
#[derive(Debug)]
pub enum ReadPayloadError {
/// The signature on the signed envelope does not verify with the provided domain separation string.
/// The signature on the signed envelope does not verify with the provided
/// domain separation string.
InvalidSignature,
/// The payload contained in the envelope is not of the expected type.
UnexpectedPayloadType { expected: Vec<u8>, got: Vec<u8> },
126 changes: 71 additions & 55 deletions core/src/transport.rs
Original file line number Diff line number Diff line change
@@ -23,10 +23,9 @@
//! The main entity of this module is the [`Transport`] trait, which provides an
//! interface for establishing connections with other nodes, thereby negotiating
//! any desired protocols. The rest of the module defines combinators for
//! modifying a transport through composition with other transports or protocol upgrades.
//! modifying a transport through composition with other transports or protocol
//! upgrades.
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{
error::Error,
fmt,
@@ -35,6 +34,9 @@ use std::{
task::{Context, Poll},
};

use futures::prelude::*;
use multiaddr::Multiaddr;

pub mod and_then;
pub mod choice;
pub mod dummy;
@@ -48,14 +50,15 @@ pub mod upgrade;
mod boxed;
mod optional;

pub use self::{
boxed::Boxed,
choice::OrTransport,
memory::MemoryTransport,
optional::OptionalTransport,
upgrade::Upgrade,
};
use crate::{ConnectedPoint, Endpoint};

pub use self::boxed::Boxed;
pub use self::choice::OrTransport;
pub use self::memory::MemoryTransport;
pub use self::optional::OptionalTransport;
pub use self::upgrade::Upgrade;

static NEXT_LISTENER_ID: AtomicUsize = AtomicUsize::new(1);

/// The port use policy for a new connection.
@@ -65,7 +68,8 @@ pub enum PortUse {
New,
/// Best effor reusing of an existing port.
///
/// If there is no listener present that can be used to dial, a new port is allocated.
/// If there is no listener present that can be used to dial, a new port is
/// allocated.
#[default]
Reuse,
}
@@ -75,7 +79,8 @@ pub enum PortUse {
pub struct DialOpts {
/// The endpoint establishing a new connection.
///
/// When attempting a hole-punch, both parties simultaneously "dial" each other but one party has to be the "listener" on the final connection.
/// When attempting a hole-punch, both parties simultaneously "dial" each
/// other but one party has to be the "listener" on the final connection.
/// This option specifies the role of this node in the final connection.
pub role: Endpoint,
/// The port use policy for a new connection.
@@ -87,18 +92,18 @@ pub struct DialOpts {
///
/// Connections are established either by [listening](Transport::listen_on)
/// or [dialing](Transport::dial) on a [`Transport`]. A peer that
/// obtains a connection by listening is often referred to as the *listener* and the
/// peer that initiated the connection through dialing as the *dialer*, in
/// obtains a connection by listening is often referred to as the *listener* and
/// the peer that initiated the connection through dialing as the *dialer*, in
/// contrast to the traditional roles of *server* and *client*.
///
/// Most transports also provide a form of reliable delivery on the established
/// connections but the precise semantics of these guarantees depend on the
/// specific transport.
///
/// This trait is implemented for concrete connection-oriented transport protocols
/// like TCP or Unix Domain Sockets, but also on wrappers that add additional
/// functionality to the dialing or listening process (e.g. name resolution via
/// the DNS).
/// This trait is implemented for concrete connection-oriented transport
/// protocols like TCP or Unix Domain Sockets, but also on wrappers that add
/// additional functionality to the dialing or listening process (e.g. name
/// resolution via the DNS).
///
/// Additional protocols can be layered on top of the connections established
/// by a [`Transport`] through an upgrade mechanism that is initiated via
@@ -124,19 +129,21 @@ pub trait Transport {
/// A pending [`Output`](Transport::Output) for an inbound connection,
/// obtained from the [`Transport`] stream.
///
/// After a connection has been accepted by the transport, it may need to go through
/// asynchronous post-processing (i.e. protocol upgrade negotiations). Such
/// post-processing should not block the `Listener` from producing the next
/// connection, hence further connection setup proceeds asynchronously.
/// Once a `ListenerUpgrade` future resolves it yields the [`Output`](Transport::Output)
/// of the connection setup process.
/// After a connection has been accepted by the transport, it may need to go
/// through asynchronous post-processing (i.e. protocol upgrade
/// negotiations). Such post-processing should not block the `Listener`
/// from producing the next connection, hence further connection setup
/// proceeds asynchronously. Once a `ListenerUpgrade` future resolves it
/// yields the [`Output`](Transport::Output) of the connection setup
/// process.
type ListenerUpgrade: Future<Output = Result<Self::Output, Self::Error>>;

/// A pending [`Output`](Transport::Output) for an outbound connection,
/// obtained from [dialing](Transport::dial).
type Dial: Future<Output = Result<Self::Output, Self::Error>>;

/// Listens on the given [`Multiaddr`] for inbound connections with a provided [`ListenerId`].
/// Listens on the given [`Multiaddr`] for inbound connections with a
/// provided [`ListenerId`].
fn listen_on(
&mut self,
id: ListenerId,
@@ -149,10 +156,11 @@ pub trait Transport {
/// otherwise.
fn remove_listener(&mut self, id: ListenerId) -> bool;

/// Dials the given [`Multiaddr`], returning a future for a pending outbound connection.
/// Dials the given [`Multiaddr`], returning a future for a pending outbound
/// connection.
///
/// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to
/// try an alternative [`Transport`], if available.
/// If [`TransportError::MultiaddrNotSupported`] is returned, it may be
/// desirable to try an alternative [`Transport`], if available.
fn dial(
&mut self,
addr: Multiaddr,
@@ -161,15 +169,16 @@ pub trait Transport {

/// Poll for [`TransportEvent`]s.
///
/// A [`TransportEvent::Incoming`] should be produced whenever a connection is received at the lowest
/// level of the transport stack. The item must be a [`ListenerUpgrade`](Transport::ListenerUpgrade)
/// future that resolves to an [`Output`](Transport::Output) value once all protocol upgrades have
/// been applied.
/// A [`TransportEvent::Incoming`] should be produced whenever a connection
/// is received at the lowest level of the transport stack. The item
/// must be a [`ListenerUpgrade`](Transport::ListenerUpgrade)
/// future that resolves to an [`Output`](Transport::Output) value once all
/// protocol upgrades have been applied.
///
/// Transports are expected to produce [`TransportEvent::Incoming`] events only for
/// listen addresses which have previously been announced via
/// a [`TransportEvent::NewAddress`] event and which have not been invalidated by
/// an [`TransportEvent::AddressExpired`] event yet.
/// Transports are expected to produce [`TransportEvent::Incoming`] events
/// only for listen addresses which have previously been announced via
/// a [`TransportEvent::NewAddress`] event and which have not been
/// invalidated by an [`TransportEvent::AddressExpired`] event yet.
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@@ -195,7 +204,8 @@ pub trait Transport {
map::Map::new(self, f)
}

/// Applies a function on the errors generated by the futures of the transport.
/// Applies a function on the errors generated by the futures of the
/// transport.
fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
where
Self: Sized,
@@ -207,8 +217,8 @@ pub trait Transport {
/// Adds a fallback transport that is used when encountering errors
/// while establishing inbound or outbound connections.
///
/// The returned transport will act like `self`, except that if `listen_on` or `dial`
/// return an error then `other` will be tried.
/// The returned transport will act like `self`, except that if `listen_on`
/// or `dial` return an error then `other` will be tried.
fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
where
Self: Sized,
@@ -224,7 +234,8 @@ pub trait Transport {
/// This function can be used for ad-hoc protocol upgrades or
/// for processing or adapting the output for following configurations.
///
/// For the high-level transport upgrade procedure, see [`Transport::upgrade`].
/// For the high-level transport upgrade procedure, see
/// [`Transport::upgrade`].
fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
where
Self: Sized,
@@ -293,8 +304,8 @@ pub enum TransportEvent<TUpgr, TErr> {
ListenerClosed {
/// The ID of the listener that closed.
listener_id: ListenerId,
/// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
/// if the stream produced an error.
/// Reason for the closure. Contains `Ok(())` if the stream produced
/// `None`, or `Err` if the stream produced an error.
reason: Result<(), TErr>,
},
/// A listener errored.
@@ -311,7 +322,8 @@ pub enum TransportEvent<TUpgr, TErr> {

impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
/// In case this [`TransportEvent`] is an upgrade, apply the given function
/// to the upgrade and produce another transport event based the function's result.
/// to the upgrade and produce another transport event based the function's
/// result.
pub fn map_upgrade<U>(self, map: impl FnOnce(TUpgr) -> U) -> TransportEvent<U, TErr> {
match self {
TransportEvent::Incoming {
@@ -352,9 +364,11 @@ impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
}
}

/// In case this [`TransportEvent`] is an [`ListenerError`](TransportEvent::ListenerError),
/// or [`ListenerClosed`](TransportEvent::ListenerClosed) apply the given function to the
/// error and produce another transport event based on the function's result.
/// In case this [`TransportEvent`] is an
/// [`ListenerError`](TransportEvent::ListenerError),
/// or [`ListenerClosed`](TransportEvent::ListenerClosed) apply the given
/// function to the error and produce another transport event based on
/// the function's result.
pub fn map_err<E>(self, map_err: impl FnOnce(TErr) -> E) -> TransportEvent<TUpgr, E> {
match self {
TransportEvent::Incoming {
@@ -396,7 +410,8 @@ impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
}
}

/// Returns `true` if this is an [`Incoming`](TransportEvent::Incoming) transport event.
/// Returns `true` if this is an [`Incoming`](TransportEvent::Incoming)
/// transport event.
pub fn is_upgrade(&self) -> bool {
matches!(self, TransportEvent::Incoming { .. })
}
@@ -426,8 +441,8 @@ impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {

/// Try to turn this transport event into the new `Multiaddr`.
///
/// Returns `None` if the event is not actually a [`TransportEvent::NewAddress`],
/// otherwise the address.
/// Returns `None` if the event is not actually a
/// [`TransportEvent::NewAddress`], otherwise the address.
pub fn into_new_address(self) -> Option<Multiaddr> {
if let TransportEvent::NewAddress { listen_addr, .. } = self {
Some(listen_addr)
@@ -443,8 +458,8 @@ impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {

/// Try to turn this transport event into the expire `Multiaddr`.
///
/// Returns `None` if the event is not actually a [`TransportEvent::AddressExpired`],
/// otherwise the address.
/// Returns `None` if the event is not actually a
/// [`TransportEvent::AddressExpired`], otherwise the address.
pub fn into_address_expired(self) -> Option<Multiaddr> {
if let TransportEvent::AddressExpired { listen_addr, .. } = self {
Some(listen_addr)
@@ -453,15 +468,16 @@ impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
}
}

/// Returns `true` if this is an [`TransportEvent::ListenerError`] transport event.
/// Returns `true` if this is an [`TransportEvent::ListenerError`] transport
/// event.
pub fn is_listener_error(&self) -> bool {
matches!(self, TransportEvent::ListenerError { .. })
}

/// Try to turn this transport event into the listener error.
///
/// Returns `None` if the event is not actually a [`TransportEvent::ListenerError`]`,
/// otherwise the error.
/// Returns `None` if the event is not actually a
/// [`TransportEvent::ListenerError`]`, otherwise the error.
pub fn into_listener_error(self) -> Option<TErr> {
if let TransportEvent::ListenerError { error, .. } = self {
Some(error)
@@ -516,8 +532,8 @@ impl<TUpgr, TErr: fmt::Debug> fmt::Debug for TransportEvent<TUpgr, TErr> {
}
}

/// An error during [dialing][Transport::dial] or [listening][Transport::listen_on]
/// on a [`Transport`].
/// An error during [dialing][Transport::dial] or
/// [listening][Transport::listen_on] on a [`Transport`].
#[derive(Debug, Clone)]
pub enum TransportError<TErr> {
/// The [`Multiaddr`] passed as parameter is not supported.
15 changes: 11 additions & 4 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
@@ -18,14 +18,21 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{
connection::ConnectedPoint,
transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent},
use std::{
error,
marker::PhantomPinned,
pin::Pin,
task::{Context, Poll},
};

use either::Either;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error, marker::PhantomPinned, pin::Pin, task::Context, task::Poll};

use crate::{
connection::ConnectedPoint,
transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent},
};

/// See the [`Transport::and_then`] method.
#[pin_project::pin_project]
11 changes: 7 additions & 4 deletions core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
@@ -18,16 +18,19 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};
use futures::{prelude::*, stream::FusedStream};
use multiaddr::Multiaddr;
use std::{
error::Error,
fmt, io,
fmt,
io,
pin::Pin,
task::{Context, Poll},
};

use futures::{prelude::*, stream::FusedStream};
use multiaddr::Multiaddr;

use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};

/// Creates a new [`Boxed`] transport from the given transport.
pub(crate) fn boxed<T>(transport: T) -> Boxed<T::Output>
where
13 changes: 10 additions & 3 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
@@ -18,12 +18,19 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::either::EitherFuture;
use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};
use std::{
pin::Pin,
task::{Context, Poll},
};

use either::Either;
use futures::future;
use multiaddr::Multiaddr;
use std::{pin::Pin, task::Context, task::Poll};

use crate::{
either::EitherFuture,
transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent},
};

/// Struct returned by `or_transport()`.
#[derive(Debug, Copy, Clone)]
19 changes: 14 additions & 5 deletions core/src/transport/dummy.rs
Original file line number Diff line number Diff line change
@@ -18,14 +18,22 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};
use crate::Multiaddr;
use futures::{prelude::*, task::Context, task::Poll};
use std::{fmt, io, marker::PhantomData, pin::Pin};

use futures::{
prelude::*,
task::{Context, Poll},
};

use crate::{
transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent},
Multiaddr,
};

/// Implementation of `Transport` that doesn't support any multiaddr.
///
/// Useful for testing purposes, or as a fallback implementation when no protocol is available.
/// Useful for testing purposes, or as a fallback implementation when no
/// protocol is available.
pub struct DummyTransport<TOut = DummyStream>(PhantomData<TOut>);

impl<TOut> DummyTransport<TOut> {
@@ -87,7 +95,8 @@ impl<TOut> Transport for DummyTransport<TOut> {
}
}

/// Implementation of `AsyncRead` and `AsyncWrite`. Not meant to be instantiated.
/// Implementation of `AsyncRead` and `AsyncWrite`. Not meant to be
/// instantiated.
pub struct DummyStream(());

impl fmt::Debug for DummyStream {
111 changes: 69 additions & 42 deletions core/src/transport/global_only.rs
Original file line number Diff line number Diff line change
@@ -18,32 +18,35 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{
multiaddr::{Multiaddr, Protocol},
transport::{DialOpts, ListenerId, TransportError, TransportEvent},
};
use std::{
pin::Pin,
task::{Context, Poll},
};

use crate::{
multiaddr::{Multiaddr, Protocol},
transport::{DialOpts, ListenerId, TransportError, TransportEvent},
};

/// Dropping all dial requests to non-global IP addresses.
#[derive(Debug, Clone, Default)]
pub struct Transport<T> {
inner: T,
}

/// This module contains an implementation of the `is_global` IPv4 address space.
/// This module contains an implementation of the `is_global` IPv4 address
/// space.
///
/// Credit for this implementation goes to the Rust standard library team.
///
/// Unstable tracking issue: [#27709](https://github.com/rust-lang/rust/issues/27709)
mod ipv4_global {
use std::net::Ipv4Addr;

/// Returns [`true`] if this address is reserved by IANA for future use. [IETF RFC 1112]
/// defines the block of reserved addresses as `240.0.0.0/4`. This range normally includes the
/// broadcast address `255.255.255.255`, but this implementation explicitly excludes it, since
/// Returns [`true`] if this address is reserved by IANA for future use.
/// [IETF RFC 1112] defines the block of reserved addresses as
/// `240.0.0.0/4`. This range normally includes the broadcast address
/// `255.255.255.255`, but this implementation explicitly excludes it, since
/// it is obviously not reserved for future use.
///
/// [IETF RFC 1112]: https://tools.ietf.org/html/rfc1112
@@ -60,9 +63,10 @@ mod ipv4_global {
a.octets()[0] & 240 == 240 && !a.is_broadcast()
}

/// Returns [`true`] if this address part of the `198.18.0.0/15` range, which is reserved for
/// network devices benchmarking. This range is defined in [IETF RFC 2544] as `192.18.0.0`
/// through `198.19.255.255` but [errata 423] corrects it to `198.18.0.0/15`.
/// Returns [`true`] if this address part of the `198.18.0.0/15` range,
/// which is reserved for network devices benchmarking. This range is
/// defined in [IETF RFC 2544] as `192.18.0.0` through `198.19.255.255`
/// but [errata 423] corrects it to `198.18.0.0/15`.
///
/// [IETF RFC 2544]: https://tools.ietf.org/html/rfc2544
/// [errata 423]: https://www.rfc-editor.org/errata/eid423
@@ -72,8 +76,8 @@ mod ipv4_global {
a.octets()[0] == 198 && (a.octets()[1] & 0xfe) == 18
}

/// Returns [`true`] if this address is part of the Shared Address Space defined in
/// [IETF RFC 6598] (`100.64.0.0/10`).
/// Returns [`true`] if this address is part of the Shared Address Space
/// defined in [IETF RFC 6598] (`100.64.0.0/10`).
///
/// [IETF RFC 6598]: https://tools.ietf.org/html/rfc6598
#[must_use]
@@ -104,24 +108,32 @@ mod ipv4_global {

/// Returns [`true`] if the address appears to be globally reachable
/// as specified by the [IANA IPv4 Special-Purpose Address Registry].
/// Whether or not an address is practically reachable will depend on your network configuration.
/// Whether or not an address is practically reachable will depend on your
/// network configuration.
///
/// Most IPv4 addresses are globally reachable;
/// unless they are specifically defined as *not* globally reachable.
///
/// Non-exhaustive list of notable addresses that are not globally reachable:
/// Non-exhaustive list of notable addresses that are not globally
/// reachable:
///
/// - The [unspecified address] ([`is_unspecified`](Ipv4Addr::is_unspecified))
/// - Addresses reserved for private use ([`is_private`](Ipv4Addr::is_private))
/// - Addresses in the shared address space ([`is_shared`](Ipv4Addr::is_shared))
/// - The [unspecified address]
/// ([`is_unspecified`](Ipv4Addr::is_unspecified))
/// - Addresses reserved for private use
/// ([`is_private`](Ipv4Addr::is_private))
/// - Addresses in the shared address space
/// ([`is_shared`](Ipv4Addr::is_shared))
/// - Loopback addresses ([`is_loopback`](Ipv4Addr::is_loopback))
/// - Link-local addresses ([`is_link_local`](Ipv4Addr::is_link_local))
/// - Addresses reserved for documentation ([`is_documentation`](Ipv4Addr::is_documentation))
/// - Addresses reserved for benchmarking ([`is_benchmarking`](Ipv4Addr::is_benchmarking))
/// - Addresses reserved for documentation
/// ([`is_documentation`](Ipv4Addr::is_documentation))
/// - Addresses reserved for benchmarking
/// ([`is_benchmarking`](Ipv4Addr::is_benchmarking))
/// - Reserved addresses ([`is_reserved`](Ipv4Addr::is_reserved))
/// - The [broadcast address] ([`is_broadcast`](Ipv4Addr::is_broadcast))
///
/// For the complete overview of which addresses are globally reachable, see the table at the [IANA IPv4 Special-Purpose Address Registry].
/// For the complete overview of which addresses are globally reachable, see
/// the table at the [IANA IPv4 Special-Purpose Address Registry].
///
/// [IANA IPv4 Special-Purpose Address Registry]: https://www.iana.org/assignments/iana-ipv4-special-registry/iana-ipv4-special-registry.xhtml
/// [unspecified address]: Ipv4Addr::UNSPECIFIED
@@ -143,33 +155,40 @@ mod ipv4_global {
}
}

/// This module contains an implementation of the `is_global` IPv6 address space.
/// This module contains an implementation of the `is_global` IPv6 address
/// space.
///
/// Credit for this implementation goes to the Rust standard library team.
///
/// Unstable tracking issue: [#27709](https://github.com/rust-lang/rust/issues/27709)
mod ipv6_global {
use std::net::Ipv6Addr;

/// Returns `true` if the address is a unicast address with link-local scope,
/// as defined in [RFC 4291].
/// Returns `true` if the address is a unicast address with link-local
/// scope, as defined in [RFC 4291].
///
/// A unicast address has link-local scope if it has the prefix `fe80::/10`, as per [RFC 4291 section 2.4].
/// Note that this encompasses more addresses than those defined in [RFC 4291 section 2.5.6],
/// which describes "Link-Local IPv6 Unicast Addresses" as having the following stricter format:
/// A unicast address has link-local scope if it has the prefix `fe80::/10`,
/// as per [RFC 4291 section 2.4]. Note that this encompasses more
/// addresses than those defined in [RFC 4291 section 2.5.6],
/// which describes "Link-Local IPv6 Unicast Addresses" as having the
/// following stricter format:
///
/// ```text
/// | 10 bits | 54 bits | 64 bits |
/// +----------+-------------------------+----------------------------+
/// |1111111010| 0 | interface ID |
/// +----------+-------------------------+----------------------------+
/// ```
/// So while currently the only addresses with link-local scope an application will encounter are all in `fe80::/64`,
/// this might change in the future with the publication of new standards. More addresses in `fe80::/10` could be allocated,
/// and those addresses will have link-local scope.
/// So while currently the only addresses with link-local scope an
/// application will encounter are all in `fe80::/64`, this might change
/// in the future with the publication of new standards. More addresses in
/// `fe80::/10` could be allocated, and those addresses will have
/// link-local scope.
///
/// Also note that while [RFC 4291 section 2.5.3] mentions about the [loopback address] (`::1`) that "it is treated as having Link-Local scope",
/// this does not mean that the loopback address actually has link-local scope and this method will return `false` on it.
/// Also note that while [RFC 4291 section 2.5.3] mentions about the
/// [loopback address] (`::1`) that "it is treated as having Link-Local
/// scope", this does not mean that the loopback address actually has
/// link-local scope and this method will return `false` on it.
///
/// [RFC 4291]: https://tools.ietf.org/html/rfc4291
/// [RFC 4291 section 2.4]: https://tools.ietf.org/html/rfc4291#section-2.4
@@ -207,25 +226,33 @@ mod ipv6_global {

/// Returns [`true`] if the address appears to be globally reachable
/// as specified by the [IANA IPv6 Special-Purpose Address Registry].
/// Whether or not an address is practically reachable will depend on your network configuration.
/// Whether or not an address is practically reachable will depend on your
/// network configuration.
///
/// Most IPv6 addresses are globally reachable;
/// unless they are specifically defined as *not* globally reachable.
///
/// Non-exhaustive list of notable addresses that are not globally reachable:
/// - The [unspecified address] ([`is_unspecified`](Ipv6Addr::is_unspecified))
/// Non-exhaustive list of notable addresses that are not globally
/// reachable:
/// - The [unspecified address]
/// ([`is_unspecified`](Ipv6Addr::is_unspecified))
/// - The [loopback address] ([`is_loopback`](Ipv6Addr::is_loopback))
/// - IPv4-mapped addresses
/// - Addresses reserved for benchmarking
/// - Addresses reserved for documentation ([`is_documentation`](Ipv6Addr::is_documentation))
/// - Unique local addresses ([`is_unique_local`](Ipv6Addr::is_unique_local))
/// - Unicast addresses with link-local scope ([`is_unicast_link_local`](Ipv6Addr::is_unicast_link_local))
/// - Addresses reserved for documentation
/// ([`is_documentation`](Ipv6Addr::is_documentation))
/// - Unique local addresses
/// ([`is_unique_local`](Ipv6Addr::is_unique_local))
/// - Unicast addresses with link-local scope
/// ([`is_unicast_link_local`](Ipv6Addr::is_unicast_link_local))
///
/// For the complete overview of which addresses are globally reachable, see the table at the [IANA IPv6 Special-Purpose Address Registry].
/// For the complete overview of which addresses are globally reachable, see
/// the table at the [IANA IPv6 Special-Purpose Address Registry].
///
/// Note that an address having global scope is not the same as being globally reachable,
/// and there is no direct relation between the two concepts: There exist addresses with global scope
/// that are not globally reachable (for example unique local addresses),
/// Note that an address having global scope is not the same as being
/// globally reachable, and there is no direct relation between the two
/// concepts: There exist addresses with global scope that are not
/// globally reachable (for example unique local addresses),
/// and addresses that are globally reachable without having global scope
/// (multicast addresses with non-global scope).
///
13 changes: 8 additions & 5 deletions core/src/transport/map.rs
Original file line number Diff line number Diff line change
@@ -18,16 +18,19 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::transport::DialOpts;
use crate::{
connection::ConnectedPoint,
transport::{Transport, TransportError, TransportEvent},
use std::{
pin::Pin,
task::{Context, Poll},
};

use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{pin::Pin, task::Context, task::Poll};

use super::ListenerId;
use crate::{
connection::ConnectedPoint,
transport::{DialOpts, Transport, TransportError, TransportEvent},
};

/// See `Transport::map`.
#[derive(Debug, Copy, Clone)]
10 changes: 8 additions & 2 deletions core/src/transport/map_err.rs
Original file line number Diff line number Diff line change
@@ -18,10 +18,16 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};
use std::{
error,
pin::Pin,
task::{Context, Poll},
};

use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error, pin::Pin, task::Context, task::Poll};

use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};

/// See `Transport::map_err`.
#[derive(Debug, Copy, Clone)]
40 changes: 26 additions & 14 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
@@ -18,20 +18,29 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};
use fnv::FnvHashMap;
use futures::{channel::mpsc, future::Ready, prelude::*, task::Context, task::Poll};
use multiaddr::{Multiaddr, Protocol};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use rw_stream_sink::RwStreamSink;
use std::{
collections::{hash_map::Entry, VecDeque},
error, fmt, io,
error,
fmt,
io,
num::NonZeroU64,
pin::Pin,
};

use fnv::FnvHashMap;
use futures::{
channel::mpsc,
future::Ready,
prelude::*,
task::{Context, Poll},
};
use multiaddr::{Multiaddr, Protocol};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use rw_stream_sink::RwStreamSink;

use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};

static HUB: Lazy<Hub> = Lazy::new(|| Hub(Mutex::new(FnvHashMap::default())));

struct Hub(Mutex<FnvHashMap<NonZeroU64, ChannelSender>>);
@@ -306,7 +315,8 @@ pub struct Listener {
addr: Multiaddr,
/// Receives incoming connections.
receiver: ChannelReceiver,
/// Generate [`TransportEvent::NewAddress`] to inform about our listen address.
/// Generate [`TransportEvent::NewAddress`] to inform about our listen
/// address.
tell_listen_addr: bool,
}

@@ -322,12 +332,14 @@ fn parse_memory_addr(a: &Multiaddr) -> Result<u64, ()> {
}
}

/// A channel represents an established, in-memory, logical connection between two endpoints.
/// A channel represents an established, in-memory, logical connection between
/// two endpoints.
///
/// Implements `AsyncRead` and `AsyncWrite`.
pub type Channel<T> = RwStreamSink<Chan<T>>;

/// A channel represents an established, in-memory, logical connection between two endpoints.
/// A channel represents an established, in-memory, logical connection between
/// two endpoints.
///
/// Implements `Sink` and `Stream`.
pub struct Chan<T = Vec<u8>> {
@@ -398,9 +410,8 @@ impl<T> Drop for Chan<T> {

#[cfg(test)]
mod tests {
use crate::{transport::PortUse, Endpoint};

use super::*;
use crate::{transport::PortUse, Endpoint};

#[test]
fn parse_memory_addr_works() {
@@ -429,7 +440,8 @@ mod tests {
);
assert_eq!(
parse_memory_addr(
&"/memory/5/p2p/12D3KooWETLZBFBfkzvH3BQEtA1TJZPmjb4a18ss5TpwNU7DHDX6/p2p-circuit/p2p/12D3KooWLiQ7i8sY6LkPvHmEymncicEgzrdpXegbxEr3xgN8oxMU"
&"/memory/5/p2p/12D3KooWETLZBFBfkzvH3BQEtA1TJZPmjb4a18ss5TpwNU7DHDX6/p2p-circuit/\
p2p/12D3KooWLiQ7i8sY6LkPvHmEymncicEgzrdpXegbxEr3xgN8oxMU"
.parse()
.unwrap()
),
17 changes: 11 additions & 6 deletions core/src/transport/optional.rs
Original file line number Diff line number Diff line change
@@ -18,16 +18,21 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};
use std::{
pin::Pin,
task::{Context, Poll},
};

use multiaddr::Multiaddr;
use std::{pin::Pin, task::Context, task::Poll};

use crate::transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent};

/// Transport that is possibly disabled.
///
/// An `OptionalTransport<T>` is a wrapper around an `Option<T>`. If it is disabled (read: contains
/// `None`), then any attempt to dial or listen will return `MultiaddrNotSupported`. If it is
/// enabled (read: contains `Some`), then dialing and listening will be handled by the inner
/// transport.
/// An `OptionalTransport<T>` is a wrapper around an `Option<T>`. If it is
/// disabled (read: contains `None`), then any attempt to dial or listen will
/// return `MultiaddrNotSupported`. If it is enabled (read: contains `Some`),
/// then dialing and listening will be handled by the inner transport.
#[derive(Debug, Copy, Clone)]
#[pin_project::pin_project]
pub struct OptionalTransport<T>(#[pin] Option<T>);
35 changes: 23 additions & 12 deletions core/src/transport/timeout.rs
Original file line number Diff line number Diff line change
@@ -24,17 +24,26 @@
//! underlying `Transport`.
// TODO: add example

use crate::transport::DialOpts;
use crate::{
transport::{ListenerId, TransportError, TransportEvent},
Multiaddr, Transport,
use std::{
error,
fmt,
io,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use futures::prelude::*;
use futures_timer::Delay;
use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration};

/// A `TransportTimeout` is a `Transport` that wraps another `Transport` and adds
/// timeouts to all inbound and outbound connection attempts.
use crate::{
transport::{DialOpts, ListenerId, TransportError, TransportEvent},
Multiaddr,
Transport,
};

/// A `TransportTimeout` is a `Transport` that wraps another `Transport` and
/// adds timeouts to all inbound and outbound connection attempts.
///
/// **Note**: `listen_on` is never subject to a timeout, only the setup of each
/// individual accepted connection.
@@ -48,7 +57,8 @@ pub struct TransportTimeout<InnerTrans> {
}

impl<InnerTrans> TransportTimeout<InnerTrans> {
/// Wraps around a `Transport` to add timeouts to all the sockets created by it.
/// Wraps around a `Transport` to add timeouts to all the sockets created by
/// it.
pub fn new(trans: InnerTrans, timeout: Duration) -> Self {
TransportTimeout {
inner: trans,
@@ -151,10 +161,11 @@ where
type Output = Result<InnerFut::Ok, TransportTimeoutError<InnerFut::Error>>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// It is debatable whether we should poll the inner future first or the timer first.
// For example, if you start dialing with a timeout of 10 seconds, then after 15 seconds
// the dialing succeeds on the wire, then after 20 seconds you poll, then depending on
// which gets polled first, the outcome will be success or failure.
// It is debatable whether we should poll the inner future first or the timer
// first. For example, if you start dialing with a timeout of 10
// seconds, then after 15 seconds the dialing succeeds on the wire, then
// after 20 seconds you poll, then depending on which gets polled first,
// the outcome will be success or failure.

let mut this = self.project();

70 changes: 42 additions & 28 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
@@ -20,32 +20,44 @@

//! Configuration of transport protocol upgrades.
pub use crate::upgrade::Version;
use std::{
error::Error,
fmt,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use futures::{prelude::*, ready};
use libp2p_identity::PeerId;
use multiaddr::Multiaddr;

use crate::transport::DialOpts;
pub use crate::upgrade::Version;
use crate::{
connection::ConnectedPoint,
muxing::{StreamMuxer, StreamMuxerBox},
transport::{
and_then::AndThen, boxed::boxed, timeout::TransportTimeout, ListenerId, Transport,
TransportError, TransportEvent,
and_then::AndThen,
boxed::boxed,
timeout::TransportTimeout,
DialOpts,
ListenerId,
Transport,
TransportError,
TransportEvent,
},
upgrade::{
self, apply_inbound, apply_outbound, InboundConnectionUpgrade, InboundUpgradeApply,
OutboundConnectionUpgrade, OutboundUpgradeApply, UpgradeError,
self,
apply_inbound,
apply_outbound,
InboundConnectionUpgrade,
InboundUpgradeApply,
OutboundConnectionUpgrade,
OutboundUpgradeApply,
UpgradeError,
},
Negotiated,
};
use futures::{prelude::*, ready};
use libp2p_identity::PeerId;
use multiaddr::Multiaddr;
use std::{
error::Error,
fmt,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

/// A `Builder` facilitates upgrading of a [`Transport`] for use with
/// a `Swarm`.
@@ -59,8 +71,8 @@ use std::{
/// It thus enforces the following invariants on every transport
/// obtained from [`multiplex`](Authenticated::multiplex):
///
/// 1. The transport must be [authenticated](Builder::authenticate)
/// and [multiplexed](Authenticated::multiplex).
/// 1. The transport must be [authenticated](Builder::authenticate) and
/// [multiplexed](Authenticated::multiplex).
/// 2. Authentication must precede the negotiation of a multiplexer.
/// 3. Applying a multiplexer is the last step in the upgrade process.
/// 4. The [`Transport::Output`] conforms to the requirements of a `Swarm`,
@@ -185,7 +197,8 @@ where
}
}

/// An transport with peer authentication, obtained from [`Builder::authenticate`].
/// An transport with peer authentication, obtained from
/// [`Builder::authenticate`].
#[derive(Clone)]
pub struct Authenticated<T>(Builder<T>);

@@ -222,8 +235,8 @@ where
/// Upgrades the transport with a (sub)stream multiplexer.
///
/// The supplied upgrade receives the I/O resource `C` and must
/// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
/// This ends the (regular) transport upgrade process.
/// produce a [`StreamMuxer`] `M`. The transport must already be
/// authenticated. This ends the (regular) transport upgrade process.
///
/// ## Transitions
///
@@ -251,12 +264,13 @@ where
}))
}

/// Like [`Authenticated::multiplex`] but accepts a function which returns the upgrade.
/// Like [`Authenticated::multiplex`] but accepts a function which returns
/// the upgrade.
///
/// The supplied function is applied to [`PeerId`] and [`ConnectedPoint`]
/// and returns an upgrade which receives the I/O resource `C` and must
/// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
/// This ends the (regular) transport upgrade process.
/// produce a [`StreamMuxer`] `M`. The transport must already be
/// authenticated. This ends the (regular) transport upgrade process.
///
/// ## Transitions
///
@@ -499,8 +513,8 @@ where
type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// We use a `this` variable because the compiler can't mutably borrow multiple times
// across a `Deref`.
// We use a `this` variable because the compiler can't mutably borrow multiple
// times across a `Deref`.
let this = &mut *self;

loop {
@@ -558,8 +572,8 @@ where
type Output = Result<(PeerId, D), TransportUpgradeError<F::Error, U::Error>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// We use a `this` variable because the compiler can't mutably borrow multiple times
// across a `Deref`.
// We use a `this` variable because the compiler can't mutably borrow multiple
// times across a `Deref`.
let this = &mut *self;

loop {
117 changes: 70 additions & 47 deletions core/src/upgrade.rs
Original file line number Diff line number Diff line change
@@ -18,44 +18,51 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Contains everything related to upgrading a connection or a substream to use a protocol.
//! Contains everything related to upgrading a connection or a substream to use
//! a protocol.
//!
//! After a connection with a remote has been successfully established or a substream successfully
//! opened, the next step is to *upgrade* this connection or substream to use a protocol.
//! After a connection with a remote has been successfully established or a
//! substream successfully opened, the next step is to *upgrade* this connection
//! or substream to use a protocol.
//!
//! This is where the `UpgradeInfo`, `InboundUpgrade` and `OutboundUpgrade` traits come into play.
//! The `InboundUpgrade` and `OutboundUpgrade` traits are implemented on types that represent a
//! collection of one or more possible protocols for respectively an ingoing or outgoing
//! connection or substream.
//! This is where the `UpgradeInfo`, `InboundUpgrade` and `OutboundUpgrade`
//! traits come into play. The `InboundUpgrade` and `OutboundUpgrade` traits are
//! implemented on types that represent a collection of one or more possible
//! protocols for respectively an ingoing or outgoing connection or substream.
//!
//! > **Note**: Multiple versions of the same protocol are treated as different protocols.
//! > For example, `/foo/1.0.0` and `/foo/1.1.0` are totally unrelated as far as
//! > upgrading is concerned.
//! > **Note**: Multiple versions of the same protocol are treated as different
//! > protocols.
//! > For example, `/foo/1.0.0` and `/foo/1.1.0` are totally unrelated as far as
//! > upgrading is concerned.
//!
//! # Upgrade process
//!
//! An upgrade is performed in two steps:
//!
//! - A protocol negotiation step. The `UpgradeInfo::protocol_info` method is called to determine
//! which protocols are supported by the trait implementation. The `multistream-select` protocol
//! is used in order to agree on which protocol to use amongst the ones supported.
//! - A protocol negotiation step. The `UpgradeInfo::protocol_info` method is
//! called to determine which protocols are supported by the trait
//! implementation. The `multistream-select` protocol is used in order to
//! agree on which protocol to use amongst the ones supported.
//!
//! - A handshake. After a successful negotiation, the `InboundUpgrade::upgrade_inbound` or
//! `OutboundUpgrade::upgrade_outbound` method is called. This method will return a `Future` that
//! performs a handshake. This handshake is considered mandatory, however in practice it is
//! possible for the trait implementation to return a dummy `Future` that doesn't perform any
//! action and immediately succeeds.
//! - A handshake. After a successful negotiation, the
//! `InboundUpgrade::upgrade_inbound` or `OutboundUpgrade::upgrade_outbound`
//! method is called. This method will return a `Future` that performs a
//! handshake. This handshake is considered mandatory, however in practice it
//! is possible for the trait implementation to return a dummy `Future` that
//! doesn't perform any action and immediately succeeds.
//!
//! After an upgrade is successful, an object of type `InboundUpgrade::Output` or
//! `OutboundUpgrade::Output` is returned. The actual object depends on the implementation and
//! there is no constraint on the traits that it should implement, however it is expected that it
//! can be used by the user to control the behaviour of the protocol.
//! After an upgrade is successful, an object of type `InboundUpgrade::Output`
//! or `OutboundUpgrade::Output` is returned. The actual object depends on the
//! implementation and there is no constraint on the traits that it should
//! implement, however it is expected that it can be used by the user to control
//! the behaviour of the protocol.
//!
//! > **Note**: You can use the `apply_inbound` or `apply_outbound` methods to try upgrade a
//! > **Note**: You can use the `apply_inbound` or `apply_outbound` methods to
//! > try upgrade a
//! > connection or substream. However if you use the recommended `Swarm` or
//! > `ConnectionHandler` APIs, the upgrade is automatically handled for you and you don't
//! > `ConnectionHandler` APIs, the upgrade is automatically handled for you and
//! > you don't
//! > need to use these methods.
//!
mod apply;
mod denied;
@@ -66,89 +73,105 @@ mod ready;
mod select;

pub(crate) use apply::{
apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply,
apply,
apply_inbound,
apply_outbound,
InboundUpgradeApply,
OutboundUpgradeApply,
};
pub(crate) use error::UpgradeError;
use futures::future::Future;
pub use multistream_select::{NegotiatedComplete, NegotiationError, ProtocolError, Version};

pub use self::{
denied::DeniedUpgrade, pending::PendingUpgrade, ready::ReadyUpgrade, select::SelectUpgrade,
denied::DeniedUpgrade,
pending::PendingUpgrade,
ready::ReadyUpgrade,
select::SelectUpgrade,
};
pub use crate::Negotiated;
pub use multistream_select::{NegotiatedComplete, NegotiationError, ProtocolError, Version};

/// Common trait for upgrades that can be applied on inbound substreams, outbound substreams,
/// or both.
/// Common trait for upgrades that can be applied on inbound substreams,
/// outbound substreams, or both.
pub trait UpgradeInfo {
/// Opaque type representing a negotiable protocol.
type Info: AsRef<str> + Clone;
/// Iterator returned by `protocol_info`.
type InfoIter: IntoIterator<Item = Self::Info>;

/// Returns the list of protocols that are supported. Used during the negotiation process.
/// Returns the list of protocols that are supported. Used during the
/// negotiation process.
fn protocol_info(&self) -> Self::InfoIter;
}

/// Possible upgrade on an inbound connection or substream.
pub trait InboundUpgrade<C>: UpgradeInfo {
/// Output after the upgrade has been successfully negotiated and the handshake performed.
/// Output after the upgrade has been successfully negotiated and the
/// handshake performed.
type Output;
/// Possible error during the handshake.
type Error;
/// Future that performs the handshake with the remote.
type Future: Future<Output = Result<Self::Output, Self::Error>>;

/// After we have determined that the remote supports one of the protocols we support, this
/// method is called to start the handshake.
/// After we have determined that the remote supports one of the protocols
/// we support, this method is called to start the handshake.
///
/// The `info` is the identifier of the protocol, as produced by `protocol_info`.
/// The `info` is the identifier of the protocol, as produced by
/// `protocol_info`.
fn upgrade_inbound(self, socket: C, info: Self::Info) -> Self::Future;
}

/// Possible upgrade on an outbound connection or substream.
pub trait OutboundUpgrade<C>: UpgradeInfo {
/// Output after the upgrade has been successfully negotiated and the handshake performed.
/// Output after the upgrade has been successfully negotiated and the
/// handshake performed.
type Output;
/// Possible error during the handshake.
type Error;
/// Future that performs the handshake with the remote.
type Future: Future<Output = Result<Self::Output, Self::Error>>;

/// After we have determined that the remote supports one of the protocols we support, this
/// method is called to start the handshake.
/// After we have determined that the remote supports one of the protocols
/// we support, this method is called to start the handshake.
///
/// The `info` is the identifier of the protocol, as produced by `protocol_info`.
/// The `info` is the identifier of the protocol, as produced by
/// `protocol_info`.
fn upgrade_outbound(self, socket: C, info: Self::Info) -> Self::Future;
}

/// Possible upgrade on an inbound connection
pub trait InboundConnectionUpgrade<T>: UpgradeInfo {
/// Output after the upgrade has been successfully negotiated and the handshake performed.
/// Output after the upgrade has been successfully negotiated and the
/// handshake performed.
type Output;
/// Possible error during the handshake.
type Error;
/// Future that performs the handshake with the remote.
type Future: Future<Output = Result<Self::Output, Self::Error>>;

/// After we have determined that the remote supports one of the protocols we support, this
/// method is called to start the handshake.
/// After we have determined that the remote supports one of the protocols
/// we support, this method is called to start the handshake.
///
/// The `info` is the identifier of the protocol, as produced by `protocol_info`.
/// The `info` is the identifier of the protocol, as produced by
/// `protocol_info`.
fn upgrade_inbound(self, socket: T, info: Self::Info) -> Self::Future;
}

/// Possible upgrade on an outbound connection
pub trait OutboundConnectionUpgrade<T>: UpgradeInfo {
/// Output after the upgrade has been successfully negotiated and the handshake performed.
/// Output after the upgrade has been successfully negotiated and the
/// handshake performed.
type Output;
/// Possible error during the handshake.
type Error;
/// Future that performs the handshake with the remote.
type Future: Future<Output = Result<Self::Output, Self::Error>>;

/// After we have determined that the remote supports one of the protocols we support, this
/// method is called to start the handshake.
/// After we have determined that the remote supports one of the protocols
/// we support, this method is called to start the handshake.
///
/// The `info` is the identifier of the protocol, as produced by `protocol_info`.
/// The `info` is the identifier of the protocol, as produced by
/// `protocol_info`.
fn upgrade_outbound(self, socket: T, info: Self::Info) -> Self::Future;
}
19 changes: 14 additions & 5 deletions core/src/upgrade/apply.rs
Original file line number Diff line number Diff line change
@@ -18,16 +18,25 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeError};
use crate::{connection::ConnectedPoint, Negotiated};
use std::{
mem,
pin::Pin,
task::{Context, Poll},
};

use futures::{future::Either, prelude::*};
pub(crate) use multistream_select::Version;
use multistream_select::{DialerSelectFuture, ListenerSelectFuture};
use std::{mem, pin::Pin, task::Context, task::Poll};

pub(crate) use multistream_select::Version;
use crate::{
connection::ConnectedPoint,
upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeError},
Negotiated,
};

// TODO: Still needed?
/// Applies an upgrade to the inbound and outbound direction of a connection or substream.
/// Applies an upgrade to the inbound and outbound direction of a connection or
/// substream.
pub(crate) fn apply<C, U>(
conn: C,
up: U,
11 changes: 6 additions & 5 deletions core/src/upgrade/denied.rs
Original file line number Diff line number Diff line change
@@ -18,13 +18,14 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::{convert::Infallible, iter};

use futures::future;
use std::convert::Infallible;
use std::iter;

/// Dummy implementation of `UpgradeInfo`/`InboundUpgrade`/`OutboundUpgrade` that doesn't support
/// any protocol.
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};

/// Dummy implementation of `UpgradeInfo`/`InboundUpgrade`/`OutboundUpgrade`
/// that doesn't support any protocol.
#[derive(Debug, Copy, Clone)]
pub struct DeniedUpgrade;

8 changes: 5 additions & 3 deletions core/src/upgrade/either.rs
Original file line number Diff line number Diff line change
@@ -18,13 +18,15 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::iter::Map;

use either::Either;
use futures::future;

use crate::{
either::EitherFuture,
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
};
use either::Either;
use futures::future;
use std::iter::Map;

impl<A, B> UpgradeInfo for Either<A, B>
where
6 changes: 4 additions & 2 deletions core/src/upgrade/error.rs
Original file line number Diff line number Diff line change
@@ -18,10 +18,12 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use multistream_select::NegotiationError;
use std::fmt;

/// Error that can happen when upgrading a connection or substream to use a protocol.
use multistream_select::NegotiationError;

/// Error that can happen when upgrading a connection or substream to use a
/// protocol.
#[derive(Debug)]
pub enum UpgradeError<E> {
/// Error during the negotiation process.
11 changes: 6 additions & 5 deletions core/src/upgrade/pending.rs
Original file line number Diff line number Diff line change
@@ -19,13 +19,14 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::{convert::Infallible, iter};

use futures::future;
use std::convert::Infallible;
use std::iter;

/// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] that always
/// returns a pending upgrade.
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};

/// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and
/// [`OutboundUpgrade`] that always returns a pending upgrade.
#[derive(Debug, Copy, Clone)]
pub struct PendingUpgrade<P> {
protocol_name: P,
10 changes: 6 additions & 4 deletions core/src/upgrade/ready.rs
Original file line number Diff line number Diff line change
@@ -19,12 +19,14 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use std::{convert::Infallible, iter};

use futures::future;
use std::convert::Infallible;
use std::iter;

/// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and [`OutboundUpgrade`] that directly yields the substream.
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};

/// Implementation of [`UpgradeInfo`], [`InboundUpgrade`] and
/// [`OutboundUpgrade`] that directly yields the substream.
#[derive(Debug, Copy, Clone)]
pub struct ReadyUpgrade<P> {
protocol_name: P,
23 changes: 15 additions & 8 deletions core/src/upgrade/select.rs
Original file line number Diff line number Diff line change
@@ -18,17 +18,24 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::either::EitherFuture;
use crate::upgrade::{
InboundConnectionUpgrade, InboundUpgrade, OutboundConnectionUpgrade, OutboundUpgrade,
UpgradeInfo,
};
use std::iter::{Chain, Map};

use either::Either;
use futures::future;
use std::iter::{Chain, Map};

/// Upgrade that combines two upgrades into one. Supports all the protocols supported by either
/// sub-upgrade.
use crate::{
either::EitherFuture,
upgrade::{
InboundConnectionUpgrade,
InboundUpgrade,
OutboundConnectionUpgrade,
OutboundUpgrade,
UpgradeInfo,
},
};

/// Upgrade that combines two upgrades into one. Supports all the protocols
/// supported by either sub-upgrade.
///
/// The protocols supported by the first element have a higher priority.
#[derive(Debug, Clone)]
11 changes: 6 additions & 5 deletions core/tests/transport_upgrade.rs
Original file line number Diff line number Diff line change
@@ -18,18 +18,19 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::{io, pin::Pin};

use futures::prelude::*;
use libp2p_core::transport::{DialOpts, ListenerId, MemoryTransport, PortUse, Transport};
use libp2p_core::upgrade::{
self, InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo,
use libp2p_core::{
transport::{DialOpts, ListenerId, MemoryTransport, PortUse, Transport},
upgrade::{self, InboundConnectionUpgrade, OutboundConnectionUpgrade, UpgradeInfo},
Endpoint,
};
use libp2p_core::Endpoint;
use libp2p_identity as identity;
use libp2p_mplex::MplexConfig;
use libp2p_noise as noise;
use multiaddr::{Multiaddr, Protocol};
use rand::random;
use std::{io, pin::Pin};

#[derive(Clone)]
struct HelloUpgrade {}
20 changes: 13 additions & 7 deletions examples/autonat/src/bin/autonat_client.rs
Original file line number Diff line number Diff line change
@@ -20,15 +20,21 @@

#![doc = include_str!("../../README.md")]

use std::{error::Error, net::Ipv4Addr, time::Duration};

use clap::Parser;
use futures::StreamExt;
use libp2p::core::multiaddr::Protocol;
use libp2p::core::Multiaddr;
use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
use libp2p::{autonat, identify, identity, noise, tcp, yamux, PeerId};
use std::error::Error;
use std::net::Ipv4Addr;
use std::time::Duration;
use libp2p::{
autonat,
core::{multiaddr::Protocol, Multiaddr},
identify,
identity,
noise,
swarm::{NetworkBehaviour, SwarmEvent},
tcp,
yamux,
PeerId,
};
use tracing_subscriber::EnvFilter;

#[derive(Debug, Parser)]
18 changes: 12 additions & 6 deletions examples/autonat/src/bin/autonat_server.rs
Original file line number Diff line number Diff line change
@@ -20,14 +20,20 @@

#![doc = include_str!("../../README.md")]

use std::{error::Error, net::Ipv4Addr, time::Duration};

use clap::Parser;
use futures::StreamExt;
use libp2p::core::{multiaddr::Protocol, Multiaddr};
use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
use libp2p::{autonat, identify, identity, noise, tcp, yamux};
use std::error::Error;
use std::net::Ipv4Addr;
use std::time::Duration;
use libp2p::{
autonat,
core::{multiaddr::Protocol, Multiaddr},
identify,
identity,
noise,
swarm::{NetworkBehaviour, SwarmEvent},
tcp,
yamux,
};
use tracing_subscriber::EnvFilter;

#[derive(Debug, Parser)]
18 changes: 14 additions & 4 deletions examples/autonatv2/src/bin/autonatv2_client.rs
Original file line number Diff line number Diff line change
@@ -4,11 +4,15 @@ use clap::Parser;
use libp2p::{
autonat,
futures::StreamExt,
identify, identity,
identify,
identity,
multiaddr::Protocol,
noise,
swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent},
tcp, yamux, Multiaddr, SwarmBuilder,
tcp,
yamux,
Multiaddr,
SwarmBuilder,
};
use rand::rngs::OsRng;
use tracing_subscriber::EnvFilter;
@@ -73,15 +77,21 @@ async fn main() -> Result<(), Box<dyn Error>> {
bytes_sent,
result: Ok(()),
})) => {
println!("Tested {tested_addr} with {server}. Sent {bytes_sent} bytes for verification. Everything Ok and verified.");
println!(
"Tested {tested_addr} with {server}. Sent {bytes_sent} bytes for \
verification. Everything Ok and verified."
);
}
SwarmEvent::Behaviour(BehaviourEvent::Autonat(autonat::v2::client::Event {
server,
tested_addr,
bytes_sent,
result: Err(e),
})) => {
println!("Tested {tested_addr} with {server}. Sent {bytes_sent} bytes for verification. Failed with {e:?}.");
println!(
"Tested {tested_addr} with {server}. Sent {bytes_sent} bytes for \
verification. Failed with {e:?}."
);
}
SwarmEvent::ExternalAddrConfirmed { address } => {
println!("External address confirmed: {address}");
8 changes: 6 additions & 2 deletions examples/autonatv2/src/bin/autonatv2_server.rs
Original file line number Diff line number Diff line change
@@ -5,11 +5,15 @@ use clap::Parser;
use libp2p::{
autonat,
futures::StreamExt,
identify, identity,
identify,
identity,
multiaddr::Protocol,
noise,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux, Multiaddr, SwarmBuilder,
tcp,
yamux,
Multiaddr,
SwarmBuilder,
};
use rand::rngs::OsRng;

8 changes: 3 additions & 5 deletions examples/browser-webrtc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
#![cfg(target_arch = "wasm32")]

use std::{io, time::Duration};

use futures::StreamExt;
use js_sys::Date;
use libp2p::core::Multiaddr;
use libp2p::ping;
use libp2p::swarm::SwarmEvent;
use libp2p::{core::Multiaddr, ping, swarm::SwarmEvent};
use libp2p_webrtc_websys as webrtc_websys;
use std::io;
use std::time::Duration;
use wasm_bindgen::prelude::*;
use web_sys::{Document, HtmlElement};

25 changes: 15 additions & 10 deletions examples/browser-webrtc/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
#![allow(non_upper_case_globals)]

use std::{
net::{Ipv4Addr, SocketAddr},
time::Duration,
};

use anyhow::Result;
use axum::extract::{Path, State};
use axum::http::header::CONTENT_TYPE;
use axum::http::StatusCode;
use axum::response::{Html, IntoResponse};
use axum::{http::Method, routing::get, Router};
use axum::{
extract::{Path, State},
http::{header::CONTENT_TYPE, Method, StatusCode},
response::{Html, IntoResponse},
routing::get,
Router,
};
use futures::StreamExt;
use libp2p::{
core::muxing::StreamMuxerBox,
core::Transport,
core::{muxing::StreamMuxerBox, Transport},
multiaddr::{Multiaddr, Protocol},
ping,
swarm::SwarmEvent,
};
use libp2p_webrtc as webrtc;
use rand::thread_rng;
use std::net::{Ipv4Addr, SocketAddr};
use std::time::Duration;
use tokio::net::TcpListener;
use tower_http::cors::{Any, CorsLayer};

@@ -127,7 +131,8 @@ struct Libp2pEndpoint(Multiaddr);
/// Serves the index.html file for our client.
///
/// Our server listens on a random UDP port for the WebRTC transport.
/// To allow the client to connect, we replace the `__LIBP2P_ENDPOINT__` placeholder with the actual address.
/// To allow the client to connect, we replace the `__LIBP2P_ENDPOINT__`
/// placeholder with the actual address.
async fn get_index(
State(Libp2pEndpoint(libp2p_endpoint)): State<Libp2pEndpoint>,
) -> Result<Html<String>, StatusCode> {
26 changes: 19 additions & 7 deletions examples/chat/src/main.rs
Original file line number Diff line number Diff line change
@@ -20,12 +20,22 @@

#![doc = include_str!("../README.md")]

use std::{
collections::hash_map::DefaultHasher,
error::Error,
hash::{Hash, Hasher},
time::Duration,
};

use futures::stream::StreamExt;
use libp2p::{gossipsub, mdns, noise, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, yamux};
use std::collections::hash_map::DefaultHasher;
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::Duration;
use libp2p::{
gossipsub,
mdns,
noise,
swarm::{NetworkBehaviour, SwarmEvent},
tcp,
yamux,
};
use tokio::{io, io::AsyncBufReadExt, select};
use tracing_subscriber::EnvFilter;

@@ -51,7 +61,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
)?
.with_quic()
.with_behaviour(|key| {
// To content-address message, we can take the hash of message and use it as an ID.
// To content-address message, we can take the hash of message and use it as an
// ID.
let message_id_fn = |message: &gossipsub::Message| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
@@ -61,7 +72,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Set a custom gossipsub configuration
let gossipsub_config = gossipsub::ConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space
.validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
.validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message
// signing)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
.build()
.map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?; // Temporary hack because `build` does not return a proper `std::error::Error`.
20 changes: 14 additions & 6 deletions examples/dcutr/src/main.rs
Original file line number Diff line number Diff line change
@@ -20,16 +20,23 @@

#![doc = include_str!("../README.md")]

use std::{error::Error, str::FromStr, time::Duration};

use clap::Parser;
use futures::{executor::block_on, future::FutureExt, stream::StreamExt};
use libp2p::{
core::multiaddr::{Multiaddr, Protocol},
dcutr, identify, identity, noise, ping, relay,
dcutr,
identify,
identity,
noise,
ping,
relay,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux, PeerId,
tcp,
yamux,
PeerId,
};
use std::str::FromStr;
use std::{error::Error, time::Duration};
use tracing_subscriber::EnvFilter;

#[derive(Debug, Parser)]
@@ -136,8 +143,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
});

// Connect to the relay server. Not for the reservation or relayed connection, but to (a) learn
// our local public address and (b) enable a freshly started relay to learn its public address.
// Connect to the relay server. Not for the reservation or relayed connection,
// but to (a) learn our local public address and (b) enable a freshly
// started relay to learn its public address.
swarm.dial(opts.relay_address.clone()).unwrap();
block_on(async {
let mut learned_observed_addr = false;
15 changes: 8 additions & 7 deletions examples/distributed-key-value-store/src/main.rs
Original file line number Diff line number Diff line change
@@ -20,17 +20,18 @@

#![doc = include_str!("../README.md")]

use std::{error::Error, time::Duration};

use futures::stream::StreamExt;
use libp2p::kad;
use libp2p::kad::store::MemoryStore;
use libp2p::kad::Mode;
use libp2p::{
mdns, noise,
kad,
kad::{store::MemoryStore, Mode},
mdns,
noise,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux,
tcp,
yamux,
};
use std::error::Error;
use std::time::Duration;
use tokio::{
io::{self, AsyncBufReadExt},
select,
11 changes: 4 additions & 7 deletions examples/file-sharing/src/main.rs
Original file line number Diff line number Diff line change
@@ -22,15 +22,12 @@

mod network;

use clap::Parser;
use tokio::task::spawn;
use std::{error::Error, io::Write, path::PathBuf};

use futures::prelude::*;
use futures::StreamExt;
use clap::Parser;
use futures::{prelude::*, StreamExt};
use libp2p::{core::Multiaddr, multiaddr::Protocol};
use std::error::Error;
use std::io::Write;
use std::path::PathBuf;
use tokio::task::spawn;
use tracing_subscriber::EnvFilter;

#[tokio::main]
30 changes: 18 additions & 12 deletions examples/file-sharing/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,33 @@
use futures::channel::{mpsc, oneshot};
use futures::prelude::*;
use futures::StreamExt;
use std::{
collections::{hash_map, HashMap, HashSet},
error::Error,
time::Duration,
};

use futures::{
channel::{mpsc, oneshot},
prelude::*,
StreamExt,
};
use libp2p::{
core::Multiaddr,
identity, kad,
identity,
kad,
multiaddr::Protocol,
noise,
request_response::{self, OutboundRequestId, ProtocolSupport, ResponseChannel},
swarm::{NetworkBehaviour, Swarm, SwarmEvent},
tcp, yamux, PeerId,
tcp,
yamux,
PeerId,
StreamProtocol,
};

use libp2p::StreamProtocol;
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::error::Error;
use std::time::Duration;

/// Creates the network components, namely:
///
/// - The network client to interact with the network layer from anywhere
/// within your application.
/// - The network client to interact with the network layer from anywhere within
/// your application.
///
/// - The network event stream, e.g. for incoming requests.
///
3 changes: 2 additions & 1 deletion examples/identify/src/main.rs
Original file line number Diff line number Diff line change
@@ -20,9 +20,10 @@

#![doc = include_str!("../README.md")]

use std::{error::Error, time::Duration};

use futures::StreamExt;
use libp2p::{core::multiaddr::Multiaddr, identify, noise, swarm::SwarmEvent, tcp, yamux};
use std::{error::Error, time::Duration};
use tracing_subscriber::EnvFilter;

#[tokio::main]
20 changes: 15 additions & 5 deletions examples/ipfs-kad/src/main.rs
Original file line number Diff line number Diff line change
@@ -20,15 +20,25 @@

#![doc = include_str!("../README.md")]

use std::num::NonZeroUsize;
use std::ops::Add;
use std::time::{Duration, Instant};
use std::{
num::NonZeroUsize,
ops::Add,
time::{Duration, Instant},
};

use anyhow::{bail, Result};
use clap::Parser;
use futures::StreamExt;
use libp2p::swarm::{StreamProtocol, SwarmEvent};
use libp2p::{bytes::BufMut, identity, kad, noise, tcp, yamux, PeerId};
use libp2p::{
bytes::BufMut,
identity,
kad,
noise,
swarm::{StreamProtocol, SwarmEvent},
tcp,
yamux,
PeerId,
};
use tracing_subscriber::EnvFilter;

const BOOTNODES: [&str; 4] = [
26 changes: 17 additions & 9 deletions examples/ipfs-private/src/main.rs
Original file line number Diff line number Diff line change
@@ -20,23 +20,29 @@

#![doc = include_str!("../README.md")]

use std::{env, error::Error, fs, path::Path, str::FromStr, time::Duration};

use either::Either;
use futures::prelude::*;
use libp2p::{
core::transport::upgrade::Version,
gossipsub, identify,
gossipsub,
identify,
multiaddr::Protocol,
noise, ping,
noise,
ping,
pnet::{PnetConfig, PreSharedKey},
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux, Multiaddr, Transport,
tcp,
yamux,
Multiaddr,
Transport,
};
use std::{env, error::Error, fs, path::Path, str::FromStr, time::Duration};
use tokio::{io, io::AsyncBufReadExt, select};
use tracing_subscriber::EnvFilter;

/// Get the current ipfs repo path, either from the IPFS_PATH environment variable or
/// from the default $HOME/.ipfs
/// Get the current ipfs repo path, either from the IPFS_PATH environment
/// variable or from the default $HOME/.ipfs
fn get_ipfs_path() -> Box<Path> {
env::var("IPFS_PATH")
.map(|ipfs_path| Path::new(&ipfs_path).into())
@@ -58,8 +64,9 @@ fn get_psk(path: &Path) -> std::io::Result<Option<String>> {
}
}

/// for a multiaddr that ends with a peer id, this strips this suffix. Rust-libp2p
/// only supports dialing to an address without providing the peer id.
/// for a multiaddr that ends with a peer id, this strips this suffix.
/// Rust-libp2p only supports dialing to an address without providing the peer
/// id.
fn strip_peer_id(addr: &mut Multiaddr) {
let last = addr.pop();
match last {
@@ -105,7 +112,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a Gosspipsub topic
let gossipsub_topic = gossipsub::IdentTopic::new("chat");

// We create a custom network behaviour that combines gossipsub, ping and identify.
// We create a custom network behaviour that combines gossipsub, ping and
// identify.
#[derive(NetworkBehaviour)]
struct MyBehaviour {
gossipsub: gossipsub::Behaviour,
16 changes: 7 additions & 9 deletions examples/metrics/src/http_service.rs
Original file line number Diff line number Diff line change
@@ -18,15 +18,13 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::Router;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::{
net::SocketAddr,
sync::{Arc, Mutex},
};

use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::get, Router};
use prometheus_client::{encoding::text::encode, registry::Registry};
use tokio::net::TcpListener;

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";
23 changes: 14 additions & 9 deletions examples/metrics/src/main.rs
Original file line number Diff line number Diff line change
@@ -20,18 +20,23 @@

#![doc = include_str!("../README.md")]

use std::{error::Error, time::Duration};

use futures::StreamExt;
use libp2p::core::Multiaddr;
use libp2p::metrics::{Metrics, Recorder};
use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
use libp2p::{identify, identity, noise, ping, tcp, yamux};
use libp2p::{
core::Multiaddr,
identify,
identity,
metrics::{Metrics, Recorder},
noise,
ping,
swarm::{NetworkBehaviour, SwarmEvent},
tcp,
yamux,
};
use opentelemetry::{trace::TracerProvider, KeyValue};
use prometheus_client::registry::Registry;
use std::error::Error;
use std::time::Duration;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{EnvFilter, Layer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};

mod http_service;

3 changes: 2 additions & 1 deletion examples/ping/src/main.rs
Original file line number Diff line number Diff line change
@@ -20,9 +20,10 @@

#![doc = include_str!("../README.md")]

use std::{error::Error, time::Duration};

use futures::prelude::*;
use libp2p::{noise, ping, swarm::SwarmEvent, tcp, yamux, Multiaddr};
use std::{error::Error, time::Duration};
use tracing_subscriber::EnvFilter;

#[tokio::main]
22 changes: 15 additions & 7 deletions examples/relay-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -21,17 +21,24 @@

#![doc = include_str!("../README.md")]

use std::{
error::Error,
net::{Ipv4Addr, Ipv6Addr},
};

use clap::Parser;
use futures::StreamExt;
use libp2p::{
core::multiaddr::Protocol,
core::Multiaddr,
identify, identity, noise, ping, relay,
core::{multiaddr::Protocol, Multiaddr},
identify,
identity,
noise,
ping,
relay,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux,
tcp,
yamux,
};
use std::error::Error;
use std::net::{Ipv4Addr, Ipv6Addr};
use tracing_subscriber::EnvFilter;

#[tokio::main]
@@ -119,7 +126,8 @@ fn generate_ed25519(secret_key_seed: u8) -> identity::Keypair {
#[derive(Debug, Parser)]
#[clap(name = "libp2p relay")]
struct Opt {
/// Determine if the relay listen on ipv6 or ipv4 loopback address. the default is ipv4
/// Determine if the relay listen on ipv6 or ipv4 loopback address. the
/// default is ipv4
#[clap(long)]
use_ipv6: Option<bool>,

12 changes: 8 additions & 4 deletions examples/rendezvous/src/bin/rzv-discover.rs
Original file line number Diff line number Diff line change
@@ -18,15 +18,19 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::{error::Error, time::Duration};

use futures::StreamExt;
use libp2p::{
multiaddr::Protocol,
noise, ping, rendezvous,
noise,
ping,
rendezvous,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux, Multiaddr,
tcp,
yamux,
Multiaddr,
};
use std::error::Error;
use std::time::Duration;
use tracing_subscriber::EnvFilter;

const NAMESPACE: &str = "rendezvous";
12 changes: 9 additions & 3 deletions examples/rendezvous/src/bin/rzv-identify.rs
Original file line number Diff line number Diff line change
@@ -18,13 +18,19 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::time::Duration;

use futures::StreamExt;
use libp2p::{
identify, noise, ping, rendezvous,
identify,
noise,
ping,
rendezvous,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux, Multiaddr,
tcp,
yamux,
Multiaddr,
};
use std::time::Duration;
use tracing_subscriber::EnvFilter;

#[tokio::main]
16 changes: 11 additions & 5 deletions examples/rendezvous/src/bin/rzv-register.rs
Original file line number Diff line number Diff line change
@@ -18,13 +18,18 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::time::Duration;

use futures::StreamExt;
use libp2p::{
noise, ping, rendezvous,
noise,
ping,
rendezvous,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux, Multiaddr,
tcp,
yamux,
Multiaddr,
};
use std::time::Duration;
use tracing_subscriber::EnvFilter;

#[tokio::main]
@@ -54,8 +59,9 @@ async fn main() {
.with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::from_secs(5)))
.build();

// In production the external address should be the publicly facing IP address of the rendezvous point.
// This address is recorded in the registration entry by the rendezvous point.
// In production the external address should be the publicly facing IP address
// of the rendezvous point. This address is recorded in the registration
// entry by the rendezvous point.
let external_address = "/ip4/127.0.0.1/tcp/0".parse::<Multiaddr>().unwrap();
swarm.add_external_address(external_address);

16 changes: 10 additions & 6 deletions examples/rendezvous/src/main.rs
Original file line number Diff line number Diff line change
@@ -20,14 +20,18 @@

#![doc = include_str!("../README.md")]

use std::{error::Error, time::Duration};

use futures::StreamExt;
use libp2p::{
identify, noise, ping, rendezvous,
identify,
noise,
ping,
rendezvous,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux,
tcp,
yamux,
};
use std::error::Error;
use std::time::Duration;
use tracing_subscriber::EnvFilter;

#[tokio::main]
@@ -36,8 +40,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
.with_env_filter(EnvFilter::from_default_env())
.try_init();

// Results in PeerID 12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN which is
// used as the rendezvous point by the other peer examples.
// Results in PeerID 12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN which
// is used as the rendezvous point by the other peer examples.
let keypair = libp2p::identity::Keypair::ed25519_from_bytes([0; 32]).unwrap();

let mut swarm = libp2p::SwarmBuilder::with_existing_identity(keypair)
20 changes: 12 additions & 8 deletions examples/stream/src/main.rs
Original file line number Diff line number Diff line change
@@ -43,13 +43,15 @@ async fn main() -> Result<()> {

// Deal with incoming streams.
// Spawning a dedicated task is just one way of doing this.
// libp2p doesn't care how you handle incoming streams but you _must_ handle them somehow.
// To mitigate DoS attacks, libp2p will internally drop incoming streams if your application cannot keep up processing them.
// libp2p doesn't care how you handle incoming streams but you _must_ handle
// them somehow. To mitigate DoS attacks, libp2p will internally drop
// incoming streams if your application cannot keep up processing them.
tokio::spawn(async move {
// This loop handles incoming streams _sequentially_ but that doesn't have to be the case.
// You can also spawn a dedicated task per stream if you want to.
// Be aware that this breaks backpressure though as spawning new tasks is equivalent to an unbounded buffer.
// Each task needs memory meaning an aggressive remote peer may force you OOM this way.
// This loop handles incoming streams _sequentially_ but that doesn't have to be
// the case. You can also spawn a dedicated task per stream if you want
// to. Be aware that this breaks backpressure though as spawning new
// tasks is equivalent to an unbounded buffer. Each task needs memory
// meaning an aggressive remote peer may force you OOM this way.

while let Some((peer, stream)) = incoming_streams.next().await {
match echo(stream).await {
@@ -89,7 +91,8 @@ async fn main() -> Result<()> {
}
}

/// A very simple, `async fn`-based connection handler for our custom echo protocol.
/// A very simple, `async fn`-based connection handler for our custom echo
/// protocol.
async fn connection_handler(peer: PeerId, mut control: stream::Control) {
loop {
tokio::time::sleep(Duration::from_secs(1)).await; // Wait a second between echos.
@@ -102,7 +105,8 @@ async fn connection_handler(peer: PeerId, mut control: stream::Control) {
}
Err(error) => {
// Other errors may be temporary.
// In production, something like an exponential backoff / circuit-breaker may be more appropriate.
// In production, something like an exponential backoff / circuit-breaker may be
// more appropriate.
tracing::debug!(%peer, %error);
continue;
}
8 changes: 6 additions & 2 deletions examples/upnp/src/main.rs
Original file line number Diff line number Diff line change
@@ -20,9 +20,10 @@

#![doc = include_str!("../README.md")]

use std::error::Error;

use futures::prelude::*;
use libp2p::{noise, swarm::SwarmEvent, upnp, yamux, Multiaddr};
use std::error::Error;
use tracing_subscriber::EnvFilter;

#[tokio::main]
@@ -64,7 +65,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
break;
}
SwarmEvent::Behaviour(upnp::Event::NonRoutableGateway) => {
println!("Gateway is not exposed directly to the public Internet, i.e. it itself has a private IP address.");
println!(
"Gateway is not exposed directly to the public Internet, i.e. it itself has a \
private IP address."
);
break;
}
_ => {}
42 changes: 28 additions & 14 deletions hole-punching-tests/src/main.rs
Original file line number Diff line number Diff line change
@@ -18,24 +18,34 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::{
collections::HashMap,
fmt,
io,
net::{IpAddr, Ipv4Addr},
str::FromStr,
time::Duration,
};

use anyhow::{Context, Result};
use either::Either;
use futures::stream::StreamExt;
use libp2p::core::transport::ListenerId;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::ConnectionId;
use libp2p::{
core::multiaddr::{Multiaddr, Protocol},
dcutr, identify, noise, ping, relay,
swarm::{NetworkBehaviour, SwarmEvent},
tcp, yamux, Swarm,
core::{
multiaddr::{Multiaddr, Protocol},
transport::ListenerId,
},
dcutr,
identify,
noise,
ping,
relay,
swarm::{dial_opts::DialOpts, ConnectionId, NetworkBehaviour, SwarmEvent},
tcp,
yamux,
Swarm,
};
use redis::AsyncCommands;
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr};
use std::str::FromStr;
use std::time::Duration;
use std::{fmt, io};

/// The redis key we push the relay's TCP listen address to.
const RELAY_TCP_ADDRESS: &str = "RELAY_TCP_ADDRESS";
@@ -47,7 +57,10 @@ const LISTEN_CLIENT_PEER_ID: &str = "LISTEN_CLIENT_PEER_ID";
#[tokio::main]
async fn main() -> Result<()> {
env_logger::builder()
.parse_filters("debug,netlink_proto=warn,rustls=warn,multistream_select=warn,libp2p_core::transport::choice=off,libp2p_swarm::connection=warn,libp2p_quic=trace")
.parse_filters(
"debug,netlink_proto=warn,rustls=warn,multistream_select=warn,\
libp2p_core::transport::choice=off,libp2p_swarm::connection=warn,libp2p_quic=trace",
)
.parse_default_env()
.init();

@@ -214,7 +227,8 @@ async fn client_listen_on_transport(

let mut listen_addresses = 0;

// We should have at least two listen addresses, one for localhost and the actual interface.
// We should have at least two listen addresses, one for localhost and the
// actual interface.
while listen_addresses < 2 {
if let SwarmEvent::NewListenAddr {
listener_id,
38 changes: 24 additions & 14 deletions identity/src/ecdsa.rs
Original file line number Diff line number Diff line change
@@ -20,21 +20,23 @@

//! ECDSA keys with secp256r1 curve support.
use super::error::DecodingError;
use core::cmp;
use core::fmt;
use core::hash;
use core::{cmp, fmt, hash};
use std::convert::Infallible;

use p256::{
ecdsa::{
signature::{Signer, Verifier},
Signature, SigningKey, VerifyingKey,
Signature,
SigningKey,
VerifyingKey,
},
EncodedPoint,
};
use sec1::{DecodeEcPrivateKey, EncodeEcPrivateKey};
use std::convert::Infallible;
use zeroize::Zeroize;

use super::error::DecodingError;

/// An ECDSA keypair generated using `secp256r1` curve.
#[derive(Clone)]
pub struct Keypair {
@@ -99,19 +101,22 @@ impl SecretKey {
SecretKey(SigningKey::random(&mut rand::thread_rng()))
}

/// Sign a message with this secret key, producing a DER-encoded ECDSA signature.
/// Sign a message with this secret key, producing a DER-encoded ECDSA
/// signature.
pub fn sign(&self, msg: &[u8]) -> Vec<u8> {
let signature: p256::ecdsa::DerSignature = self.0.sign(msg);

signature.as_bytes().to_owned()
}

/// Convert a secret key into a byte buffer containing raw scalar of the key.
/// Convert a secret key into a byte buffer containing raw scalar of the
/// key.
pub fn to_bytes(&self) -> Vec<u8> {
self.0.to_bytes().to_vec()
}

/// Try to parse a secret key from a byte buffer containing raw scalar of the key.
/// Try to parse a secret key from a byte buffer containing raw scalar of
/// the key.
pub fn try_from_bytes(buf: impl AsRef<[u8]>) -> Result<SecretKey, DecodingError> {
SigningKey::from_bytes(buf.as_ref().into())
.map_err(|err| DecodingError::failed_to_parse("ecdsa p256 secret key", err))
@@ -127,7 +132,8 @@ impl SecretKey {
.to_vec()
}

/// Try to decode a secret key from a DER-encoded byte buffer, zeroize the buffer on success.
/// Try to decode a secret key from a DER-encoded byte buffer, zeroize the
/// buffer on success.
pub(crate) fn try_decode_der(buf: &mut [u8]) -> Result<Self, DecodingError> {
match SigningKey::from_sec1_der(buf) {
Ok(key) => {
@@ -158,7 +164,8 @@ impl PublicKey {
self.0.verify(msg, &sig).is_ok()
}

/// Try to parse a public key from a byte buffer containing raw components of a key with or without compression.
/// Try to parse a public key from a byte buffer containing raw components
/// of a key with or without compression.
pub fn try_from_bytes(k: &[u8]) -> Result<PublicKey, DecodingError> {
let enc_pt = EncodedPoint::from_bytes(k)
.map_err(|e| DecodingError::failed_to_parse("ecdsa p256 encoded point", e))?;
@@ -168,18 +175,21 @@ impl PublicKey {
.map(PublicKey)
}

/// Convert a public key into a byte buffer containing raw components of the key without compression.
/// Convert a public key into a byte buffer containing raw components of the
/// key without compression.
pub fn to_bytes(&self) -> Vec<u8> {
self.0.to_encoded_point(false).as_bytes().to_owned()
}

/// Encode a public key into a DER encoded byte buffer as defined by SEC1 standard.
/// Encode a public key into a DER encoded byte buffer as defined by SEC1
/// standard.
pub fn encode_der(&self) -> Vec<u8> {
let buf = self.to_bytes();
Self::add_asn1_header(&buf)
}

/// Try to decode a public key from a DER encoded byte buffer as defined by SEC1 standard.
/// Try to decode a public key from a DER encoded byte buffer as defined by
/// SEC1 standard.
pub fn try_decode_der(k: &[u8]) -> Result<PublicKey, DecodingError> {
let buf = Self::del_asn1_header(k).ok_or_else(|| {
DecodingError::failed_to_parse::<Infallible, _>(
17 changes: 10 additions & 7 deletions identity/src/ed25519.rs
Original file line number Diff line number Diff line change
@@ -20,13 +20,13 @@

//! Ed25519 keys.
use super::error::DecodingError;
use core::cmp;
use core::fmt;
use core::hash;
use core::{cmp, fmt, hash};

use ed25519_dalek::{self as ed25519, Signer as _, Verifier as _};
use zeroize::Zeroize;

use super::error::DecodingError;

/// An Ed25519 keypair.
#[derive(Clone)]
pub struct Keypair(ed25519::SigningKey);
@@ -48,7 +48,8 @@ impl Keypair {
/// Try to parse a keypair from the [binary format](https://datatracker.ietf.org/doc/html/rfc8032#section-5.1.5)
/// produced by [`Keypair::to_bytes`], zeroing the input on success.
///
/// Note that this binary format is the same as `ed25519_dalek`'s and `ed25519_zebra`'s.
/// Note that this binary format is the same as `ed25519_dalek`'s and
/// `ed25519_zebra`'s.
pub fn try_from_bytes(kp: &mut [u8]) -> Result<Keypair, DecodingError> {
let bytes = <[u8; 64]>::try_from(&*kp)
.map_err(|e| DecodingError::failed_to_parse("Ed25519 keypair", e))?;
@@ -152,7 +153,8 @@ impl PublicKey {
self.0.to_bytes()
}

/// Try to parse a public key from a byte array containing the actual key as produced by `to_bytes`.
/// Try to parse a public key from a byte array containing the actual key as
/// produced by `to_bytes`.
pub fn try_from_bytes(k: &[u8]) -> Result<PublicKey, DecodingError> {
let k = <[u8; 32]>::try_from(k)
.map_err(|e| DecodingError::failed_to_parse("Ed25519 public key", e))?;
@@ -206,9 +208,10 @@ impl SecretKey {

#[cfg(test)]
mod tests {
use super::*;
use quickcheck::*;

use super::*;

fn eq_keypairs(kp1: &Keypair, kp2: &Keypair) -> bool {
kp1.public() == kp2.public() && kp1.0.to_bytes() == kp2.0.to_bytes()
}
6 changes: 3 additions & 3 deletions identity/src/error.rs
Original file line number Diff line number Diff line change
@@ -20,8 +20,7 @@

//! Errors during identity key operations.
use std::error::Error;
use std::fmt;
use std::{error::Error, fmt};

use crate::KeyType;

@@ -136,7 +135,8 @@ impl Error for SigningError {
}
}

/// Error produced when failing to convert [`Keypair`](crate::Keypair) to a more concrete keypair.
/// Error produced when failing to convert [`Keypair`](crate::Keypair) to a more
/// concrete keypair.
#[derive(Debug)]
pub struct OtherVariantError {
actual: KeyType,
46 changes: 25 additions & 21 deletions identity/src/keypair.rs
Original file line number Diff line number Diff line change
@@ -24,40 +24,40 @@
feature = "ed25519",
feature = "rsa"
))]
#[cfg(feature = "ed25519")]
use crate::ed25519;
use quick_protobuf::{BytesReader, Writer};

#[cfg(feature = "ecdsa")]
use crate::ecdsa;
#[cfg(any(
feature = "ecdsa",
feature = "secp256k1",
feature = "ed25519",
feature = "rsa"
))]
use crate::error::OtherVariantError;
use crate::error::{DecodingError, SigningError};
#[cfg(feature = "ed25519")]
use crate::ed25519;
#[cfg(any(
feature = "ecdsa",
feature = "secp256k1",
feature = "ed25519",
feature = "rsa"
))]
use crate::proto;
use crate::error::OtherVariantError;
#[cfg(any(
feature = "ecdsa",
feature = "secp256k1",
feature = "ed25519",
feature = "rsa"
))]
use quick_protobuf::{BytesReader, Writer};

use crate::proto;
#[cfg(all(feature = "rsa", not(target_arch = "wasm32")))]
use crate::rsa;

#[cfg(feature = "secp256k1")]
use crate::secp256k1;

#[cfg(feature = "ecdsa")]
use crate::ecdsa;
use crate::KeyType;
use crate::{
error::{DecodingError, SigningError},
KeyType,
};

/// Identity keypair of a node.
///
@@ -75,7 +75,6 @@ use crate::KeyType;
/// let mut bytes = std::fs::read("private.pk8").unwrap();
/// let keypair = Keypair::rsa_from_pkcs8(&mut bytes);
/// ```
///
#[derive(Debug, Clone)]
pub struct Keypair {
keypair: KeyPairInner,
@@ -154,8 +153,8 @@ impl Keypair {
})
}

/// Decode a keypair from a DER-encoded Secp256k1 secret key in an ECPrivateKey
/// structure as defined in [RFC5915].
/// Decode a keypair from a DER-encoded Secp256k1 secret key in an
/// ECPrivateKey structure as defined in [RFC5915].
///
/// [RFC5915]: https://tools.ietf.org/html/rfc5915
#[cfg(feature = "secp256k1")]
@@ -258,7 +257,8 @@ impl Keypair {
unreachable!()
}

/// Decode a private key from a protobuf structure and parse it as a [`Keypair`].
/// Decode a private key from a protobuf structure and parse it as a
/// [`Keypair`].
#[allow(unused_variables)]
pub fn from_protobuf_encoding(bytes: &[u8]) -> Result<Keypair, DecodingError> {
#[cfg(any(
@@ -341,7 +341,8 @@ impl Keypair {
}
}

/// Deterministically derive a new secret from this [`Keypair`], taking into account the provided domain.
/// Deterministically derive a new secret from this [`Keypair`], taking into
/// account the provided domain.
///
/// This works for all key types except RSA where it returns `None`.
///
@@ -352,10 +353,11 @@ impl Keypair {
/// # use libp2p_identity as identity;
/// let key = identity::Keypair::generate_ed25519();
///
/// let new_key = key.derive_secret(b"my encryption key").expect("can derive secret for ed25519");
/// let new_key = key
/// .derive_secret(b"my encryption key")
/// .expect("can derive secret for ed25519");
/// # }
/// ```
///
#[cfg(any(
feature = "ecdsa",
feature = "secp256k1",
@@ -904,19 +906,21 @@ mod tests {

#[test]
fn public_key_implements_hash() {
use crate::PublicKey;
use std::hash::Hash;

use crate::PublicKey;

fn assert_implements_hash<T: Hash>() {}

assert_implements_hash::<PublicKey>();
}

#[test]
fn public_key_implements_ord() {
use crate::PublicKey;
use std::cmp::Ord;

use crate::PublicKey;

fn assert_implements_ord<T: Ord>() {}

assert_implements_ord::<PublicKey>();
3 changes: 2 additions & 1 deletion identity/src/lib.rs
Original file line number Diff line number Diff line change
@@ -31,7 +31,8 @@
//! Instead, loading fixed keys must use the standard, thus more portable
//! binary representation of the specific key type
//! (e.g. [ed25519 binary format](https://datatracker.ietf.org/doc/html/rfc8032#section-5.1.5)).
//! All key types have functions to enable conversion to/from their binary representations.
//! All key types have functions to enable conversion to/from their binary
//! representations.

#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![allow(unreachable_pub)]
12 changes: 7 additions & 5 deletions identity/src/peer_id.rs
Original file line number Diff line number Diff line change
@@ -18,17 +18,19 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::{fmt, str::FromStr};

#[cfg(feature = "rand")]
use rand::Rng;
use sha2::Digest as _;
use std::{fmt, str::FromStr};
use thiserror::Error;

/// Local type-alias for multihash.
///
/// Must be big enough to accommodate for `MAX_INLINE_KEY_LENGTH`.
/// 64 satisfies that and can hold 512 bit hashes which is what the ecosystem typically uses.
/// Given that this appears in our type-signature, using a "common" number here makes us more compatible.
/// 64 satisfies that and can hold 512 bit hashes which is what the ecosystem
/// typically uses. Given that this appears in our type-signature, using a
/// "common" number here makes us more compatible.
type Multihash = multihash::Multihash<64>;

#[cfg(feature = "serde")]
@@ -43,8 +45,8 @@ const MULTIHASH_SHA256_CODE: u64 = 0x12;

/// Identifier of a peer of the network.
///
/// The data is a CIDv0 compatible multihash of the protobuf encoded public key of the peer
/// as specified in [specs/peer-ids](https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md).
/// The data is a CIDv0 compatible multihash of the protobuf encoded public key
/// of the peer as specified in [specs/peer-ids](https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md).
#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct PeerId {
multihash: Multihash,
36 changes: 23 additions & 13 deletions identity/src/rsa.rs
Original file line number Diff line number Diff line change
@@ -20,15 +20,24 @@

//! RSA keys.

use super::error::*;
use asn1_der::typed::{DerDecodable, DerEncodable, DerTypeView, Sequence};
use asn1_der::{Asn1DerError, Asn1DerErrorVariant, DerObject, Sink, VecBacking};
use ring::rand::SystemRandom;
use ring::signature::KeyPair;
use ring::signature::{self, RsaKeyPair, RSA_PKCS1_2048_8192_SHA256, RSA_PKCS1_SHA256};
use std::{fmt, sync::Arc};

use asn1_der::{
typed::{DerDecodable, DerEncodable, DerTypeView, Sequence},
Asn1DerError,
Asn1DerErrorVariant,
DerObject,
Sink,
VecBacking,
};
use ring::{
rand::SystemRandom,
signature::{self, KeyPair, RsaKeyPair, RSA_PKCS1_2048_8192_SHA256, RSA_PKCS1_SHA256},
};
use zeroize::Zeroize;

use super::error::*;

/// An RSA keypair.
#[derive(Clone)]
pub struct Keypair(Arc<RsaKeyPair>);
@@ -42,8 +51,8 @@ impl std::fmt::Debug for Keypair {
}

impl Keypair {
/// Decode an RSA keypair from a DER-encoded private key in PKCS#1 RSAPrivateKey
/// format (i.e. unencrypted) as defined in [RFC3447].
/// Decode an RSA keypair from a DER-encoded private key in PKCS#1
/// RSAPrivateKey format (i.e. unencrypted) as defined in [RFC3447].
///
/// [RFC3447]: https://tools.ietf.org/html/rfc3447#appendix-A.1.2
pub fn try_decode_pkcs1(der: &mut [u8]) -> Result<Keypair, DecodingError> {
@@ -53,8 +62,8 @@ impl Keypair {
Ok(Keypair(Arc::new(kp)))
}

/// Decode an RSA keypair from a DER-encoded private key in PKCS#8 PrivateKeyInfo
/// format (i.e. unencrypted) as defined in [RFC5208].
/// Decode an RSA keypair from a DER-encoded private key in PKCS#8
/// PrivateKeyInfo format (i.e. unencrypted) as defined in [RFC5208].
///
/// [RFC5208]: https://tools.ietf.org/html/rfc5208#section-5
pub fn try_decode_pkcs8(der: &mut [u8]) -> Result<Keypair, DecodingError> {
@@ -100,8 +109,8 @@ impl PublicKey {
self.0.clone()
}

/// Encode the RSA public key in DER as a X.509 SubjectPublicKeyInfo structure,
/// as defined in [RFC5280].
/// Encode the RSA public key in DER as a X.509 SubjectPublicKeyInfo
/// structure, as defined in [RFC5280].
///
/// [RFC5280]: https://tools.ietf.org/html/rfc5280#section-4.1
pub fn encode_x509(&self) -> Vec<u8> {
@@ -315,9 +324,10 @@ impl DerDecodable<'_> for Asn1SubjectPublicKeyInfo {

#[cfg(test)]
mod tests {
use super::*;
use quickcheck::*;

use super::*;

const KEY1: &[u8] = include_bytes!("test/rsa-2048.pk8");
const KEY2: &[u8] = include_bytes!("test/rsa-3072.pk8");
const KEY3: &[u8] = include_bytes!("test/rsa-4096.pk8");
Loading