Skip to content

Commit

Permalink
identify: simply stream handling logic when sending pushes
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jan 7, 2023
1 parent aa1f324 commit 56078a1
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 57 deletions.
42 changes: 6 additions & 36 deletions p2p/protocol/identify/peer_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (ph *peerHandler) loop(ctx context.Context, onExit func()) {
}

func (ph *peerHandler) sendPush(ctx context.Context) error {
dp, err := ph.openStream(ctx, []string{IDPush})
dp, err := ph.openStream(ctx, IDPush)
if err == errProtocolNotSupported {
log.Debugw("not sending push as peer does not support protocol", "peer", ph.pid)
return nil
Expand All @@ -112,14 +112,13 @@ func (ph *peerHandler) sendPush(ctx context.Context) error {
_ = dp.Reset()
return fmt.Errorf("failed to send push message: %w", err)
}

return nil
}

func (ph *peerHandler) openStream(ctx context.Context, protos []string) (network.Stream, error) {
func (ph *peerHandler) openStream(ctx context.Context, proto string) (network.Stream, error) {
// wait for the other peer to send us an Identify response on "all" connections we have with it
// so we can look at it's supported protocols and avoid a multistream-select roundtrip to negotiate the protocol
// if we know for a fact that it dosen't support the protocol.
// if we know for a fact that it doesn't support the protocol.
conns := ph.ids.Host.Network().ConnsToPeer(ph.pid)
for _, c := range conns {
select {
Expand All @@ -129,45 +128,16 @@ func (ph *peerHandler) openStream(ctx context.Context, protos []string) (network
}
}

if !ph.peerSupportsProtos(ctx, protos) {
if sup, err := ph.ids.Host.Peerstore().SupportsProtocols(ph.pid, proto); err != nil || len(sup) == 0 {
return nil, errProtocolNotSupported
}

ph.ids.pushSemaphore <- struct{}{}
defer func() {
<-ph.ids.pushSemaphore
}()

// negotiate a stream without opening a new connection as we "should" already have a connection.
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
ctx, cancel := context.WithTimeout(network.WithNoDial(ctx, "should already have connection"), 30*time.Second)
defer cancel()
ctx = network.WithNoDial(ctx, "should already have connection")

// newstream will open a stream on the first protocol the remote peer supports from the among
// the list of protocols passed to it.
s, err := ph.ids.Host.NewStream(ctx, ph.pid, protocol.ConvertFromStrings(protos)...)
if err != nil {
return nil, err
}

return s, err
}

// returns true if the peer supports atleast one of the given protocols
func (ph *peerHandler) peerSupportsProtos(ctx context.Context, protos []string) bool {
conns := ph.ids.Host.Network().ConnsToPeer(ph.pid)
for _, c := range conns {
select {
case <-ph.ids.IdentifyWait(c):
case <-ctx.Done():
return false
}
}

pstore := ph.ids.Host.Peerstore()

if sup, err := pstore.SupportsProtocols(ph.pid, protos...); err == nil && len(sup) == 0 {
return false
}
return true
return ph.ids.Host.NewStream(ctx, ph.pid, protocol.ID(proto))
}
21 changes: 0 additions & 21 deletions p2p/protocol/identify/peer_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p/core/peer"
blhost "github.com/libp2p/go-libp2p/p2p/host/blank"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"

Expand Down Expand Up @@ -40,23 +39,3 @@ func TestHandlerClose(t *testing.T) {
case <-time.After(10 * time.Millisecond):
}
}

func TestPeerSupportsProto(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

h1 := blhost.NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
ids1, err := NewIDService(h1)
require.NoError(t, err)

rp := peer.ID("test")
ph := newPeerHandler(rp, ids1)
require.NoError(t, h1.Peerstore().AddProtocols(rp, "test"))
require.True(t, ph.peerSupportsProtos(ctx, []string{"test"}))
require.False(t, ph.peerSupportsProtos(ctx, []string{"random"}))

// remove support for protocol and check
require.NoError(t, h1.Peerstore().RemoveProtocols(rp, "test"))
require.False(t, ph.peerSupportsProtos(ctx, []string{"test"}))
}

0 comments on commit 56078a1

Please sign in to comment.