Skip to content

Commit 2e0b895

Browse files
committed
add partial messages to gossipsub router
1 parent 6599e2a commit 2e0b895

File tree

5 files changed

+463
-3
lines changed

5 files changed

+463
-3
lines changed

extensions.go

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package pubsub
22

33
import (
4+
"errors"
5+
"iter"
6+
7+
"github.com/libp2p/go-libp2p-pubsub/partialmessages"
48
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
59
"github.com/libp2p/go-libp2p/core/peer"
610
)
711

812
type PeerExtensions struct {
9-
TestExtension bool
13+
TestExtension bool
14+
PartialMessages bool
1015
}
1116

1217
type TestExtensionConfig struct {
@@ -37,6 +42,7 @@ func peerExtensionsFromRPC(rpc *RPC) PeerExtensions {
3742
out := PeerExtensions{}
3843
if hasPeerExtensions(rpc) {
3944
out.TestExtension = rpc.Control.Extensions.GetTestExtension()
45+
out.PartialMessages = rpc.Control.Extensions.GetPartialMessages()
4046
}
4147
return out
4248
}
@@ -50,6 +56,14 @@ func (pe *PeerExtensions) ExtendRPC(rpc *RPC) *RPC {
5056
TestExtension: &pe.TestExtension,
5157
}
5258
}
59+
if pe.PartialMessages {
60+
if rpc.Control == nil {
61+
rpc.Control = &pubsub_pb.ControlMessage{}
62+
}
63+
rpc.Control.Extensions = &pubsub_pb.ControlExtensions{
64+
PartialMessages: &pe.PartialMessages,
65+
}
66+
}
5367
return rpc
5468
}
5569

@@ -59,8 +73,9 @@ type extensionsState struct {
5973
sentExtensions map[peer.ID]struct{}
6074
reportMisbehavior func(peer.ID)
6175
sendRPC func(p peer.ID, r *RPC, urgent bool)
76+
testExtension *testExtension
6277

63-
testExtension *testExtension
78+
partialMessagesExtension *partialmessages.PartialMessageExtension
6479
}
6580

6681
func newExtensionsState(myExtensions PeerExtensions, reportMisbehavior func(peer.ID), sendRPC func(peer.ID, *RPC, bool)) *extensionsState {
@@ -132,14 +147,73 @@ func (es *extensionsState) extensionsAddPeer(id peer.ID) {
132147
if es.myExtensions.TestExtension && es.peerExtensions[id].TestExtension {
133148
es.testExtension.AddPeer(id)
134149
}
150+
151+
if es.myExtensions.PartialMessages && es.peerExtensions[id].PartialMessages {
152+
es.partialMessagesExtension.AddPeer(id)
153+
}
135154
}
136155

137156
// extensionsRemovePeer is always called after extensionsAddPeer.
138157
func (es *extensionsState) extensionsRemovePeer(id peer.ID) {
158+
if es.myExtensions.PartialMessages && es.peerExtensions[id].PartialMessages {
159+
es.partialMessagesExtension.RemovePeer(id)
160+
}
139161
}
140162

141163
func (es *extensionsState) extensionsHandleRPC(rpc *RPC) {
142164
if es.myExtensions.TestExtension && es.peerExtensions[rpc.from].TestExtension {
143165
es.testExtension.HandleRPC(rpc.from, rpc.TestExtension)
144166
}
167+
168+
if es.myExtensions.PartialMessages && es.peerExtensions[rpc.from].PartialMessages && rpc.Partial != nil {
169+
es.partialMessagesExtension.HandleRPC(rpc.from, rpc.Partial)
170+
}
171+
}
172+
173+
func (es *extensionsState) Heartbeat() {
174+
if es.myExtensions.PartialMessages {
175+
es.partialMessagesExtension.Heartbeat()
176+
}
177+
}
178+
179+
func WithPartialMessagesExtension(pm *partialmessages.PartialMessageExtension) Option {
180+
return func(ps *PubSub) error {
181+
gs, ok := ps.rt.(*GossipSubRouter)
182+
if !ok {
183+
return errors.New("pubsub router is not gossipsub")
184+
}
185+
pm.Init(routerForPartialMessage{gs})
186+
187+
gs.extensions.myExtensions.PartialMessages = true
188+
gs.extensions.partialMessagesExtension = pm
189+
return nil
190+
}
191+
}
192+
193+
type routerForPartialMessage struct {
194+
gs *GossipSubRouter
195+
}
196+
197+
// MeshPeers implements partialmessages.Router.
198+
func (r routerForPartialMessage) MeshPeers(topic string) iter.Seq[peer.ID] {
199+
return func(yield func(peer.ID) bool) {
200+
for peer := range r.gs.mesh[topic] {
201+
if exts := r.gs.extensions.peerExtensions[peer]; exts.PartialMessages {
202+
if !yield(peer) {
203+
return
204+
}
205+
}
206+
}
207+
}
208+
}
209+
210+
// SendRPC implements partialmessages.Router.
211+
func (r routerForPartialMessage) SendRPC(p peer.ID, rpc *pubsub_pb.PartialMessagesExtension, urgent bool) {
212+
r.gs.sendRPC(p, &RPC{
213+
RPC: pubsub_pb.RPC{
214+
Partial: rpc,
215+
},
216+
}, urgent)
145217
}
218+
219+
var _ partialmessages.Router = routerForPartialMessage{}

gossipsub.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1833,6 +1833,8 @@ func (gs *GossipSubRouter) heartbeat() {
18331833

18341834
// advance the message history window
18351835
gs.mcache.Shift()
1836+
1837+
gs.extensions.Heartbeat()
18361838
}
18371839

18381840
func (gs *GossipSubRouter) clearIHaveCounters() {

0 commit comments

Comments
 (0)