Skip to content

Commit 96339bb

Browse files
committed
add Topic option to skip publishing to partial message-capable peers
1 parent 109d288 commit 96339bb

File tree

4 files changed

+148
-13
lines changed

4 files changed

+148
-13
lines changed

gossipsub.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1295,6 +1295,13 @@ func (gs *GossipSubRouter) PublishBatch(messages []*Message, opts *BatchPublishO
12951295
}
12961296
}
12971297

1298+
func (gs *GossipSubRouter) skipPartialMessageCapablePeers(topicID string) bool {
1299+
if t, ok := gs.p.myTopics[topicID]; ok {
1300+
return t.skipPublishingToPartialMessageCapablePeers
1301+
}
1302+
return false
1303+
}
1304+
12981305
func (gs *GossipSubRouter) Publish(msg *Message) {
12991306
for p, rpc := range gs.rpcs(msg) {
13001307
gs.sendRPC(p, rpc, false)
@@ -1371,8 +1378,9 @@ func (gs *GossipSubRouter) rpcs(msg *Message) iter.Seq2[peer.ID, *RPC] {
13711378
}
13721379

13731380
out := rpcWithMessages(msg.Message)
1381+
skipPMPeers := gs.skipPartialMessageCapablePeers(msg.GetTopic())
13741382
for pid := range tosend {
1375-
if pid == from || pid == peer.ID(msg.GetFrom()) {
1383+
if pid == from || pid == peer.ID(msg.GetFrom()) || (skipPMPeers && gs.extensions.peerExtensions[pid].PartialMessages) {
13761384
continue
13771385
}
13781386

gossipsub_test.go

Lines changed: 118 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,6 @@ func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSu
6262
return psubs
6363
}
6464

65-
func getGossipsubsOptFn(ctx context.Context, hs []host.Host, optFn func(int, host.Host) []Option) []*PubSub {
66-
var psubs []*PubSub
67-
for i, h := range hs {
68-
opts := optFn(i, h)
69-
psubs = append(psubs, getGossipsub(ctx, h, opts...))
70-
}
71-
return psubs
72-
}
73-
7465
func TestGossipSubParamsValidate(t *testing.T) {
7566
params := DefaultGossipSubParams()
7667
params.Dhi = 1
@@ -92,6 +83,15 @@ func TestGossipSubBootstrapParamsValidate(t *testing.T) {
9283
}
9384
}
9485

86+
func getGossipsubsOptFn(ctx context.Context, hs []host.Host, optFn func(int, host.Host) []Option) []*PubSub {
87+
var psubs []*PubSub
88+
for i, h := range hs {
89+
opts := optFn(i, h)
90+
psubs = append(psubs, getGossipsub(ctx, h, opts...))
91+
}
92+
return psubs
93+
}
94+
9595
func TestSparseGossipsub(t *testing.T) {
9696
ctx, cancel := context.WithCancel(context.Background())
9797
defer cancel()
@@ -4685,3 +4685,112 @@ func TestPartialMessages(t *testing.T) {
46854685
}
46864686
}
46874687
}
4688+
4689+
func TestSkipPublishingToPeersWithPartialMessageSupport(t *testing.T) {
4690+
topicName := "test-topic"
4691+
4692+
// 3 hosts.
4693+
// hosts[0]: Publisher. Supports partial messages
4694+
// hosts[1]: Subscriber. Supports partial messages
4695+
// hosts[2]: Alternate publisher. Does not support partial messages. Only
4696+
// connected to hosts[0]
4697+
hosts := getDefaultHosts(t, 3)
4698+
4699+
const hostsWithPartialMessageSupport = 2
4700+
partialExt := make([]*partialmessages.PartialMessageExtension, hostsWithPartialMessageSupport)
4701+
// A list of maps from topic+groupID to partialMessage. One map per peer
4702+
partialMessageStore := make([]map[string]*minimalTestPartialMessage, hostsWithPartialMessageSupport)
4703+
for i := range hostsWithPartialMessageSupport {
4704+
partialMessageStore[i] = make(map[string]*minimalTestPartialMessage)
4705+
}
4706+
4707+
// Only hosts with partial message support
4708+
psubs := make([]*PubSub, 0, len(hosts)-1)
4709+
4710+
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
4711+
4712+
for i := range partialExt {
4713+
partialExt[i] = &partialmessages.PartialMessageExtension{
4714+
Logger: logger,
4715+
ValidateRPC: func(from peer.ID, rpc *pb.PartialMessagesExtension) error {
4716+
return nil
4717+
},
4718+
OnIncomingRPC: func(from peer.ID, topic string, groupID, iwant, ihave, partialMessageBytes []byte) {
4719+
pm, ok := partialMessageStore[i][topic+string(groupID)]
4720+
if !ok {
4721+
pm = &minimalTestPartialMessage{
4722+
Group: groupID,
4723+
}
4724+
partialMessageStore[i][topic+string(groupID)] = pm
4725+
}
4726+
if publishOpts := pm.onIncomingRPC(from, topic, groupID, iwant, ihave, partialMessageBytes); publishOpts != nil {
4727+
go psubs[i].PublishPartialMessage(topic, pm, *publishOpts)
4728+
}
4729+
},
4730+
}
4731+
}
4732+
4733+
for i, h := range hosts[:2] {
4734+
psub := getGossipsub(context.Background(), h, WithPartialMessagesExtension(partialExt[i]))
4735+
psubs = append(psubs, psub)
4736+
}
4737+
4738+
nonPartialPubsub := getGossipsub(context.Background(), hosts[2])
4739+
4740+
denseConnect(t, hosts[:2])
4741+
time.Sleep(2 * time.Second)
4742+
4743+
// Connect nonPartialPubsub to the publisher
4744+
connect(t, hosts[0], hosts[2])
4745+
4746+
var topics []*Topic
4747+
var subs []*Subscription
4748+
for _, psub := range psubs {
4749+
topic, err := psub.Join(topicName, WithSkipPublishingToPartialMessageCapablePeers())
4750+
if err != nil {
4751+
t.Fatal(err)
4752+
}
4753+
topics = append(topics, topic)
4754+
s, err := topic.Subscribe()
4755+
if err != nil {
4756+
t.Fatal(err)
4757+
}
4758+
subs = append(subs, s)
4759+
}
4760+
4761+
topicForNonPartial, err := nonPartialPubsub.Join(topicName)
4762+
if err != nil {
4763+
t.Fatal(err)
4764+
}
4765+
4766+
// Wait for subscriptions to propagate
4767+
time.Sleep(time.Second)
4768+
4769+
topics[0].Publish(context.Background(), []byte("Hello"))
4770+
4771+
// Publish from another peer, the publisher (psub[0]) should not forward this to psub[1].
4772+
topicForNonPartial.Publish(context.Background(), []byte("from non-partial"))
4773+
4774+
recvdMessage := make(chan struct{}, 1)
4775+
ctx, cancel := context.WithCancel(context.Background())
4776+
defer cancel()
4777+
go func() {
4778+
msg, err := subs[1].Next(ctx)
4779+
if err == context.Canceled {
4780+
return
4781+
}
4782+
if err != nil {
4783+
t.Log(err)
4784+
t.Fail()
4785+
return
4786+
}
4787+
t.Log("Received msg", string(msg.Data))
4788+
recvdMessage <- struct{}{}
4789+
}()
4790+
4791+
select {
4792+
case <-recvdMessage:
4793+
t.Fatal("Received message")
4794+
case <-time.After(2 * time.Second):
4795+
}
4796+
}

pubsub.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1376,7 +1376,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
13761376
continue
13771377
}
13781378

1379-
msg := &Message{pmsg, "", rpc.from, nil, false}
1379+
msg := &Message{Message: pmsg, ID: "", ReceivedFrom: rpc.from, ValidatorData: nil, Local: false}
13801380
if p.shouldPush(msg) {
13811381
toPush = append(toPush, msg)
13821382
}
@@ -1515,7 +1515,16 @@ type rmTopicReq struct {
15151515
resp chan error
15161516
}
15171517

1518-
type TopicOptions struct{}
1518+
type TopicOptions struct {
1519+
SkipPublishingToPartialMessageCapablePeers bool
1520+
}
1521+
1522+
func WithSkipPublishingToPartialMessageCapablePeers() TopicOpt {
1523+
return func(t *Topic) error {
1524+
t.skipPublishingToPartialMessageCapablePeers = true
1525+
return nil
1526+
}
1527+
}
15191528

15201529
type TopicOpt func(t *Topic) error
15211530

topic.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ type Topic struct {
3232

3333
mux sync.RWMutex
3434
closed bool
35+
36+
skipPublishingToPartialMessageCapablePeers bool
3537
}
3638

3739
// String returns the topic associated with t
@@ -348,7 +350,14 @@ func (t *Topic) validate(ctx context.Context, data []byte, opts ...PubOpt) (*Mes
348350
}
349351
}
350352

351-
msg := &Message{m, "", t.p.host.ID(), pub.validatorData, pub.local}
353+
msg := &Message{
354+
Message: m,
355+
ID: "",
356+
ReceivedFrom: t.p.host.ID(),
357+
ValidatorData: pub.validatorData,
358+
Local: pub.local,
359+
}
360+
352361
select {
353362
case t.p.eval <- func() {
354363
t.p.rt.Preprocess(t.p.host.ID(), []*Message{msg})

0 commit comments

Comments
 (0)