Skip to content

Commit 2adfade

Browse files
committed
gossipsub-intero(go): update to latest partial message impl
1 parent f9cad33 commit 2adfade

File tree

5 files changed

+47
-81
lines changed

5 files changed

+47
-81
lines changed

gossipsub-interop/go-libp2p/experiment.go

Lines changed: 19 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,8 @@ type HostConnector interface {
2626
}
2727

2828
type incomingPartialRPC struct {
29-
from peer.ID
30-
topic string
31-
groupID []byte
32-
iwant []byte
33-
ihave []byte
34-
partialMessageBytes []byte
29+
pubsub_pb.PartialMessagesExtension
30+
from peer.ID
3531
}
3632

3733
type partialMsgWithTopic struct {
@@ -103,42 +99,35 @@ func (m *partialMsgManager) addMsg(req partialMsgWithTopic) {
10399
}
104100

105101
func (m *partialMsgManager) handleRPC(rpc incomingPartialRPC) {
106-
_, ok := m.partialMessages[rpc.topic]
102+
_, ok := m.partialMessages[rpc.GetTopicID()]
107103
if !ok {
108-
m.partialMessages[rpc.topic] = make(map[string]*PartialMessage)
104+
m.partialMessages[rpc.GetTopicID()] = make(map[string]*PartialMessage)
109105
}
110-
pm, ok := m.partialMessages[rpc.topic][string(rpc.groupID)]
106+
pm, ok := m.partialMessages[rpc.GetTopicID()][string(rpc.GroupID)]
111107
if !ok {
112108
pm = &PartialMessage{}
113-
copy(pm.groupID[:], rpc.groupID)
114-
m.partialMessages[rpc.topic][string(rpc.groupID)] = pm
109+
copy(pm.groupID[:], rpc.GroupID)
110+
m.partialMessages[rpc.GetTopicID()][string(rpc.GroupID)] = pm
115111
}
116112

117113
// Extend first, so we don't request something we just got.
118-
if len(rpc.partialMessageBytes) != 0 {
119-
pm.Extend(rpc.partialMessageBytes)
114+
if len(rpc.PartialMessage) != 0 {
115+
pm.Extend(rpc.PartialMessage)
120116
}
121117

122-
missing, _ := pm.MissingParts()
123-
if len(missing) == 0 {
118+
has := pm.PartsMetadata()
119+
if has[0] == 0xff {
124120
m.Info("All parts received")
125121
}
126122

127123
var shouldRepublish bool
128-
pmHas, _ := pm.AvailableParts()
129-
if len(rpc.iwant) != 0 {
130-
if rpc.iwant[0]&pmHas[0] != 0 {
131-
shouldRepublish = true
132-
}
133-
}
134-
if len(rpc.ihave) != 0 {
135-
if (rpc.ihave[0] & (^pmHas[0])) != 0 {
136-
shouldRepublish = true
137-
}
124+
pmHas := pm.PartsMetadata()
125+
if len(rpc.PartsMetadata) == 1 {
126+
shouldRepublish = pmHas[0] != rpc.PartsMetadata[0]
138127
}
139128

140129
if shouldRepublish {
141-
m.pubsub.PublishPartialMessage(rpc.topic, pm, partialmessages.PublishOptions{})
130+
m.pubsub.PublishPartialMessage(rpc.GetTopicID(), pm, partialmessages.PublishOptions{})
142131
}
143132
}
144133

@@ -195,15 +184,12 @@ func (n *scriptedNode) runInstruction(ctx context.Context, instruction ScriptIns
195184
// Not doing any validation for now
196185
return nil
197186
},
198-
OnIncomingRPC: func(from peer.ID, topic string, groupID, iwant, ihave, partialMessageBytes []byte) {
187+
OnIncomingRPC: func(from peer.ID, rpc *pubsub_pb.PartialMessagesExtension) error {
199188
n.partialMsgMgr.incomingRPC <- incomingPartialRPC{
200-
from: from,
201-
topic: topic,
202-
groupID: groupID,
203-
iwant: iwant,
204-
ihave: ihave,
205-
partialMessageBytes: partialMessageBytes,
189+
from: from,
190+
PartialMessagesExtension: *rpc,
206191
}
192+
return nil
207193
},
208194
}
209195

gossipsub-interop/go-libp2p/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.24.1
55
require (
66
github.com/libp2p/go-libp2p v0.41.1
77
// fork with partial messages
8-
github.com/libp2p/go-libp2p-pubsub v0.14.4-0.20250925234347-91dd1a5e4596
8+
github.com/libp2p/go-libp2p-pubsub v0.14.4-0.20250930192224-4bec5400d65c
99
)
1010

1111
require (
@@ -108,5 +108,5 @@ require (
108108
golang.org/x/text v0.22.0 // indirect
109109
golang.org/x/tools v0.30.0 // indirect
110110
google.golang.org/protobuf v1.36.5 // indirect
111-
lukechampine.com/blake3 v1.4.0 // indirect
111+
lukechampine.com/blake3 v1.4.1 // indirect
112112
)

gossipsub-interop/go-libp2p/go.sum

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,8 @@ github.com/libp2p/go-libp2p v0.41.1 h1:8ecNQVT5ev/jqALTvisSJeVNvXYJyK4NhQx1nNRXQ
137137
github.com/libp2p/go-libp2p v0.41.1/go.mod h1:DcGTovJzQl/I7HMrby5ZRjeD0kQkGiy+9w6aEkSZpRI=
138138
github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94=
139139
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
140-
github.com/libp2p/go-libp2p-pubsub v0.13.1 h1:tV3ttzzZSCk0EtEXnxVmWIXgjVxXx+20Jwjbs/Ctzjo=
141-
github.com/libp2p/go-libp2p-pubsub v0.13.1/go.mod h1:MKPU5vMI8RRFyTP0HfdsF9cLmL1nHAeJm44AxJGJx44=
142-
github.com/libp2p/go-libp2p-pubsub v0.14.4-0.20250924192223-5297a51ffd50 h1:g5aj1tw4GHg3Oee8pv0/zn5nyxegTD1FuDYfrqXVbUc=
143-
github.com/libp2p/go-libp2p-pubsub v0.14.4-0.20250924192223-5297a51ffd50/go.mod h1:lr4oE8bFgQaifRcoc2uWhWWiK6tPdOEKpUuR408GFN4=
144-
github.com/libp2p/go-libp2p-pubsub v0.14.4-0.20250925234347-91dd1a5e4596 h1:qBAupWsLXtQAd6wJp7nmTkLcwnKsscwR7NKxlPeDFtk=
145-
github.com/libp2p/go-libp2p-pubsub v0.14.4-0.20250925234347-91dd1a5e4596/go.mod h1:lr4oE8bFgQaifRcoc2uWhWWiK6tPdOEKpUuR408GFN4=
140+
github.com/libp2p/go-libp2p-pubsub v0.14.4-0.20250930192224-4bec5400d65c h1:L7Ub8HdUIAgQLoBdJZVH5YLxN2UWDwz0+mHtyVcGsk0=
141+
github.com/libp2p/go-libp2p-pubsub v0.14.4-0.20250930192224-4bec5400d65c/go.mod h1:lr4oE8bFgQaifRcoc2uWhWWiK6tPdOEKpUuR408GFN4=
146142
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
147143
github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg=
148144
github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0=
@@ -530,7 +526,7 @@ grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJd
530526
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
531527
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
532528
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
533-
lukechampine.com/blake3 v1.4.0 h1:xDbKOZCVbnZsfzM6mHSYcGRHZ3YrLDzqz8XnV4uaD5w=
534-
lukechampine.com/blake3 v1.4.0/go.mod h1:MQJNQCTnR+kwOP/JEZSxj3MaQjp80FOFSNMMHXcSeX0=
529+
lukechampine.com/blake3 v1.4.1 h1:I3Smz7gso8w4/TunLKec6K2fn+kyKtDxr/xcQEN84Wg=
530+
lukechampine.com/blake3 v1.4.1/go.mod h1:QFosUxmjB8mnrWFSNwKmvxHpfY72bmD2tQ0kBMM3kwo=
535531
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
536532
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=

gossipsub-interop/go-libp2p/partial.go

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"math/bits"
88

99
partialmessages "github.com/libp2p/go-libp2p-pubsub/partialmessages"
10-
"github.com/libp2p/go-libp2p/core/peer"
1110
)
1211

1312
const partLen = 1024
@@ -17,6 +16,17 @@ type PartialMessage struct {
1716
parts [8][]byte // each part is partLen sized or nil if empty
1817
}
1918

19+
// PartsMetadata implements partialmessages.PartialMessage.
20+
func (p *PartialMessage) PartsMetadata() []byte {
21+
out := []byte{0}
22+
for i := range p.parts {
23+
if len(p.parts[i]) > 0 {
24+
out[0] |= 1 << i
25+
}
26+
}
27+
return out
28+
}
29+
2030
// FillParts is used to initialize this PartialMessage for testing by filling in
2131
// the parts it should have. The algorithm is simple:
2232
// - treat the groupID as our starting uint64 number BigEndian
@@ -38,32 +48,11 @@ func (p *PartialMessage) FillParts(bitmap byte) error {
3848
return nil
3949
}
4050

41-
// AvailableParts implements partialmessages.PartialMessage.
42-
func (p *PartialMessage) AvailableParts() ([]byte, error) {
43-
out := []byte{0}
44-
for i := range p.parts {
45-
if len(p.parts[i]) > 0 {
46-
out[0] |= 1 << i
47-
}
48-
}
49-
return out, nil
50-
}
51-
5251
// GroupID implements partialmessages.PartialMessage.
5352
func (p *PartialMessage) GroupID() []byte {
5453
return p.groupID[:]
5554
}
5655

57-
// MissingParts implements partialmessages.PartialMessage.
58-
func (p *PartialMessage) MissingParts() ([]byte, error) {
59-
b, _ := p.AvailableParts()
60-
b[0] = ^b[0]
61-
if b[0] == 0 {
62-
return nil, nil
63-
}
64-
return b, nil
65-
}
66-
6756
func (p *PartialMessage) Extend(data []byte) error {
6857
if len(data) < 1+len(p.groupID) {
6958
return errors.New("invalid data length")
@@ -98,12 +87,8 @@ func (p *PartialMessage) Extend(data []byte) error {
9887
return nil
9988
}
10089

101-
// PartialMessageBytesFromMetadata implements partialmessages.PartialMessage.
102-
func (p *PartialMessage) PartialMessageBytesFromMetadata(metadata []byte) ([]byte, []byte, error) {
103-
if len(metadata) == 0 {
104-
// Treat this as the same as a request for all parts
105-
metadata = []byte{0xff}
106-
}
90+
// PartialMessageBytes implements partialmessages.PartialMessage.
91+
func (p *PartialMessage) PartialMessageBytes(metadata []byte) ([]byte, []byte, error) {
10792
if len(metadata) != 1 {
10893
return nil, nil, errors.New("invalid metadata length")
10994
}
@@ -112,7 +97,8 @@ func (p *PartialMessage) PartialMessageBytesFromMetadata(metadata []byte) ([]byt
11297
out = append(out, 0) // This byte will contain the parts we are including in the message
11398
remaining := []byte{metadata[0]}
11499
for i := range p.parts {
115-
if metadata[0]&(1<<i) == 0 {
100+
if metadata[0]&(1<<i) != 0 {
101+
// They already have this part
116102
continue
117103
}
118104
if len(p.parts[i]) == 0 {
@@ -133,13 +119,4 @@ func (p *PartialMessage) PartialMessageBytesFromMetadata(metadata []byte) ([]byt
133119
return out, remaining, nil
134120
}
135121

136-
// ShouldRequest implements partialmessages.PartialMessage.
137-
func (p *PartialMessage) ShouldRequest(_ peer.ID, peerHasMetadata []byte) bool {
138-
wants, _ := p.MissingParts()
139-
if len(wants) == 0 || len(peerHasMetadata) == 0 {
140-
return false
141-
}
142-
return wants[0]&peerHasMetadata[0] != 0
143-
}
144-
145122
var _ partialmessages.PartialMessage = (*PartialMessage)(nil)

gossipsub-interop/go-libp2p/partial_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99

1010
partialmessages "github.com/libp2p/go-libp2p-pubsub/partialmessages"
11+
"github.com/libp2p/go-libp2p/core/peer"
1112
)
1213

1314
func TestFillParts(t *testing.T) {
@@ -92,6 +93,12 @@ func (p *partialInvariantChecker) SplitIntoParts(in *PartialMessage) ([]*Partial
9293
return out, nil
9394
}
9495

96+
// ShouldRequest implements partialmessages.InvariantChecker.
97+
func (p *partialInvariantChecker) ShouldRequest(a *PartialMessage, from peer.ID, partsMetadata []byte) bool {
98+
aHas := a.PartsMetadata()[0]
99+
return len(partsMetadata) == 1 && aHas != partsMetadata[0]
100+
}
101+
95102
var _ partialmessages.InvariantChecker[*PartialMessage] = (*partialInvariantChecker)(nil)
96103

97104
func TestPartialMessageInvariants(t *testing.T) {

0 commit comments

Comments
 (0)