Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ require (
github.com/libp2p/go-msgio v0.2.0
github.com/multiformats/go-multiaddr v0.5.0
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
go.uber.org/atomic v1.7.0
)
37 changes: 11 additions & 26 deletions topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
pb "github.com/libp2p/go-libp2p-pubsub/pb"

"github.com/libp2p/go-libp2p-core/peer"
"go.uber.org/atomic"
)

// ErrTopicClosed is returned if a Topic is utilized after it has been closed
Expand All @@ -30,8 +31,7 @@ type Topic struct {
evtHandlerMux sync.RWMutex
evtHandlers map[*TopicEventHandler]struct{}

mux sync.RWMutex
closed bool
closed atomic.Bool
}

// String returns the topic associated with t
Expand All @@ -47,10 +47,7 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
return fmt.Errorf("invalid topic score parameters: %w", err)
}

t.mux.Lock()
defer t.mux.Unlock()

if t.closed {
if t.closed.Load() {
return ErrTopicClosed
}

Expand Down Expand Up @@ -84,9 +81,7 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
// EventHandler creates a handle for topic specific events
// Multiple event handlers may be created and will operate independently of each other
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return nil, ErrTopicClosed
}

Expand Down Expand Up @@ -141,9 +136,7 @@ func (t *Topic) sendNotification(evt PeerEvent) {
// Note that subscription is not an instantaneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return nil, ErrTopicClosed
}

Expand Down Expand Up @@ -184,9 +177,7 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
// cancel function. Subsequent calls increase the reference counter.
// To completely disable the relay, all references must be cancelled.
func (t *Topic) Relay() (RelayCancelFunc, error) {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return nil, ErrTopicClosed
}

Expand Down Expand Up @@ -215,16 +206,14 @@ type ProvideKey func() (crypto.PrivKey, peer.ID)
type PublishOptions struct {
ready RouterReady
customKey ProvideKey
local bool
local bool
}

type PubOpt func(pub *PublishOptions) error

// Publish publishes data to topic.
func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return ErrTopicClosed
}

Expand Down Expand Up @@ -347,9 +336,7 @@ func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt {
// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func (t *Topic) Close() error {
t.mux.Lock()
defer t.mux.Unlock()
if t.closed {
if t.closed.Load() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a race here: you want to do Swap here and if old result was false then proceed with closing.

Otherwise 2 goroutines can hit Close, load false and proceed.

return nil
}

Expand All @@ -364,17 +351,15 @@ func (t *Topic) Close() error {
err := <-req.resp

if err == nil {
t.closed = true
t.closed.Swap(true)
}

return err
}

// ListPeers returns a list of peers we are connected to in the given topic.
func (t *Topic) ListPeers() []peer.ID {
t.mux.RLock()
defer t.mux.RUnlock()
if t.closed {
if t.closed.Load() {
return []peer.ID{}
}

Expand Down