Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport: Poll Transport directly, remove ListenersStream #2652

Merged
merged 56 commits into from
Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
f134c8b
core/transport: remove Transport::Listener
elenaf9 May 15, 2022
aac9e1c
swarm: remove ListenerStream, poll Transport
elenaf9 May 15, 2022
68663fd
transports/tcp: handle transport changes
elenaf9 May 15, 2022
72c76f0
*: adapt majority of other transports
elenaf9 May 16, 2022
945c4a0
transports/tcp: rename *TcpConfig to *TcpTransport
elenaf9 May 21, 2022
d6f0e75
core/transport: adapt remaining transports
elenaf9 May 22, 2022
858590f
core/transports: unify imports, clean code
elenaf9 May 22, 2022
45f9c96
core/transport: split TransportEvent generics
elenaf9 May 22, 2022
bfd5fb0
core/transport: impl Stream for transport::Boxed
elenaf9 May 22, 2022
90721b9
transports/tcp: impl Stream for GenTcpTransport
elenaf9 May 22, 2022
80c3da1
*: format
elenaf9 May 26, 2022
bcb71a1
transports/dns: adapt dns transport
elenaf9 May 26, 2022
38a2b78
transports/dns: adapt websocket transport
elenaf9 May 26, 2022
607412d
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 May 29, 2022
d7f5019
Remove various `Sync` bounds
thomaseizinger May 19, 2022
b4164e8
transports/tcp: revert Stream impl for GenTcpTransport
elenaf9 May 29, 2022
a7766cd
core/transport/memory: fix listener polling
elenaf9 May 29, 2022
9824acd
transports/uds: adapt uds transport
elenaf9 May 29, 2022
a2988b5
transports/tcp: impl Default for GenTcpTransport
elenaf9 May 29, 2022
2edb0cd
core/transport/memory: add MemoryTransport::new
elenaf9 May 29, 2022
907a5ab
protocols/*: adapt network behaviour protocols
elenaf9 May 29, 2022
1924c12
muxers/mplex: adapt tests and benches
elenaf9 May 29, 2022
8ba3c20
transports/*: adapt tests in upgrade transports
elenaf9 May 29, 2022
96ae97b
src/lib: adapt development transports
elenaf9 May 29, 2022
c686b5a
examples: adapt exmaples to tcp transport changes
elenaf9 May 29, 2022
354d2f0
*: add Transport::remove_listener
elenaf9 May 29, 2022
2226092
core/transport: create ListenerId within Transport
elenaf9 May 29, 2022
8f608cd
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 May 29, 2022
5f9ebb7
*: fix CI
elenaf9 May 29, 2022
199ce12
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jun 11, 2022
27ee9ca
transports/tcp: fix port-reuse tests
elenaf9 Jun 11, 2022
08f4f80
*: fix intra-doc links
elenaf9 Jun 11, 2022
f92c2a4
transports/tcp: rm unneeded trait-bounds in tests
elenaf9 Jun 11, 2022
a357d71
*: use random ListenerIds instead of namespaced
elenaf9 Jun 20, 2022
a4a745e
core/transport: remove (Partial)Ord for ListenerId
elenaf9 Jun 20, 2022
81c945c
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jun 20, 2022
1c2b9e5
transports/relay: adapt ClientTransport
elenaf9 Jun 26, 2022
b19e11a
*: apply comments from review
elenaf9 Jun 26, 2022
c95c97c
transport/wasm-ext: adapt wasm-ext transport
elenaf9 Jun 26, 2022
eb0397b
transports/wasm-ext: fix clippy
elenaf9 Jun 27, 2022
610f7ae
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jun 27, 2022
f28cdb1
*: use intra-doc links
elenaf9 Jun 27, 2022
4841d81
core/transport: remove unnecessary trait bounds
elenaf9 Jun 27, 2022
4e74407
*: rename TransportEvent::Error -> ::ListenerError
elenaf9 Jun 27, 2022
b410724
transport/upgrade: remove unecessary Option
elenaf9 Jun 27, 2022
71885ad
*: clean code, fix docs
elenaf9 Jun 27, 2022
2b3402c
transports/wasm-ext: rm leftover Self: Sized bound
elenaf9 Jun 27, 2022
469515e
*: fix missing renames
elenaf9 Jun 27, 2022
83a13dd
transports/tcp: rm unneeded dependencies, fix docs
elenaf9 Jun 28, 2022
3d4e0aa
transports/tcp: remove oudated comment
elenaf9 Jun 28, 2022
8600be7
*: add changelog entries
elenaf9 Jun 28, 2022
1a16312
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jun 28, 2022
d4b8ba5
Merge branch 'master' of github.com:libp2p/rust-libp2p into refactor-…
elenaf9 Jul 1, 2022
03c5170
transports/tcp/CHANGELOG: remove wrong PR ref
elenaf9 Jul 1, 2022
5f5eb08
Merge branch 'master' into refactor-transport-trait
elenaf9 Jul 1, 2022
c2bc9e8
Merge branch 'master' into refactor-transport-trait
mxinden Jul 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ default = [
"websocket",
"yamux",
]

autonat = ["dep:libp2p-autonat"]
dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"]
deflate = ["dep:libp2p-deflate"]
Expand Down
5 changes: 5 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707].
- Replace `Into<std::io::Error>` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710].

- Remove the concept of individual `Transport::Listener` streams from `Transport`.
Instead the `Transport` is polled directly via `Transport::poll`. The
`Transport` is now responsible for driving its listeners. See [PR 2652].

[PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691
[PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707
[PR 2710]: https://github.com/libp2p/rust-libp2p/pull/2710
[PR 2652]: https://github.com/libp2p/rust-libp2p/pull/2652

# 0.33.0

Expand Down
19 changes: 0 additions & 19 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,6 @@ impl std::ops::Add<usize> for ConnectionId {
}
}

/// The ID of a single listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ListenerId(u64);

impl ListenerId {
/// Creates a `ListenerId` from a non-negative integer.
pub fn new(id: u64) -> Self {
Self(id)
}
}

impl std::ops::Add<u64> for ListenerId {
type Output = Self;

fn add(self, other: u64) -> Self {
Self(self.0 + other)
}
}

/// The endpoint roles associated with a peer-to-peer communication channel.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
Expand Down
108 changes: 46 additions & 62 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
transport::{ListenerEvent, Transport, TransportError},
transport::{ListenerId, Transport, TransportError, TransportEvent},
Multiaddr, ProtocolName,
};
use futures::{
Expand Down Expand Up @@ -274,48 +274,6 @@ pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
B(B::OutboundSubstream),
}

/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherListenStreamProj)]
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherListenStream<A, B> {
First(#[pin] A),
Second(#[pin] B),
}

impl<AStream, BStream, AInner, BInner, AError, BError> Stream
for EitherListenStream<AStream, BStream>
where
AStream: TryStream<Ok = ListenerEvent<AInner, AError>, Error = AError>,
BStream: TryStream<Ok = ListenerEvent<BInner, BError>, Error = BError>,
{
type Item = Result<
ListenerEvent<EitherFuture<AInner, BInner>, EitherError<AError, BError>>,
EitherError<AError, BError>,
>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project() {
EitherListenStreamProj::First(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
.map(EitherFuture::First)
.map_err(EitherError::A)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),
},
EitherListenStreamProj::Second(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
.map(EitherFuture::Second)
.map_err(EitherError::B)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::B(err)))),
},
}
}
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -385,11 +343,12 @@ impl<A: ProtocolName, B: ProtocolName> ProtocolName for EitherName<A, B> {
}
}
}

#[derive(Debug, Copy, Clone)]
#[pin_project(project = EitherTransportProj)]
#[derive(Debug)]
#[must_use = "transports do nothing unless polled"]
pub enum EitherTransport<A, B> {
Left(A),
Right(B),
Left(#[pin] A),
Right(#[pin] B),
}

impl<A, B> Transport for EitherTransport<A, B>
Expand All @@ -399,29 +358,54 @@ where
{
type Output = EitherOutput<A::Output, B::Output>;
type Error = EitherError<A::Error, B::Error>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;

fn listen_on(
&mut self,
addr: Multiaddr,
) -> Result<Self::Listener, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::First(listener)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
match self.project() {
EitherTransportProj::Left(a) => match a.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(
event
.map_upgrade(EitherFuture::First)
.map_err(EitherError::A),
),
},
EitherTransport::Right(b) => match b.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::Second(listener)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
EitherTransportProj::Right(b) => match b.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(
event
.map_upgrade(EitherFuture::Second)
.map_err(EitherError::B),
),
},
}
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
match self {
EitherTransport::Left(t) => t.remove_listener(id),
EitherTransport::Right(t) => t.remove_listener(id),
}
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => a.listen_on(addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(EitherError::A(err)),
}),
EitherTransport::Right(b) => b.listen_on(addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(EitherError::B(err)),
}),
}
}

fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
use TransportError::*;
match self {
Expand Down
Loading