@@ -3338,7 +3338,7 @@ func TestGossipsubPruneMeshCorrectly(t *testing.T) {
33383338 }
33393339}
33403340
3341- // Test that IANNOUNCE is sent to mesh peers
3341+ // Test that IANNOUNCE is sent to mesh peers and no message is sent if it doesn't send INEED
33423342func TestGossipsubIannounceMeshPeer (t * testing.T ) {
33433343 ctx , cancel := context .WithCancel (context .Background ())
33443344 defer cancel ()
@@ -3421,6 +3421,89 @@ func TestGossipsubIannounceMeshPeer(t *testing.T) {
34213421 <- ctx .Done ()
34223422}
34233423
3424+ // Test that IANNOUNCE is sent to mesh peers and the message is sent after sending INEED
3425+ func TestGossipsubIannounceIneedMeshPeer (t * testing.T ) {
3426+ ctx , cancel := context .WithCancel (context .Background ())
3427+ defer cancel ()
3428+ hosts := getDefaultHosts (t , 2 )
3429+
3430+ msgID := func (pmsg * pb.Message ) string {
3431+ // silly content-based test message-ID: just use the data as whole
3432+ return base64 .URLEncoding .EncodeToString (pmsg .Data )
3433+ }
3434+
3435+ params := DefaultGossipSubParams ()
3436+ params .Dannounce = params .D
3437+ psub := getGossipsub (ctx , hosts [0 ], WithGossipSubParams (params ), WithMessageIdFn (msgID ))
3438+ _ , err := psub .Subscribe ("foobar" )
3439+ if err != nil {
3440+ t .Fatal (err )
3441+ }
3442+
3443+ // Wait a bit after the last message before checking we got the right messages
3444+ msgTimer := time .NewTimer (1 * time .Second )
3445+
3446+ // Checks we received the right messages
3447+ msgCount := 0
3448+ checkMsgs := func () {
3449+ if msgCount != 1 {
3450+ t .Fatalf ("Expected one message received, got %d" , msgCount )
3451+ }
3452+ }
3453+
3454+ // Wait for the timer to expire
3455+ go func () {
3456+ select {
3457+ case <- msgTimer .C :
3458+ checkMsgs ()
3459+ cancel ()
3460+ return
3461+ case <- ctx .Done ():
3462+ checkMsgs ()
3463+ }
3464+ }()
3465+
3466+ newMockGS (ctx , t , hosts [1 ], func (writeMsg func (* pb.RPC ), irpc * pb.RPC ) {
3467+ // When the first peer connects it will send us its subscriptions
3468+ for _ , sub := range irpc .GetSubscriptions () {
3469+ if sub .GetSubscribe () {
3470+ // Reply by subcribing to the topic and grafting to the first peer
3471+ writeMsg (& pb.RPC {
3472+ Subscriptions : []* pb.RPC_SubOpts {{Subscribe : sub .Subscribe , Topicid : sub .Topicid }},
3473+ Control : & pb.ControlMessage {Graft : []* pb.ControlGraft {{TopicID : sub .Topicid }}},
3474+ })
3475+
3476+ go func () {
3477+ // Wait for a short interval to make sure the first peer
3478+ // received and processed the subscribe + graft
3479+ time .Sleep (100 * time .Millisecond )
3480+ // Publish messages from the first peer
3481+ data := []byte ("mymessage" )
3482+ psub .Publish ("foobar" , data )
3483+ }()
3484+ }
3485+ }
3486+ if len (irpc .GetControl ().GetIannounce ()) > 0 {
3487+ var ineeds []* pb.ControlINeed
3488+ for _ , iannounce := range irpc .GetControl ().GetIannounce () {
3489+ mid := iannounce .GetMessageID ()
3490+ ineed := & pb.ControlINeed {
3491+ MessageID : & mid ,
3492+ }
3493+ ineeds = append (ineeds , ineed )
3494+ }
3495+ writeMsg (& pb.RPC {
3496+ Control : & pb.ControlMessage {Ineed : ineeds },
3497+ })
3498+ }
3499+ msgCount += len (irpc .GetPublish ())
3500+ })
3501+
3502+ connect (t , hosts [0 ], hosts [1 ])
3503+
3504+ <- ctx .Done ()
3505+ }
3506+
34243507// Test that IANNOUNCE is sent to direct peers
34253508func TestGossipsubIannounceDirectPeer (t * testing.T ) {
34263509 ctx , cancel := context .WithCancel (context .Background ())
@@ -4018,6 +4101,92 @@ func TestGossipsubIneedIndirectNonmeshPeers(t *testing.T) {
40184101 <- ctx .Done ()
40194102}
40204103
4104+ func TestSparseGossipsubV2 (t * testing.T ) {
4105+ ctx , cancel := context .WithCancel (context .Background ())
4106+ defer cancel ()
4107+ hosts := getDefaultHosts (t , 20 )
4108+
4109+ params := DefaultGossipSubParams ()
4110+ params .Dannounce = params .D
4111+ psubs := getGossipsubs (ctx , hosts , WithGossipSubParams (params ))
4112+
4113+ var msgs []* Subscription
4114+ for _ , ps := range psubs {
4115+ subch , err := ps .Subscribe ("foobar" )
4116+ if err != nil {
4117+ t .Fatal (err )
4118+ }
4119+
4120+ msgs = append (msgs , subch )
4121+ }
4122+
4123+ sparseConnect (t , hosts )
4124+
4125+ // wait for heartbeats to build mesh
4126+ time .Sleep (time .Second * 2 )
4127+
4128+ for i := 0 ; i < 100 ; i ++ {
4129+ msg := []byte (fmt .Sprintf ("%d it's not a floooooood %d" , i , i ))
4130+
4131+ owner := mrand .Intn (len (psubs ))
4132+
4133+ psubs [owner ].Publish ("foobar" , msg )
4134+
4135+ for _ , sub := range msgs {
4136+ got , err := sub .Next (ctx )
4137+ if err != nil {
4138+ t .Fatal (sub .err )
4139+ }
4140+ if ! bytes .Equal (msg , got .Data ) {
4141+ t .Fatal ("got wrong message!" )
4142+ }
4143+ }
4144+ }
4145+ }
4146+
4147+ func TestDenseGossipsubV2 (t * testing.T ) {
4148+ ctx , cancel := context .WithCancel (context .Background ())
4149+ defer cancel ()
4150+ hosts := getDefaultHosts (t , 20 )
4151+
4152+ params := DefaultGossipSubParams ()
4153+ params .Dannounce = params .D
4154+ psubs := getGossipsubs (ctx , hosts , WithGossipSubParams (params ))
4155+
4156+ var msgs []* Subscription
4157+ for _ , ps := range psubs {
4158+ subch , err := ps .Subscribe ("foobar" )
4159+ if err != nil {
4160+ t .Fatal (err )
4161+ }
4162+
4163+ msgs = append (msgs , subch )
4164+ }
4165+
4166+ denseConnect (t , hosts )
4167+
4168+ // wait for heartbeats to build mesh
4169+ time .Sleep (time .Second * 2 )
4170+
4171+ for i := 0 ; i < 100 ; i ++ {
4172+ msg := []byte (fmt .Sprintf ("%d it's not a floooooood %d" , i , i ))
4173+
4174+ owner := mrand .Intn (len (psubs ))
4175+
4176+ psubs [owner ].Publish ("foobar" , msg )
4177+
4178+ for _ , sub := range msgs {
4179+ got , err := sub .Next (ctx )
4180+ if err != nil {
4181+ t .Fatal (sub .err )
4182+ }
4183+ if ! bytes .Equal (msg , got .Data ) {
4184+ t .Fatal ("got wrong message!" )
4185+ }
4186+ }
4187+ }
4188+ }
4189+
40214190func BenchmarkAllocDoDropRPC (b * testing.B ) {
40224191 gs := GossipSubRouter {tracer : & pubsubTracer {}}
40234192
0 commit comments