Skip to content

Commit

Permalink
Merge pull request #58 from libp2p/peer-connectedness
Browse files Browse the repository at this point in the history
emit the EvtPeerConnectednessChanged event
  • Loading branch information
marten-seemann authored Nov 30, 2021
2 parents 8b1829b + dd821e8 commit fcfa4cc
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 0 deletions.
5 changes: 5 additions & 0 deletions p2p/host/blank/blank.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
if bh.emitters.evtLocalProtocolsUpdated, err = bh.eventbus.Emitter(&event.EvtLocalProtocolsUpdated{}); err != nil {
return nil
}
evtPeerConnectednessChanged, err := bh.eventbus.Emitter(&event.EvtPeerConnectednessChanged{})
if err != nil {
return nil
}
n.Notify(newPeerConnectWatcher(evtPeerConnectednessChanged))

n.SetStreamHandler(bh.newStreamHandler)

Expand Down
73 changes: 73 additions & 0 deletions p2p/host/blank/peer_connectedness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package blankhost

import (
"sync"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

ma "github.com/multiformats/go-multiaddr"
)

type peerConnectWatcher struct {
emitter event.Emitter

mutex sync.Mutex
connected map[peer.ID]struct{}
}

var _ network.Notifiee = &peerConnectWatcher{}

func newPeerConnectWatcher(emitter event.Emitter) *peerConnectWatcher {
return &peerConnectWatcher{
emitter: emitter,
connected: make(map[peer.ID]struct{}),
}
}

func (w *peerConnectWatcher) Listen(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) ListenClose(network.Network, ma.Multiaddr) {}
func (w *peerConnectWatcher) OpenedStream(network.Network, network.Stream) {}
func (w *peerConnectWatcher) ClosedStream(network.Network, network.Stream) {}

func (w *peerConnectWatcher) Connected(n network.Network, conn network.Conn) {
p := conn.RemotePeer()
w.handleTransition(p, n.Connectedness(p))
}

func (w *peerConnectWatcher) Disconnected(n network.Network, conn network.Conn) {
p := conn.RemotePeer()
w.handleTransition(p, n.Connectedness(p))
}

func (w *peerConnectWatcher) handleTransition(p peer.ID, state network.Connectedness) {
if changed := w.checkTransition(p, state); !changed {
return
}
w.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: state,
})
}

func (w *peerConnectWatcher) checkTransition(p peer.ID, state network.Connectedness) bool {
w.mutex.Lock()
defer w.mutex.Unlock()
switch state {
case network.Connected:
if _, ok := w.connected[p]; ok {
return false
}
w.connected[p] = struct{}{}
return true
case network.NotConnected:
if _, ok := w.connected[p]; ok {
delete(w.connected, p)
return true
}
return false
default:
return false
}
}
46 changes: 46 additions & 0 deletions p2p/host/blank/peer_connectedness_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package blankhost

import (
"context"
"testing"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"

"github.com/stretchr/testify/require"
)

func TestPeerConnectedness(t *testing.T) {
h1 := NewBlankHost(swarmt.GenSwarm(t))
defer h1.Close()
h2 := NewBlankHost(swarmt.GenSwarm(t))

sub1, err := h1.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
require.NoError(t, err)
defer sub1.Close()
sub2, err := h2.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
require.NoError(t, err)
defer sub2.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
require.NoError(t, h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}))
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
Peer: h2.ID(),
Connectedness: network.Connected,
})
require.Equal(t, (<-sub2.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
Peer: h1.ID(),
Connectedness: network.Connected,
})

// now close h2. This will disconnect it from h1.
require.NoError(t, h2.Close())
require.Equal(t, (<-sub1.Out()).(event.EvtPeerConnectednessChanged), event.EvtPeerConnectednessChanged{
Peer: h2.ID(),
Connectedness: network.NotConnected,
})
}

0 comments on commit fcfa4cc

Please sign in to comment.