Skip to content

Commit a306e04

Browse files
committed
add test for SupportsPartial subscribe option
Creates a mixed network where only one peer request partial, and the rest either support or don't
1 parent 7dc23ae commit a306e04

File tree

2 files changed

+176
-0
lines changed

2 files changed

+176
-0
lines changed

floodsub_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ func denseConnect(t *testing.T, hosts []host.Host) {
5757
connectSome(t, hosts, 10)
5858
}
5959

60+
func ringConnect(t *testing.T, hosts []host.Host) {
61+
for i := range hosts {
62+
connect(t, hosts[i], hosts[(i+1)%len(hosts)])
63+
}
64+
}
65+
6066
func connectSome(t *testing.T, hosts []host.Host, d int) {
6167
for i, a := range hosts {
6268
for j := 0; j < d; j++ {

gossipsub_test.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4730,6 +4730,176 @@ func TestPartialMessages(t *testing.T) {
47304730
}
47314731
}
47324732

4733+
func TestPeerSupportsPartialMessages(t *testing.T) {
4734+
// First peer requests partial messages.
4735+
// other peers supports it.
4736+
// all should end with the full message.
4737+
4738+
topic := "test-topic"
4739+
const hostCount = 5
4740+
hosts := getDefaultHosts(t, hostCount)
4741+
psubs := make([]*PubSub, 0, len(hosts))
4742+
topics := make([]*Topic, 0, len(hosts))
4743+
subs := make([]*Subscription, 0, len(hosts))
4744+
4745+
gossipsubCtx, closeGossipsub := context.WithCancel(context.Background())
4746+
go func() {
4747+
<-gossipsubCtx.Done()
4748+
for _, h := range hosts {
4749+
h.Close()
4750+
}
4751+
}()
4752+
4753+
partialExt := make([]*partialmessages.PartialMessageExtension, hostCount)
4754+
logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
4755+
4756+
// A list of maps from topic+groupID to partialMessage. One map per peer
4757+
// var partialMessageStoreMu sync.Mutex
4758+
partialMessageStore := make([]map[string]*minimalTestPartialMessage, hostCount)
4759+
for i := range hostCount {
4760+
partialMessageStore[i] = make(map[string]*minimalTestPartialMessage)
4761+
}
4762+
4763+
for i := range partialExt {
4764+
partialExt[i] = &partialmessages.PartialMessageExtension{
4765+
Logger: logger.With("id", i),
4766+
ValidateRPC: func(from peer.ID, rpc *pb.PartialMessagesExtension) error {
4767+
// No validation. Only for this test. In production you should
4768+
// have some basic fast rules here.
4769+
return nil
4770+
},
4771+
MergePartsMetadata: func(_ string, left, right partialmessages.PartsMetadata) partialmessages.PartsMetadata {
4772+
return partialmessages.MergeBitmap(left, right)
4773+
},
4774+
OnIncomingRPC: func(from peer.ID, rpc *pb.PartialMessagesExtension) error {
4775+
groupID := rpc.GroupID
4776+
pm, ok := partialMessageStore[i][topic+string(groupID)]
4777+
if !ok {
4778+
pm = &minimalTestPartialMessage{
4779+
Group: groupID,
4780+
}
4781+
partialMessageStore[i][topic+string(groupID)] = pm
4782+
}
4783+
if publishOpts := pm.onIncomingRPC(from, rpc); publishOpts != nil {
4784+
go psubs[i].PublishPartialMessage(topic, pm, *publishOpts)
4785+
if pm.complete() {
4786+
encoded, _ := pm.PartialMessageBytes(partialmessages.PartsMetadata([]byte{0}))
4787+
go func() {
4788+
err := psubs[i].Publish(topic, encoded)
4789+
if err != nil {
4790+
panic(err)
4791+
}
4792+
}()
4793+
}
4794+
4795+
}
4796+
return nil
4797+
},
4798+
}
4799+
}
4800+
4801+
for i, h := range hosts {
4802+
var topicOpts []TopicOpt
4803+
if i == 0 {
4804+
topicOpts = append(topicOpts, RequestPartialMessages())
4805+
} else if i == 1 {
4806+
// The right neighbor doesn't support partial messages
4807+
} else {
4808+
topicOpts = append(topicOpts, SupportsPartialMessages())
4809+
}
4810+
4811+
psub := getGossipsub(gossipsubCtx, h, WithPartialMessagesExtension(partialExt[i]))
4812+
topic, err := psub.Join(topic, topicOpts...)
4813+
if err != nil {
4814+
t.Fatal(err)
4815+
}
4816+
sub, err := topic.Subscribe()
4817+
if err != nil {
4818+
t.Fatal(err)
4819+
}
4820+
psubs = append(psubs, psub)
4821+
topics = append(topics, topic)
4822+
subs = append(subs, sub)
4823+
}
4824+
4825+
ringConnect(t, hosts)
4826+
time.Sleep(2 * time.Second)
4827+
4828+
group := []byte("test-group")
4829+
emptyMsg := &minimalTestPartialMessage{
4830+
Group: group,
4831+
}
4832+
fullMsg := &minimalTestPartialMessage{
4833+
Group: group,
4834+
Parts: [2][]byte{
4835+
[]byte("Hello"),
4836+
[]byte("World"),
4837+
},
4838+
}
4839+
4840+
for i := range hosts {
4841+
if i == 0 {
4842+
partialMessageStore[i][topic+string(group)] = emptyMsg
4843+
// first host has no data
4844+
err := psubs[i].PublishPartialMessage(topic, emptyMsg, partialmessages.PublishOptions{})
4845+
if err != nil {
4846+
t.Fatal(err)
4847+
}
4848+
} else {
4849+
if i != 1 {
4850+
copiedMsg := *fullMsg
4851+
partialMessageStore[i][topic+string(group)] = &copiedMsg
4852+
err := psubs[i].PublishPartialMessage(topic, &copiedMsg, partialmessages.PublishOptions{})
4853+
if err != nil {
4854+
t.Fatal(err)
4855+
}
4856+
}
4857+
encoded, err := fullMsg.PartialMessageBytes(partialmessages.PartsMetadata([]byte{0}))
4858+
if err != nil {
4859+
t.Fatal(err)
4860+
}
4861+
err = topics[i].Publish(context.Background(), encoded)
4862+
if err != nil {
4863+
t.Fatal(err)
4864+
}
4865+
}
4866+
}
4867+
4868+
time.Sleep(2 * time.Second)
4869+
4870+
// Close gossipsub before we inspect the state to avoid race conditions
4871+
closeGossipsub()
4872+
time.Sleep(1 * time.Second)
4873+
4874+
if len(partialMessageStore) != hostCount {
4875+
t.Errorf("One host is missing the partial message")
4876+
}
4877+
4878+
for i, msgStore := range partialMessageStore {
4879+
if i == 1 {
4880+
// Host 1 doesn't support partial messages
4881+
continue
4882+
}
4883+
if len(msgStore) == 0 {
4884+
t.Errorf("Host %d is missing the partial message", i)
4885+
}
4886+
for _, partialMessage := range msgStore {
4887+
if !partialMessage.complete() {
4888+
t.Errorf("expected complete message, but %v is incomplete at host %d", partialMessage, i)
4889+
}
4890+
}
4891+
}
4892+
4893+
for i, sub := range subs {
4894+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
4895+
defer cancel()
4896+
_, err := sub.Next(ctx)
4897+
if err != nil {
4898+
t.Errorf("failed to receive message: %v at host %d", err, i)
4899+
}
4900+
}
4901+
}
4902+
47334903
func TestSkipPublishingToPeersWithPartialMessageSupport(t *testing.T) {
47344904
topicName := "test-topic"
47354905

0 commit comments

Comments
 (0)