Skip to content

Commit 7dc23ae

Browse files
committed
Add support for supportsPartial field in SubOpts
This lets us distinguish who can support a partial message request versus who requests partial message on a per topic basis.
1 parent 14b2b23 commit 7dc23ae

File tree

7 files changed

+68
-24
lines changed

7 files changed

+68
-24
lines changed

comm.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ func (p *PubSub) getHelloPacket() *RPC {
3737
requestPartial = ts.requestPartialMessages
3838
}
3939
as := &pb.RPC_SubOpts{
40-
Topicid: proto.String(t),
41-
Subscribe: proto.Bool(true),
42-
Partial: &requestPartial,
40+
Topicid: proto.String(t),
41+
Subscribe: proto.Bool(true),
42+
RequestsPartial: &requestPartial,
4343
}
4444
rpc.Subscriptions = append(rpc.Subscriptions, as)
4545
}

extensions.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,11 @@ type partialMessageRouter struct {
199199
gs *GossipSubRouter
200200
}
201201

202+
// PeerRequestsPartial implements partialmessages.Router.
203+
func (r partialMessageRouter) PeerRequestsPartial(peer peer.ID, topic string) bool {
204+
return r.gs.peerRequestsPartial(peer, topic)
205+
}
206+
202207
// MeshPeers implements partialmessages.Router.
203208
func (r partialMessageRouter) MeshPeers(topic string) iter.Seq[peer.ID] {
204209
return func(yield func(peer.ID) bool) {
@@ -212,9 +217,9 @@ func (r partialMessageRouter) MeshPeers(topic string) iter.Seq[peer.ID] {
212217
}
213218

214219
for peer := range peerSet {
215-
if exts := r.gs.extensions.peerExtensions[peer]; exts.PartialMessages {
216-
if peerStates, ok := r.gs.p.topics[topic]; ok && peerStates[peer].requestsPartial {
217-
// Check that the peer wanted partial messages
220+
if r.gs.extensions.peerExtensions[peer].PartialMessages {
221+
if r.gs.peerSupportsPartial(peer, topic) {
222+
// Peer supports partial messages
218223
if !yield(peer) {
219224
return
220225
}

gossipsub.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ func (gs *GossipSubRouter) Preprocess(from peer.ID, msgs []*Message) {
864864
// We don't send IDONTWANT to the peer that sent us the messages
865865
continue
866866
}
867-
if gs.peerWantsPartial(p, topic) {
867+
if gs.peerRequestsPartial(p, topic) {
868868
// Don't send IDONTWANT to peers that are using partial messages
869869
// for this topic
870870
continue
@@ -1381,7 +1381,7 @@ func (gs *GossipSubRouter) rpcs(msg *Message) iter.Seq2[peer.ID, *RPC] {
13811381
if pid == from || pid == peer.ID(msg.GetFrom()) {
13821382
continue
13831383
}
1384-
if gs.peerWantsPartial(pid, topic) {
1384+
if gs.peerRequestsPartial(pid, topic) {
13851385
// The peer requested partial messages. We'll skip sending them full messages
13861386
continue
13871387
}
@@ -1393,7 +1393,12 @@ func (gs *GossipSubRouter) rpcs(msg *Message) iter.Seq2[peer.ID, *RPC] {
13931393
}
13941394
}
13951395

1396-
func (gs *GossipSubRouter) peerWantsPartial(p peer.ID, topic string) bool {
1396+
func (gs *GossipSubRouter) peerSupportsPartial(p peer.ID, topic string) bool {
1397+
peerStates, ok := gs.p.topics[topic]
1398+
return ok && gs.extensions.myExtensions.PartialMessages && peerStates[p].requestsPartial
1399+
}
1400+
1401+
func (gs *GossipSubRouter) peerRequestsPartial(p peer.ID, topic string) bool {
13971402
peerStates, ok := gs.p.topics[topic]
13981403
return ok && gs.extensions.myExtensions.PartialMessages && peerStates[p].requestsPartial
13991404
}

partialmessages/partialmsgs.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ type PublishOptions struct {
133133
type Router interface {
134134
SendRPC(p peer.ID, r *pb.PartialMessagesExtension, urgent bool)
135135
MeshPeers(topic string) iter.Seq[peer.ID]
136+
PeerRequestsPartial(peer peer.ID, topic string) bool
136137
}
137138

138139
func (e *PartialMessageExtension) groupState(topic string, groupID []byte, peerInitiated bool) (*partialMessageStatePerTopicGroup, error) {
@@ -200,6 +201,7 @@ func (e *PartialMessageExtension) PublishPartial(topic string, partial Message,
200201
}
201202
for p := range peers {
202203
log := e.Logger.With("peer", p)
204+
requestedPartial := e.router.PeerRequestsPartial(p, topic)
203205

204206
var rpc pb.PartialMessagesExtension
205207
var sendRPC bool
@@ -212,7 +214,7 @@ func (e *PartialMessageExtension) PublishPartial(topic string, partial Message,
212214
}
213215

214216
// Try to fulfill any wants from the peer
215-
if pState.partsMetadata != nil {
217+
if requestedPartial && pState.partsMetadata != nil {
216218
// This peer has previously asked for a certain part. We'll give
217219
// them what we can.
218220
pm, err := partial.PartialMessageBytes(pState.partsMetadata)
@@ -234,7 +236,7 @@ func (e *PartialMessageExtension) PublishPartial(topic string, partial Message,
234236
// Only send the eager push to the peer if:
235237
// - we didn't reply to an explicit request
236238
// - we have something to eager push
237-
if !inResponseToIWant && len(opts.EagerPush) > 0 {
239+
if requestedPartial && !inResponseToIWant && len(opts.EagerPush) > 0 {
238240
log.Debug("Eager pushing")
239241
sendRPC = true
240242
rpc.PartialMessage = opts.EagerPush

partialmessages/partialmsgs_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ type testRouter struct {
2424
meshPeers func(topic string) iter.Seq[peer.ID]
2525
}
2626

27+
// PeerRequestsPartial implements Router.
28+
func (r *testRouter) PeerRequestsPartial(peer peer.ID, topic string) bool {
29+
return true
30+
}
31+
2732
func (r *testRouter) SendRPC(p peer.ID, rpc *pubsub_pb.PartialMessagesExtension, urgent bool) {
2833
r.sendRPC(p, rpc, urgent)
2934
}

pubsub.go

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type ProtocolMatchFn = func(protocol.ID) func(protocol.ID) bool
4747

4848
type peerTopicState struct {
4949
requestsPartial bool
50+
supportsPartial bool
5051
}
5152

5253
type peerOutgoingStream struct {
@@ -1197,15 +1198,18 @@ func (p *PubSub) handleRemoveRelay(topic string) {
11971198
// Only called from processLoop.
11981199
func (p *PubSub) announce(topic string, sub bool) {
11991200
var requestPartialMessages bool
1201+
var supportsPartialMessages bool
12001202
if sub {
12011203
if t, ok := p.myTopics[topic]; ok {
12021204
requestPartialMessages = t.requestPartialMessages
1205+
supportsPartialMessages = t.supportsPartialMessages
12031206
}
12041207
}
12051208
subopt := &pb.RPC_SubOpts{
1206-
Topicid: &topic,
1207-
Subscribe: &sub,
1208-
Partial: &requestPartialMessages,
1209+
Topicid: &topic,
1210+
Subscribe: &sub,
1211+
RequestsPartial: &requestPartialMessages,
1212+
SupportsPartial: &supportsPartialMessages,
12091213
}
12101214

12111215
out := rpcWithSubs(subopt)
@@ -1248,15 +1252,18 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
12481252
}
12491253

12501254
var requestPartialMessages bool
1255+
var supportsPartialMessages bool
12511256
if sub {
12521257
if t, ok := p.myTopics[topic]; ok {
12531258
requestPartialMessages = t.requestPartialMessages
1259+
supportsPartialMessages = t.supportsPartialMessages
12541260
}
12551261
}
12561262
subopt := &pb.RPC_SubOpts{
1257-
Topicid: &topic,
1258-
Subscribe: &sub,
1259-
Partial: &requestPartialMessages,
1263+
Topicid: &topic,
1264+
Subscribe: &sub,
1265+
RequestsPartial: &requestPartialMessages,
1266+
SupportsPartial: &supportsPartialMessages,
12601267
}
12611268

12621269
out := rpcWithSubs(subopt)
@@ -1360,16 +1367,19 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
13601367
p.topics[t] = tmap
13611368
}
13621369

1363-
if _, ok = tmap[rpc.from]; !ok {
1364-
tmap[rpc.from] = peerTopicState{requestsPartial: subopt.GetPartial()}
1370+
pts := peerTopicState{
1371+
requestsPartial: subopt.GetRequestsPartial(),
1372+
// If they peer requested partial, they support it by default
1373+
supportsPartial: subopt.GetRequestsPartial() || subopt.GetSupportsPartial(),
1374+
}
1375+
_, seenBefore := tmap[rpc.from]
1376+
tmap[rpc.from] = pts
1377+
if !seenBefore {
1378+
tmap[rpc.from] = pts
13651379
if topic, ok := p.myTopics[t]; ok {
13661380
peer := rpc.from
13671381
topic.sendNotification(PeerEvent{PeerJoin, peer})
13681382
}
1369-
} else {
1370-
s := tmap[rpc.from]
1371-
s.requestsPartial = subopt.GetPartial()
1372-
tmap[rpc.from] = s
13731383
}
13741384
} else {
13751385
tmap, ok := p.topics[t]
@@ -1565,6 +1575,22 @@ func RequestPartialMessages() TopicOpt {
15651575
return errors.New("partial messages are not enabled")
15661576
}
15671577
t.requestPartialMessages = true
1578+
t.supportsPartialMessages = true
1579+
return nil
1580+
}
1581+
}
1582+
1583+
func SupportsPartialMessages() TopicOpt {
1584+
return func(t *Topic) error {
1585+
gs, ok := t.p.rt.(*GossipSubRouter)
1586+
if !ok {
1587+
return errors.New("partial messages only supported by gossipsub")
1588+
}
1589+
1590+
if !gs.extensions.myExtensions.PartialMessages {
1591+
return errors.New("partial messages are not enabled")
1592+
}
1593+
t.supportsPartialMessages = true
15681594
return nil
15691595
}
15701596
}

topic.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ type Topic struct {
3333
mux sync.RWMutex
3434
closed bool
3535

36-
requestPartialMessages bool
36+
requestPartialMessages bool
37+
supportsPartialMessages bool
3738
}
3839

3940
// String returns the topic associated with t

0 commit comments

Comments
 (0)