Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 22 additions & 34 deletions gossipsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4485,37 +4485,25 @@ func (m *minimalTestPartialMessage) extendFromEncodedPartialMessage(_ peer.ID, d

// onIncomingRPC handle an incoming rpc and will return a non-nil publish
// options if the caller should republish this partial message.
func (m *minimalTestPartialMessage) onIncomingRPC(from peer.ID, rpc *pb.PartialMessagesExtension) *partialmessages.PublishOptions {
var extended bool
if len(rpc.PartialMessage) > 0 {
extended = m.extendFromEncodedPartialMessage(from, rpc.PartialMessage)
func (m *minimalTestPartialMessage) onIncomingRPC(from peer.ID, rpc *pb.PartialMessagesExtension) bool {
if len(rpc.PartialMessage) > 0 && m.extendFromEncodedPartialMessage(from, rpc.PartialMessage) {
return true
}

var publishOpts partialmessages.PublishOptions

if !extended {
var peerHasUsefulData, iHaveUsefulData bool
// Only do these checks if we didn't extend our partial message.
// Since, otherwise, we simply publish again to all peers.
if len(rpc.PartsMetadata) > 0 {
iHave := m.PartsMetadata()[0]
iWant := ^iHave

peerHas := rpc.PartsMetadata[0]
peerWants := ^peerHas
var peerHasUsefulData, iHaveUsefulData bool
// Only do these checks if we didn't extend our partial message.
// Since, otherwise, we simply publish again to all peers.
if len(rpc.PartsMetadata) > 0 {
iHave := m.PartsMetadata()[0]
iWant := ^iHave

iHaveUsefulData = iHave&peerWants != 0
peerHasUsefulData = iWant&peerHas != 0
}
if peerHasUsefulData || iHaveUsefulData {
publishOpts.PublishToPeers = []peer.ID{from}
}
}
peerHas := rpc.PartsMetadata[0]
peerWants := ^peerHas

if extended || len(publishOpts.PublishToPeers) > 0 {
return &publishOpts
iHaveUsefulData = iHave&peerWants != 0
peerHasUsefulData = iWant&peerHas != 0
}
return nil
return peerHasUsefulData || iHaveUsefulData
}

// GroupID implements partialmessages.PartialMessage.
Expand Down Expand Up @@ -4670,8 +4658,8 @@ func TestPartialMessages(t *testing.T) {
}
partialMessageStore[i][topic+string(groupID)] = pm
}
if publishOpts := pm.onIncomingRPC(from, rpc); publishOpts != nil {
go psubs[i].PublishPartialMessage(topic, pm, *publishOpts)
if pm.onIncomingRPC(from, rpc) {
go psubs[i].PublishPartialMessage(topic, pm, partialmessages.PublishOptions{})
}
return nil
},
Expand Down Expand Up @@ -4799,8 +4787,8 @@ func TestPeerSupportsPartialMessages(t *testing.T) {
}
partialMessageStore[i][topic+string(groupID)] = pm
}
if publishOpts := pm.onIncomingRPC(from, rpc); publishOpts != nil {
go psubs[i].PublishPartialMessage(topic, pm, *publishOpts)
if pm.onIncomingRPC(from, rpc) {
go psubs[i].PublishPartialMessage(topic, pm, partialmessages.PublishOptions{})
if pm.complete() {
encoded, _ := pm.PartialMessageBytes(partialmessages.PartsMetadata([]byte{0}))
go func() {
Expand Down Expand Up @@ -4975,8 +4963,8 @@ func TestSkipPublishingToPeersRequestingPartialMessages(t *testing.T) {
}
partialMessageStore[i][topicID+string(groupID)] = pm
}
if publishOpts := pm.onIncomingRPC(from, rpc); publishOpts != nil {
go psubs[i].PublishPartialMessage(topicID, pm, *publishOpts)
if pm.onIncomingRPC(from, rpc) {
go psubs[i].PublishPartialMessage(topicID, pm, partialmessages.PublishOptions{})
}
return nil
},
Expand Down Expand Up @@ -5131,14 +5119,14 @@ func TestPairwiseInteractionWithPartialMessages(t *testing.T) {
partialMessageStore[i][topic+string(groupID)] = pm
}
prevComplete := pm.complete()
if publishOpts := pm.onIncomingRPC(from, rpc); publishOpts != nil {
if pm.onIncomingRPC(from, rpc) {
if !prevComplete && pm.complete() {
t.Log("host", i, "received partial message")

receivedMessage <- struct{}{}
}

go psubs[i].PublishPartialMessage(topic, pm, *publishOpts)
go psubs[i].PublishPartialMessage(topic, pm, partialmessages.PublishOptions{})
}
return nil
},
Expand Down
23 changes: 22 additions & 1 deletion partialmessages/partialmsgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (e *PartialMessagesExtension) PublishPartial(topic string, partial Message,
if len(opts.PublishToPeers) > 0 {
peers = slices.Values(opts.PublishToPeers)
} else {
peers = e.router.MeshPeers(topic)
peers = e.peersToPublishTo(topic, state)
}
for p := range peers {
log := e.Logger.With("peer", p)
Expand Down Expand Up @@ -286,6 +286,27 @@ func (e *PartialMessagesExtension) PublishPartial(topic string, partial Message,
return nil
}

// peersToPublishTo returns a iter.Seq of peers to publish to. It combines peers
// in the group state with mesh peers for the topic.
// Group state peers are used to cover the fanout and gossip message cases where
// the peer would not be in our mesh.
func (e *PartialMessagesExtension) peersToPublishTo(topic string, state *partialMessageStatePerGroupPerTopic) iter.Seq[peer.ID] {
return func(yield func(peer.ID) bool) {
for p := range state.peerState {
if !yield(p) {
return
}
}
for p := range e.router.MeshPeers(topic) {
if _, ok := state.peerState[p]; !ok {
if !yield(p) {
return
}
}
}
}
}

func (e *PartialMessagesExtension) AddPeer(id peer.ID) {
}

Expand Down
57 changes: 43 additions & 14 deletions partialmessages/partialmsgs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ type testPeers struct {
partialMessages map[peer.ID]map[string]map[string]*testPartialMessage
}

func createPeers(t *testing.T, topic string, n int) *testPeers {
func createPeers(t *testing.T, topic string, n int, nonMesh bool) *testPeers {
nw := &mockNetworkPartialMessages{
t: t,
pendingMsgs: make(map[peer.ID][]rpcWithFrom),
Expand Down Expand Up @@ -396,6 +396,11 @@ func createPeers(t *testing.T, topic string, n int) *testPeers {
},
meshPeers: func(topic string) iter.Seq[peer.ID] {
return func(yield func(peer.ID) bool) {
if nonMesh {
// No peers are in the mesh
return
}

// Yield all other peers
for j, otherPeer := range peers {
if j != i {
Expand Down Expand Up @@ -462,11 +467,7 @@ func createPeers(t *testing.T, topic string, n int) *testPeers {
weHaveUsefulData := peerWants.Cmp(&zeroInt) != 0

if weHaveUsefulData || peerHasUsefulData {
// This peer has something we want or we can provide
// something to them. Call publish partial just for them.
handler.PublishPartial(topic, pm, PublishOptions{
PublishToPeers: []peer.ID{from},
})
handler.PublishPartial(topic, pm, PublishOptions{})
}
return nil
},
Expand Down Expand Up @@ -579,7 +580,7 @@ func TestPartialMessages(t *testing.T) {
// slog.SetLogLoggerLevel(slog.LevelDebug)

t.Run("h1 has all the data. h2 requests it", func(t *testing.T) {
peers := createPeers(t, topic, 2)
peers := createPeers(t, topic, 2, false)
peers.network.addPeers()
defer peers.network.removePeers()

Expand Down Expand Up @@ -609,7 +610,7 @@ func TestPartialMessages(t *testing.T) {
})

t.Run("h1 has all the data and eager pushes. h2 has the next message also eager pushes", func(t *testing.T) {
peers := createPeers(t, topic, 2)
peers := createPeers(t, topic, 2, false)
peers.network.addPeers()
defer peers.network.removePeers()

Expand Down Expand Up @@ -670,7 +671,7 @@ func TestPartialMessages(t *testing.T) {
})

t.Run("h1 has all the data. h2 doesn't know anything", func(t *testing.T) {
peers := createPeers(t, topic, 2)
peers := createPeers(t, topic, 2, false)
peers.network.addPeers()
defer peers.network.removePeers()

Expand All @@ -696,8 +697,36 @@ func TestPartialMessages(t *testing.T) {
}
})

t.Run("h1 is not in h2's mesh, but sends h2 a partial message", func(t *testing.T) {
peers := createPeers(t, topic, 2, true)
peers.network.addPeers()
defer peers.network.removePeers()
slog.Debug("starting")

h1Msg, err := newFullTestMessage(rand, peers.handlers[0], topic)
if err != nil {
t.Fatal(err)
}
peers.registerMessage(0, topic, h1Msg)

// h1 knows the full message and explicitly publishes to peer 1 (e.g. fanout or gossip (like an IHAVE))
peers.handlers[0].PublishPartial(topic, h1Msg, PublishOptions{PublishToPeers: []peer.ID{peers.peers[1]}})

// Handle all RPCs
for peers.network.handleRPCs() {
}

// h2 should now have the partial message after receiving data
h2Msg := peers.getOrCreatePartialMessage(1, topic, h1Msg.Commitment)

// Assert h2 has the full message
if !h2Msg.complete() {
t.Fatal("h2 should have the full message")
}
})

t.Run("h1 has all the data. h2 has some of it", func(t *testing.T) {
peers := createPeers(t, topic, 2)
peers := createPeers(t, topic, 2, false)
peers.network.addPeers()
defer peers.network.removePeers()

Expand Down Expand Up @@ -749,7 +778,7 @@ func TestPartialMessages(t *testing.T) {
})

t.Run("h1 has half the data. h2 has the other half of it", func(t *testing.T) {
peers := createPeers(t, topic, 2)
peers := createPeers(t, topic, 2, false)
peers.network.addPeers()
defer peers.network.removePeers()

Expand Down Expand Up @@ -809,7 +838,7 @@ func TestPartialMessages(t *testing.T) {
})

t.Run("h1 and h2 have the the same half of data. No partial messages should be sent", func(t *testing.T) {
peers := createPeers(t, topic, 2)
peers := createPeers(t, topic, 2, false)
peers.network.addPeers()
defer peers.network.removePeers()

Expand Down Expand Up @@ -858,7 +887,7 @@ func TestPartialMessages(t *testing.T) {
})

t.Run("three peers with distributed partial data", func(t *testing.T) {
peers := createPeers(t, topic, 3)
peers := createPeers(t, topic, 3, false)
peers.network.addPeers()
defer peers.network.removePeers()

Expand Down Expand Up @@ -924,7 +953,7 @@ func TestPartialMessages(t *testing.T) {
}
})
t.Run("three peers. peer 1 has all data and eager pushes. Receivers eager push new content", func(t *testing.T) {
peers := createPeers(t, topic, 3)
peers := createPeers(t, topic, 3, false)
peers.network.addPeers()
defer peers.network.removePeers()

Expand Down
Loading