3232
3333import java .util .ArrayList ;
3434import java .util .List ;
35+ import java .util .concurrent .locks .Lock ;
36+ import java .util .concurrent .locks .ReentrantLock ;
3537import java .util .stream .Collectors ;
3638
3739import static org .neo4j .gds .utils .StringFormatting .formatWithLocale ;
@@ -42,6 +44,7 @@ public final class RelationshipIds extends CypherGraphStore.StateVisitor.Adapter
4244 private final TokenHolders tokenHolders ;
4345 private final List <RelationshipIdContext > relationshipIdContexts ;
4446 private final List <UpdateListener > updateListeners ;
47+ private final Lock lock ;
4548
4649 public interface UpdateListener {
4750 void onRelationshipIdsAdded (RelationshipIdContext relationshipIdContext );
@@ -62,6 +65,7 @@ private RelationshipIds(GraphStore graphStore, TokenHolders tokenHolders, List<R
6265 this .tokenHolders = tokenHolders ;
6366 this .relationshipIdContexts = relationshipIdContexts ;
6467 this .updateListeners = new ArrayList <>();
68+ this .lock = new ReentrantLock ();
6569 }
6670
6771 public <T > T resolveRelationshipId (long relationshipId , ResolvedRelationshipIdFunction <T > relationshipIdConsumer ) {
@@ -93,20 +97,41 @@ public <T> T resolveRelationshipId(long relationshipId, ResolvedRelationshipIdFu
9397 }
9498
9599 public void registerUpdateListener (UpdateListener updateListener ) {
96- this .updateListeners .add (updateListener );
100+ try {
101+ lock .lock ();
102+ this .updateListeners .add (updateListener );
103+ } finally {
104+ lock .unlock ();
105+ }
97106 // replay added relationship id contexts
98107 relationshipIdContexts .forEach (updateListener ::onRelationshipIdsAdded );
99108 }
100109
101110 public void removeUpdateListener (UpdateListener updateListener ) {
102- this .updateListeners .remove (updateListener );
111+ // This is potentially called in parallel by the Cypher runtime
112+ // and while we are adding new relationship types.
113+ try {
114+ lock .lock ();
115+ this .updateListeners .remove (updateListener );
116+ } finally {
117+ lock .unlock ();
118+ }
103119 }
104120
105121 @ Override
106122 public void relationshipTypeAdded (String relationshipType ) {
107- var relationshipIdContext = relationshipIdContextFromRelType (graphStore , tokenHolders , RelationshipType .of (relationshipType ));
123+ var relationshipIdContext = relationshipIdContextFromRelType (
124+ graphStore ,
125+ tokenHolders ,
126+ RelationshipType .of (relationshipType )
127+ );
108128 relationshipIdContexts .add (relationshipIdContext );
109- updateListeners .forEach (updateListener -> updateListener .onRelationshipIdsAdded (relationshipIdContext ));
129+ try {
130+ lock .lock ();
131+ updateListeners .forEach (updateListener -> updateListener .onRelationshipIdsAdded (relationshipIdContext ));
132+ } finally {
133+ lock .unlock ();
134+ }
110135 }
111136
112137 @ NotNull
0 commit comments