Skip to content

Commit d6d84ae

Browse files
committed
gossipsub: implement extensions
1 parent 21c844e commit d6d84ae

File tree

4 files changed

+304
-4
lines changed

4 files changed

+304
-4
lines changed

extensions.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package pubsub
2+
3+
import (
4+
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
5+
"github.com/libp2p/go-libp2p/core/peer"
6+
)
7+
8+
type PeerExtensions struct {
9+
TestExtension bool
10+
}
11+
12+
type TestExtensionConfig struct {
13+
OnReceiveTestExtension func(from peer.ID)
14+
}
15+
16+
func WithTestExtension(c TestExtensionConfig) Option {
17+
return func(ps *PubSub) error {
18+
if rt, ok := ps.rt.(*GossipSubRouter); ok {
19+
rt.extensions.testExtension = &testExtension{
20+
sendRPC: rt.extensions.sendRPC,
21+
onReceiveTestExtension: c.OnReceiveTestExtension,
22+
}
23+
rt.extensions.myExtensions.TestExtension = true
24+
}
25+
return nil
26+
}
27+
}
28+
29+
func hasPeerExtensions(rpc *RPC) bool {
30+
if rpc != nil && rpc.Control != nil && rpc.Control.Extensions != nil {
31+
return true
32+
}
33+
return false
34+
}
35+
36+
func peerExtensionsFromRPC(rpc *RPC) PeerExtensions {
37+
out := PeerExtensions{}
38+
if hasPeerExtensions(rpc) {
39+
out.TestExtension = rpc.Control.Extensions.GetTestExtension()
40+
}
41+
return out
42+
}
43+
44+
func (pe *PeerExtensions) ExtendRPC(rpc *RPC) *RPC {
45+
if pe.TestExtension {
46+
if rpc.Control == nil {
47+
rpc.Control = &pubsub_pb.ControlMessage{}
48+
}
49+
rpc.Control.Extensions = &pubsub_pb.ControlExtensions{
50+
TestExtension: &pe.TestExtension,
51+
}
52+
}
53+
return rpc
54+
}
55+
56+
type extensionsState struct {
57+
myExtensions PeerExtensions
58+
peerExtensions map[peer.ID]PeerExtensions // peer's extensions
59+
sentExtensions map[peer.ID]struct{}
60+
reportMisbehavior func(peer.ID)
61+
sendRPC func(p peer.ID, r *RPC, urgent bool)
62+
63+
testExtension *testExtension
64+
}
65+
66+
func newExtensionsState(myExtensions PeerExtensions, reportMisbehavior func(peer.ID), sendRPC func(peer.ID, *RPC, bool)) *extensionsState {
67+
return &extensionsState{
68+
myExtensions: myExtensions,
69+
peerExtensions: make(map[peer.ID]PeerExtensions),
70+
sentExtensions: make(map[peer.ID]struct{}),
71+
reportMisbehavior: reportMisbehavior,
72+
sendRPC: sendRPC,
73+
testExtension: nil,
74+
}
75+
}
76+
77+
func (es *extensionsState) HandleRPC(rpc *RPC) {
78+
if _, ok := es.peerExtensions[rpc.from]; !ok {
79+
// We know this is the first message because we didn't have extensions
80+
// for this peer, and we always set extensions on the first rpc.
81+
es.peerExtensions[rpc.from] = peerExtensionsFromRPC(rpc)
82+
if _, ok := es.sentExtensions[rpc.from]; ok {
83+
// We just finished both sending and receiving the extensions
84+
// control message.
85+
es.extensionsAddPeer(rpc.from)
86+
}
87+
} else {
88+
// We already have an extension for this peer. If they send us another
89+
// extensions control message, that is a protocol error. We should
90+
// down score them because they are misbehaving.
91+
if hasPeerExtensions(rpc) {
92+
es.reportMisbehavior(rpc.from)
93+
}
94+
}
95+
96+
es.extensionsHandleRPC(rpc)
97+
}
98+
99+
func (es *extensionsState) AddPeer(id peer.ID, helloPacket *RPC) *RPC {
100+
// Send our extensions as the first message.
101+
helloPacket = es.myExtensions.ExtendRPC(helloPacket)
102+
103+
es.sentExtensions[id] = struct{}{}
104+
if _, ok := es.peerExtensions[id]; ok {
105+
// We've just finished sending and receiving the extensions control
106+
// message.
107+
es.extensionsAddPeer(id)
108+
}
109+
return helloPacket
110+
}
111+
112+
func (es *extensionsState) RemovePeer(id peer.ID) {
113+
_, recvdExt := es.peerExtensions[id]
114+
_, sentExt := es.sentExtensions[id]
115+
if recvdExt && sentExt {
116+
// Add peer was previously called, so we need to call remove peer
117+
es.extensionsRemovePeer(id)
118+
}
119+
delete(es.peerExtensions, id)
120+
if len(es.peerExtensions) == 0 {
121+
es.peerExtensions = make(map[peer.ID]PeerExtensions)
122+
}
123+
delete(es.sentExtensions, id)
124+
if len(es.sentExtensions) == 0 {
125+
es.sentExtensions = make(map[peer.ID]struct{})
126+
}
127+
}
128+
129+
// extensionsAddPeer is only called once we've both sent and received the
130+
// extensions control message.
131+
func (es *extensionsState) extensionsAddPeer(id peer.ID) {
132+
if es.myExtensions.TestExtension && es.peerExtensions[id].TestExtension {
133+
es.testExtension.AddPeer(id)
134+
}
135+
}
136+
137+
// extensionsRemovePeer is always called after extensionsAddPeer.
138+
func (es *extensionsState) extensionsRemovePeer(id peer.ID) {
139+
}
140+
141+
func (es *extensionsState) extensionsHandleRPC(rpc *RPC) {
142+
if es.myExtensions.TestExtension && es.peerExtensions[rpc.from].TestExtension {
143+
es.testExtension.HandleRPC(rpc.from, rpc.TestExtension)
144+
}
145+
}

gossipsub.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, o
260260
// DefaultGossipSubRouter returns a new GossipSubRouter with default parameters.
261261
func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
262262
params := DefaultGossipSubParams()
263-
return &GossipSubRouter{
263+
rt := &GossipSubRouter{
264264
peers: make(map[peer.ID]protocol.ID),
265265
mesh: make(map[string]map[peer.ID]struct{}),
266266
fanout: make(map[string]map[peer.ID]struct{}),
@@ -281,6 +281,14 @@ func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
281281
tagTracer: newTagTracer(h.ConnManager()),
282282
params: params,
283283
}
284+
285+
rt.extensions = newExtensionsState(PeerExtensions{}, func(p peer.ID) {
286+
if rt.score != nil {
287+
rt.score.AddPenalty(p, 10)
288+
}
289+
}, rt.sendRPC)
290+
291+
return rt
284292
}
285293

286294
// DefaultGossipSubParams returns the default gossip sub parameters
@@ -466,8 +474,10 @@ func WithGossipSubParams(cfg GossipSubParams) Option {
466474
// is the fanout map. Fanout peer lists are expired if we don't publish any
467475
// messages to their topic for GossipSubFanoutTTL.
468476
type GossipSubRouter struct {
469-
p *PubSub
470-
peers map[peer.ID]protocol.ID // peer protocols
477+
p *PubSub
478+
peers map[peer.ID]protocol.ID // peer protocols
479+
extensions *extensionsState
480+
471481
direct map[peer.ID]struct{} // direct peers
472482
mesh map[string]map[peer.ID]struct{} // topic meshes
473483
fanout map[string]map[peer.ID]struct{} // topic fanout
@@ -652,13 +662,18 @@ loop:
652662
}
653663
}
654664
gs.outbound[p] = outbound
655-
665+
if gs.feature(GossipSubFeatureExtensions, proto) {
666+
helloPacket = gs.extensions.AddPeer(p, helloPacket)
667+
}
656668
return helloPacket
657669
}
658670

659671
func (gs *GossipSubRouter) RemovePeer(p peer.ID) {
660672
log.Debugf("PEERDOWN: Remove disconnected peer %s", p)
661673
gs.tracer.RemovePeer(p)
674+
if gs.feature(GossipSubFeatureExtensions, gs.peers[p]) {
675+
gs.extensions.RemovePeer(p)
676+
}
662677
delete(gs.peers, p)
663678
for _, peers := range gs.mesh {
664679
delete(peers, p)
@@ -748,6 +763,8 @@ func (gs *GossipSubRouter) Preprocess(from peer.ID, msgs []*Message) {
748763
}
749764

750765
func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
766+
gs.extensions.HandleRPC(rpc)
767+
751768
ctl := rpc.GetControl()
752769
if ctl == nil {
753770
return

gossipsub_test.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3961,3 +3961,116 @@ func newSkeletonGossipsub(ctx context.Context, h host.Host) *skeletonGossipsub {
39613961
inRPC: sendRPC,
39623962
}
39633963
}
3964+
3965+
func TestExtensionsControlMessage(t *testing.T) {
3966+
for _, wellBehaved := range []bool{true, false} {
3967+
t.Run(fmt.Sprintf("wellBehaved=%t", wellBehaved), func(t *testing.T) {
3968+
3969+
ctx, cancel := context.WithCancel(context.Background())
3970+
defer cancel()
3971+
hosts := getDefaultHosts(t, 2)
3972+
psub0 := getGossipsub(ctx, hosts[0],
3973+
WithPeerScore(
3974+
&PeerScoreParams{
3975+
AppSpecificScore: func(peer.ID) float64 { return 0 },
3976+
BehaviourPenaltyWeight: -1,
3977+
BehaviourPenaltyDecay: ScoreParameterDecay(time.Minute),
3978+
DecayInterval: DefaultDecayInterval,
3979+
DecayToZero: DefaultDecayToZero,
3980+
},
3981+
&PeerScoreThresholds{
3982+
GossipThreshold: -100,
3983+
PublishThreshold: -500,
3984+
GraylistThreshold: -1000,
3985+
}),
3986+
WithMessageIdFn(func(msg *pb.Message) string {
3987+
return string(msg.Data)
3988+
}))
3989+
3990+
psub1 := newSkeletonGossipsub(ctx, hosts[1])
3991+
3992+
connect(t, hosts[0], hosts[1])
3993+
time.Sleep(time.Second)
3994+
3995+
loopTimes := 3
3996+
3997+
for i := range loopTimes {
3998+
rpcToSend := &pb.RPC{
3999+
Control: &pb.ControlMessage{
4000+
Extensions: &pb.ControlExtensions{},
4001+
},
4002+
}
4003+
if wellBehaved && i > 0 {
4004+
// A well behaved node does not repeat the control
4005+
// extension message
4006+
rpcToSend.Control.Extensions = nil
4007+
}
4008+
psub1.inRPC <- rpcToSend
4009+
}
4010+
4011+
time.Sleep(time.Second)
4012+
4013+
peerScore := psub0.rt.(*GossipSubRouter).score.Score(hosts[1].ID())
4014+
t.Log("Peer score:", peerScore)
4015+
if wellBehaved {
4016+
if peerScore < 0 {
4017+
t.Fatal("Peer score should not be negative")
4018+
}
4019+
} else {
4020+
if peerScore >= 0 {
4021+
t.Fatal("Peer score should not be positive")
4022+
}
4023+
}
4024+
})
4025+
}
4026+
}
4027+
4028+
func TestTestExtension(t *testing.T) {
4029+
hosts := getDefaultHosts(t, 2)
4030+
var receivedTestExtension atomic.Bool
4031+
c := TestExtensionConfig{
4032+
OnReceiveTestExtension: func(_ peer.ID) {
4033+
receivedTestExtension.Store(true)
4034+
},
4035+
}
4036+
psub := getGossipsub(context.Background(), hosts[0], WithTestExtension(c))
4037+
_ = psub
4038+
4039+
ctx, cancel := context.WithCancel(context.Background())
4040+
defer cancel()
4041+
psub1 := newSkeletonGossipsub(ctx, hosts[1])
4042+
4043+
connect(t, hosts[0], hosts[1])
4044+
4045+
const timeout = 3 * time.Second
4046+
select {
4047+
case <-time.After(timeout):
4048+
t.Fatal("Timeout")
4049+
case r := <-psub1.outRPC:
4050+
if !*r.Control.Extensions.TestExtension {
4051+
t.Fatal("Unexpected RPC. First RPC should be the Extensions Control Message")
4052+
}
4053+
}
4054+
4055+
truePtr := true
4056+
psub1.inRPC <- &pb.RPC{
4057+
Control: &pb.ControlMessage{
4058+
Extensions: &pb.ControlExtensions{
4059+
TestExtension: &truePtr,
4060+
},
4061+
},
4062+
}
4063+
4064+
select {
4065+
case <-time.After(timeout):
4066+
t.Fatal("Timeout")
4067+
case r := <-psub1.outRPC:
4068+
if r.TestExtension == nil {
4069+
t.Fatal("Unexpected RPC. Next RPC should be the TestExtension Message")
4070+
}
4071+
}
4072+
4073+
if !receivedTestExtension.Load() {
4074+
t.Fatal("TestExtension not received")
4075+
}
4076+
}

testextension.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package pubsub
2+
3+
import (
4+
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
5+
"github.com/libp2p/go-libp2p/core/peer"
6+
)
7+
8+
type testExtension struct {
9+
sendRPC func(p peer.ID, r *RPC, urgent bool)
10+
onReceiveTestExtension func(peer.ID)
11+
}
12+
13+
func (e *testExtension) AddPeer(id peer.ID) {
14+
e.sendRPC(id, &RPC{
15+
RPC: pubsub_pb.RPC{
16+
TestExtension: &pubsub_pb.TestExtension{},
17+
},
18+
}, false)
19+
}
20+
21+
func (e *testExtension) HandleRPC(from peer.ID, _ *pubsub_pb.TestExtension) {
22+
if e.onReceiveTestExtension != nil {
23+
e.onReceiveTestExtension(from)
24+
}
25+
}

0 commit comments

Comments
 (0)