@@ -5049,3 +5049,187 @@ func TestSkipPublishingToPeersRequestingPartialMessages(t *testing.T) {
50495049 case <- time .After (2 * time .Second ):
50505050 }
50515051}
5052+
5053+ func TestPairwiseInteractionWithPartialMessages (t * testing.T ) {
5054+ type PartialMessageStatus int
5055+ const (
5056+ NoPartialMessages PartialMessageStatus = iota
5057+ PeerSupportsPartialMessages
5058+ PeerRequestsPartialMessages
5059+ )
5060+
5061+ type TestCase struct {
5062+ hostSupport []PartialMessageStatus
5063+ publisherIdx int
5064+ }
5065+
5066+ var tcs []TestCase
5067+ for _ , a := range []PartialMessageStatus {NoPartialMessages , PeerSupportsPartialMessages , PeerRequestsPartialMessages } {
5068+ for _ , b := range []PartialMessageStatus {NoPartialMessages , PeerSupportsPartialMessages , PeerRequestsPartialMessages } {
5069+ for i := range 2 {
5070+ tcs = append (tcs , TestCase {hostSupport : []PartialMessageStatus {a , b }, publisherIdx : i })
5071+ }
5072+ }
5073+ }
5074+
5075+ for _ , tc := range tcs {
5076+ t .Run (fmt .Sprintf ("Host Support: %v. Publisher: %d" , tc .hostSupport , tc .publisherIdx ), func (t * testing.T ) {
5077+ topic := "test-topic"
5078+ hostCount := len (tc .hostSupport )
5079+ hosts := getDefaultHosts (t , hostCount )
5080+ topics := make ([]* Topic , 0 , len (hosts ))
5081+ psubs := make ([]* PubSub , 0 , len (hosts ))
5082+
5083+ gossipsubCtx , closeGossipsub := context .WithCancel (context .Background ())
5084+ defer closeGossipsub ()
5085+ go func () {
5086+ <- gossipsubCtx .Done ()
5087+ for _ , h := range hosts {
5088+ h .Close ()
5089+ }
5090+ }()
5091+
5092+ partialExt := make ([]* partialmessages.PartialMessageExtension , hostCount )
5093+ logger := slog .New (slog .NewTextHandler (os .Stderr , & slog.HandlerOptions {Level : slog .LevelDebug }))
5094+
5095+ // A list of maps from topic+groupID to partialMessage. One map per peer
5096+ // var partialMessageStoreMu sync.Mutex
5097+ partialMessageStore := make ([]map [string ]* minimalTestPartialMessage , hostCount )
5098+ for i := range hostCount {
5099+ partialMessageStore [i ] = make (map [string ]* minimalTestPartialMessage )
5100+ }
5101+
5102+ receivedMessage := make (chan struct {}, hostCount )
5103+
5104+ for i := range partialExt {
5105+ if tc .hostSupport [i ] == NoPartialMessages {
5106+ continue
5107+ }
5108+ partialExt [i ] = & partialmessages.PartialMessageExtension {
5109+ Logger : logger .With ("id" , i ),
5110+ ValidateRPC : func (from peer.ID , rpc * pb.PartialMessagesExtension ) error {
5111+ // No validation. Only for this test. In production you should
5112+ // have some basic fast rules here.
5113+ return nil
5114+ },
5115+ MergePartsMetadata : func (_ string , left , right partialmessages.PartsMetadata ) partialmessages.PartsMetadata {
5116+ return partialmessages .MergeBitmap (left , right )
5117+ },
5118+ OnIncomingRPC : func (from peer.ID , rpc * pb.PartialMessagesExtension ) error {
5119+ if tc .hostSupport [i ] == PeerSupportsPartialMessages && len (rpc .PartialMessage ) > 0 {
5120+ panic ("This host should not have received partial message data" )
5121+ }
5122+
5123+ groupID := rpc .GroupID
5124+ pm , ok := partialMessageStore [i ][topic + string (groupID )]
5125+ if ! ok {
5126+ pm = & minimalTestPartialMessage {
5127+ Group : groupID ,
5128+ }
5129+ partialMessageStore [i ][topic + string (groupID )] = pm
5130+ }
5131+ prevComplete := pm .complete ()
5132+ if publishOpts := pm .onIncomingRPC (from , rpc ); publishOpts != nil {
5133+ if ! prevComplete && pm .complete () {
5134+ t .Log ("host" , i , "received partial message" )
5135+
5136+ receivedMessage <- struct {}{}
5137+ }
5138+
5139+ go psubs [i ].PublishPartialMessage (topic , pm , * publishOpts )
5140+ }
5141+ return nil
5142+ },
5143+ }
5144+ }
5145+
5146+ for i , h := range hosts {
5147+ var opts []Option
5148+ var topicOpts []TopicOpt
5149+ switch tc .hostSupport [i ] {
5150+ case NoPartialMessages :
5151+ case PeerSupportsPartialMessages :
5152+ opts = append (opts , WithPartialMessagesExtension (partialExt [i ]))
5153+ topicOpts = append (topicOpts , SupportsPartialMessages ())
5154+ case PeerRequestsPartialMessages :
5155+ opts = append (opts , WithPartialMessagesExtension (partialExt [i ]))
5156+ topicOpts = append (topicOpts , RequestPartialMessages ())
5157+ }
5158+
5159+ psub := getGossipsub (gossipsubCtx , h , opts ... )
5160+ topic , err := psub .Join (topic , topicOpts ... )
5161+ if err != nil {
5162+ t .Fatal (err )
5163+ }
5164+ topics = append (topics , topic )
5165+ sub , err := topic .Subscribe ()
5166+ if err != nil {
5167+ t .Fatal (err )
5168+ }
5169+ psubs = append (psubs , psub )
5170+ go func () {
5171+ _ , err := sub .Next (gossipsubCtx )
5172+ if err == context .Canceled {
5173+ return
5174+ }
5175+ if err != nil {
5176+ t .Fatal (err )
5177+ }
5178+
5179+ t .Log ("host" , i , "received message" )
5180+ receivedMessage <- struct {}{}
5181+ }()
5182+ }
5183+
5184+ denseConnect (t , hosts )
5185+ time .Sleep (time .Second )
5186+
5187+ group := []byte ("test-group" )
5188+ msg1 := & minimalTestPartialMessage {
5189+ Group : group ,
5190+ Parts : [2 ][]byte {
5191+ []byte ("Hello" ),
5192+ []byte ("World" ),
5193+ },
5194+ }
5195+
5196+ for i := range hostCount {
5197+ if i != tc .publisherIdx {
5198+ continue
5199+ }
5200+
5201+ partialMessageStore [i ][topic + string (group )] = msg1
5202+
5203+ encoded , err := msg1 .PartialMessageBytes (partialmessages .PartsMetadata ([]byte {0 }))
5204+ if err != nil {
5205+ t .Fatal (err )
5206+ }
5207+ err = topics [i ].Publish (context .Background (), encoded )
5208+ if err != nil {
5209+ t .Fatal (err )
5210+ }
5211+
5212+ if tc .hostSupport [i ] != NoPartialMessages {
5213+ err = psubs [i ].PublishPartialMessage (topic , msg1 , partialmessages.PublishOptions {})
5214+ if err != nil {
5215+ t .Fatal (err )
5216+ }
5217+ }
5218+ }
5219+
5220+ for range hostCount {
5221+ select {
5222+ case <- receivedMessage :
5223+ case <- time .After (time .Second ):
5224+ t .Fatalf ("At least one message was not received" )
5225+ }
5226+ }
5227+
5228+ select {
5229+ case <- receivedMessage :
5230+ t .Fatalf ("An extra message was received" )
5231+ case <- time .After (100 * time .Millisecond ):
5232+ }
5233+ })
5234+ }
5235+ }
0 commit comments