@@ -63,15 +63,6 @@ func getGossipsubs(ctx context.Context, hs []host.Host, opts ...Option) []*PubSu
6363 return psubs
6464}
6565
66- func getGossipsubsOptFn (ctx context.Context , hs []host.Host , optFn func (int , host.Host ) []Option ) []* PubSub {
67- var psubs []* PubSub
68- for i , h := range hs {
69- opts := optFn (i , h )
70- psubs = append (psubs , getGossipsub (ctx , h , opts ... ))
71- }
72- return psubs
73- }
74-
7566func TestGossipSubParamsValidate (t * testing.T ) {
7667 params := DefaultGossipSubParams ()
7768 params .Dhi = 1
@@ -93,6 +84,15 @@ func TestGossipSubBootstrapParamsValidate(t *testing.T) {
9384 }
9485}
9586
87+ func getGossipsubsOptFn (ctx context.Context , hs []host.Host , optFn func (int , host.Host ) []Option ) []* PubSub {
88+ var psubs []* PubSub
89+ for i , h := range hs {
90+ opts := optFn (i , h )
91+ psubs = append (psubs , getGossipsub (ctx , h , opts ... ))
92+ }
93+ return psubs
94+ }
95+
9696func TestSparseGossipsub (t * testing.T ) {
9797 ctx , cancel := context .WithCancel (context .Background ())
9898 defer cancel ()
@@ -4653,3 +4653,101 @@ outer:
46534653 t .Errorf ("Expected no missing parts, got %v" , missing )
46544654 }
46554655}
4656+
4657+ func TestSkipPublishingToPeersWithPartialMessageSupport (t * testing.T ) {
4658+ topicName := "test-topic"
4659+
4660+ // 3 hosts.
4661+ // hosts[0]: Publisher. Supports partial messages
4662+ // hosts[1]: Subscriber. Supports partial messages
4663+ // hosts[2]: Alternate publisher. Does not support partial messages. Only
4664+ // connected to hosts[0]
4665+ hosts := getDefaultHosts (t , 3 )
4666+
4667+ partialExt := make ([]* partialmessages.PartialMessageExtension , 2 )
4668+ logger := slog .New (slog .NewTextHandler (os .Stderr , & slog.HandlerOptions {Level : slog .LevelDebug }))
4669+
4670+ for i := range partialExt {
4671+ partialExt [i ] = & partialmessages.PartialMessageExtension {
4672+ Logger : logger ,
4673+ ValidateRPC : func (from peer.ID , rpc * pb.PartialMessagesExtension ) error {
4674+ return nil
4675+ },
4676+ EagerIWantLimitPerHeartbeat : 0 ,
4677+ IWantLimitPerHeartbeat : 1 ,
4678+ NewPartialMessage : func (topic string , groupID []byte ) (partialmessages.PartialMessage , error ) {
4679+ return & minimalTestPartialMessage {
4680+ Group : groupID ,
4681+ onExtended : func (m * minimalTestPartialMessage ) {
4682+ t .Logf ("Received new part and extended partial message" )
4683+ },
4684+ }, nil
4685+ },
4686+ }
4687+ }
4688+
4689+ psubs := make ([]* PubSub , 0 , len (hosts )- 1 )
4690+ for i , h := range hosts [:2 ] {
4691+ psub := getGossipsub (context .Background (), h , WithPartialMessagesExtension (partialExt [i ]))
4692+ psubs = append (psubs , psub )
4693+ }
4694+
4695+ nonPartialPubsub := getGossipsub (context .Background (), hosts [2 ])
4696+
4697+ denseConnect (t , hosts [:2 ])
4698+ time .Sleep (2 * time .Second )
4699+
4700+ // Connect nonPartialPubsub to the publisher
4701+ connect (t , hosts [0 ], hosts [2 ])
4702+
4703+ var topics []* Topic
4704+ var subs []* Subscription
4705+ for _ , psub := range psubs {
4706+ topic , err := psub .Join (topicName , WithSkipPublishingToPartialMessageCapablePeers ())
4707+ if err != nil {
4708+ t .Fatal (err )
4709+ }
4710+ topics = append (topics , topic )
4711+ s , err := topic .Subscribe ()
4712+ if err != nil {
4713+ t .Fatal (err )
4714+ }
4715+ subs = append (subs , s )
4716+ }
4717+
4718+ topicForNonPartial , err := nonPartialPubsub .Join (topicName )
4719+ if err != nil {
4720+ t .Fatal (err )
4721+ }
4722+
4723+ // Wait for subscriptions to propagate
4724+ time .Sleep (time .Second )
4725+
4726+ topics [0 ].Publish (context .Background (), []byte ("Hello" ))
4727+
4728+ // Publish from another peer, the publisher (psub[0]) should not forward this to psub[1].
4729+ topicForNonPartial .Publish (context .Background (), []byte ("from non-partial" ))
4730+
4731+ recvdMessage := make (chan struct {}, 1 )
4732+ ctx , cancel := context .WithCancel (context .Background ())
4733+ defer cancel ()
4734+ go func () {
4735+ msg , err := subs [1 ].Next (ctx )
4736+ if err == context .Canceled {
4737+ return
4738+ }
4739+ if err != nil {
4740+ t .Log (err )
4741+ t .Fail ()
4742+ return
4743+ }
4744+ t .Log ("Received msg" , string (msg .Data ))
4745+ recvdMessage <- struct {}{}
4746+ }()
4747+
4748+ select {
4749+ case <- recvdMessage :
4750+ t .Fatal ("Received message" )
4751+ case <- time .After (2 * time .Second ):
4752+ }
4753+ }
0 commit comments