Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaced topic lock by atomic boolean to avoid lock contention #495

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ require (
github.com/libp2p/go-msgio v0.2.0
github.com/multiformats/go-multiaddr v0.5.0
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
go.uber.org/atomic v1.7.0
)
2 changes: 1 addition & 1 deletion pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ type Message struct {
ID string
ReceivedFrom peer.ID
ValidatorData interface{}
Local bool
Local bool
}

func (m *Message) GetFrom() peer.ID {
Expand Down
37 changes: 11 additions & 26 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
pb "github.com/libp2p/go-libp2p-pubsub/pb"

"github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/atomic"
)

// ErrTopicClosed is returned if a Topic is utilized after it has been closed
Expand All @@ -30,8 +31,7 @@ type Topic struct {
evtHandlerMux sync.RWMutex
evtHandlers map[*TopicEventHandler]struct{}

mux sync.RWMutex
closed bool
closed atomic.Bool
}

// String returns the topic associated with t
Expand All @@ -47,10 +47,7 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
return fmt.Errorf("invalid topic score parameters: %w", err)
}

t.mux.Lock()
defer t.mux.Unlock()

if t.closed {
if t.closed.Load() {
return ErrTopicClosed
}

Expand Down Expand Up @@ -84,9 +81,7 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
// EventHandler creates a handle for topic specific events
// Multiple event handlers may be created and will operate independently of each other
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return nil, ErrTopicClosed
}

Expand Down Expand Up @@ -141,9 +136,7 @@ func (t *Topic) sendNotification(evt PeerEvent) {
// Note that subscription is not an instantaneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return nil, ErrTopicClosed
}

Expand Down Expand Up @@ -184,9 +177,7 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
// cancel function. Subsequent calls increase the reference counter.
// To completely disable the relay, all references must be cancelled.
func (t *Topic) Relay() (RelayCancelFunc, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return nil, ErrTopicClosed
}

Expand Down Expand Up @@ -215,16 +206,14 @@ type ProvideKey func() (crypto.PrivKey, peer.ID)
type PublishOptions struct {
ready RouterReady
customKey ProvideKey
local bool
local bool
vyzo marked this conversation as resolved.
Show resolved Hide resolved
}

type PubOpt func(pub *PublishOptions) error

// Publish publishes data to topic.
func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return ErrTopicClosed
}

Expand Down Expand Up @@ -347,9 +336,7 @@ func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt {
// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func (t *Topic) Close() error {
t.mux.Lock()
defer t.mux.Unlock()
if t.closed {
if t.closed.Load() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race here: you want to do Swap here and if old result was false then proceed with closing.

Otherwise 2 goroutines can hit Close, load false and proceed.

return nil
}

Expand All @@ -364,17 +351,15 @@ func (t *Topic) Close() error {
err := <-req.resp

if err == nil {
t.closed = true
t.closed.Swap(true)
}

return err
}

// ListPeers returns a list of peers we are connected to in the given topic.
func (t *Topic) ListPeers() []peer.ID {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return []peer.ID{}
}

Expand Down