@@ -26,12 +26,8 @@ type HostConnector interface {
2626}
2727
2828type incomingPartialRPC struct {
29- from peer.ID
30- topic string
31- groupID []byte
32- iwant []byte
33- ihave []byte
34- partialMessageBytes []byte
29+ pubsub_pb.PartialMessagesExtension
30+ from peer.ID
3531}
3632
3733type partialMsgWithTopic struct {
@@ -103,42 +99,49 @@ func (m *partialMsgManager) addMsg(req partialMsgWithTopic) {
10399}
104100
105101func (m * partialMsgManager ) handleRPC (rpc incomingPartialRPC ) {
106- _ , ok := m .partialMessages [rpc .topic ]
102+ _ , ok := m .partialMessages [rpc .GetTopicID () ]
107103 if ! ok {
108- m .partialMessages [rpc .topic ] = make (map [string ]* PartialMessage )
104+ m .partialMessages [rpc .GetTopicID () ] = make (map [string ]* PartialMessage )
109105 }
110- pm , ok := m .partialMessages [rpc .topic ][string (rpc .groupID )]
106+ pm , ok := m .partialMessages [rpc .GetTopicID () ][string (rpc .GroupID )]
111107 if ! ok {
112108 pm = & PartialMessage {}
113- copy (pm .groupID [:], rpc .groupID )
114- m .partialMessages [rpc .topic ][string (rpc .groupID )] = pm
109+ copy (pm .groupID [:], rpc .GroupID )
110+ m .partialMessages [rpc .GetTopicID () ][string (rpc .GroupID )] = pm
115111 }
116112
117113 // Extend first, so we don't request something we just got.
118- if len (rpc .partialMessageBytes ) != 0 {
119- pm .Extend (rpc .partialMessageBytes )
114+ beforeExtend := pm .PartsMetadata ()[0 ]
115+ if len (rpc .PartialMessage ) != 0 {
116+ err := pm .Extend (rpc .PartialMessage )
117+ if err != nil {
118+ m .Error ("Failed to extend partial message" , "err" , err )
119+ return
120+ }
120121 }
122+ afterExtend := pm .PartsMetadata ()[0 ]
123+ var shouldRepublish bool
124+ if beforeExtend != afterExtend {
125+ m .Info ("Extended partial message" )
126+ shouldRepublish = true
121127
122- missing , _ := pm .MissingParts ()
123- if len (missing ) == 0 {
124- m .Info ("All parts received" )
125128 }
126129
127- var shouldRepublish bool
128- pmHas , _ := pm .AvailableParts ()
129- if len (rpc .iwant ) != 0 {
130- if rpc .iwant [0 ]& pmHas [0 ] != 0 {
131- shouldRepublish = true
132- }
130+ has := pm .PartsMetadata ()
131+ if has [0 ] == 0xff {
132+ m .Info ("All parts received" )
133133 }
134- if len (rpc .ihave ) != 0 {
135- if (rpc .ihave [0 ] & (^ pmHas [0 ])) != 0 {
136- shouldRepublish = true
134+
135+ pmHas := pm .PartsMetadata ()
136+ if ! shouldRepublish && len (rpc .PartsMetadata ) == 1 {
137+ shouldRepublish = pmHas [0 ] != rpc .PartsMetadata [0 ]
138+ if shouldRepublish {
139+ m .Info ("Republishing partial message because a peer has something I want" )
137140 }
138141 }
139142
140143 if shouldRepublish {
141- m .pubsub .PublishPartialMessage (rpc .topic , pm , partialmessages.PublishOptions {})
144+ m .pubsub .PublishPartialMessage (rpc .GetTopicID () , pm , partialmessages.PublishOptions {})
142145 }
143146}
144147
@@ -195,15 +198,12 @@ func (n *scriptedNode) runInstruction(ctx context.Context, instruction ScriptIns
195198 // Not doing any validation for now
196199 return nil
197200 },
198- OnIncomingRPC : func (from peer.ID , topic string , groupID , iwant , ihave , partialMessageBytes [] byte ) {
201+ OnIncomingRPC : func (from peer.ID , rpc * pubsub_pb. PartialMessagesExtension ) error {
199202 n .partialMsgMgr .incomingRPC <- incomingPartialRPC {
200- from : from ,
201- topic : topic ,
202- groupID : groupID ,
203- iwant : iwant ,
204- ihave : ihave ,
205- partialMessageBytes : partialMessageBytes ,
203+ from : from ,
204+ PartialMessagesExtension : * rpc ,
206205 }
206+ return nil
207207 },
208208 }
209209
0 commit comments