Skip to content

Commit 02b5add

Browse files
committed
bulk operations
1 parent 4914e6f commit 02b5add

File tree

14 files changed

+759
-31
lines changed

14 files changed

+759
-31
lines changed

Directory.Build.props

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
<RepositoryUrl>https://github.com/managedcode/graphrag</RepositoryUrl>
2626
<PackageProjectUrl>https://github.com/managedcode/graphrag</PackageProjectUrl>
2727
<Product>Managed Code GraphRag</Product>
28-
<Version>10.0.1</Version>
29-
<PackageVersion>10.0.1</PackageVersion>
28+
<Version>10.0.2</Version>
29+
<PackageVersion>10.0.2</PackageVersion>
3030

3131
</PropertyGroup>
3232
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">

README.md

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,12 +236,41 @@ GraphRAG ships with a first-class Apache AGE adapter (`ManagedCode.GraphRag.Post
236236
}
237237
}
238238
```
239-
4. **Register through DI.** `services.AddPostgresGraphStore("postgres", configure: ...)` wires up `IAgeConnectionManager`, `IAgeClientFactory`, and `IGraphStore` automatically. Pool sizing behaves like EF Core—tune it via connection-string keywords such as `Maximum Pool Size` when needed, otherwise the standard Npgsql defaults apply. The first Postgres store you register becomes the default `IGraphStore`, and additional stores stay keyed-only:
239+
4. **Register through DI.** `services.AddPostgresGraphStore("postgres", configure: ...)` wires up `IAgeConnectionManager`, `IAgeClientFactory`, `IGraphStore`, `IScopedGraphStore`, and `IBulkGraphStore`. Pool sizing follows the standard Npgsql settings (configure `Max Pool Size`, `Timeout`, etc. inside the connection string). The first registration becomes the default unkeyed `IGraphStore`; additional stores remain keyed-only.
240+
240241
```csharp
241-
await using var client = ageClientFactory.CreateClient();
242-
await client.OpenConnectionAsync(cancellationToken);
242+
var services = new ServiceCollection()
243+
.AddLogging()
244+
.AddPostgresGraphStore("postgres", options =>
245+
{
246+
options.ConnectionString = postgresConnectionString;
247+
options.GraphName = "graphrag";
248+
});
249+
250+
await using var provider = services.BuildServiceProvider();
251+
252+
// Regular graph operations
253+
var graphStore = provider.GetRequiredService<IGraphStore>();
254+
255+
// Scoped operations reuse a single AGE/Postgres connection for the lifetime of the scope
256+
var scopedStore = provider.GetRequiredService<IScopedGraphStore>();
257+
await using (await scopedStore.CreateScopeAsync())
258+
{
259+
await graphStore.UpsertNodeAsync("node-1", "Example", new Dictionary<string, object?> { ["name"] = "Scoped" });
260+
await graphStore.UpsertNodeAsync("node-2", "Example", new Dictionary<string, object?> { ["name"] = "Connection" });
261+
}
262+
263+
// Bulk helpers batch large workloads while keeping the scoped connection alive
264+
var bulkStore = provider.GetRequiredService<IBulkGraphStore>();
265+
await bulkStore.UpsertNodesAsync(new[]
266+
{
267+
new GraphNodeUpsert("bulk-1", "Example", new Dictionary<string, object?> { ["name"] = "Bulk" }),
268+
new GraphNodeUpsert("bulk-2", "Example", new Dictionary<string, object?> { ["name"] = "Write" })
269+
});
243270
```
244271

272+
The `AgeConnectionManager` automatically retries transient `53300: too many clients` errors (up to three exponential backoff attempts) so scopes can wait for a free slot before failing. When a scope is disposed, the underlying `IAgeClientScope` returns its connection to the pool, keeping concurrency predictable even under heavy fan-out.
273+
245274
### Neo4j Setup
246275

247276
Neo4j support lives in `ManagedCode.GraphRag.Neo4j` and uses the official Bolt driver:

src/ManagedCode.GraphRag.Postgres/ApacheAge/AGEClientEventId.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ internal static class AgeClientEventId
55
#region Connection
66
public const int CONNECTION_OPENED = 1000;
77
public const int CONNECTION_CLOSED = 1001;
8+
public const int CONNECTION_RETRYING = 1002;
89

910
public const int NULL_CONNECTION_WARNING = 1800;
1011

Lines changed: 133 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,152 @@
11
using Microsoft.Extensions.DependencyInjection;
22
using Microsoft.Extensions.Logging;
3+
using Npgsql;
34

45
namespace GraphRag.Storage.Postgres.ApacheAge;
56

67
public interface IAgeClientFactory
78
{
8-
AgeClient CreateClient();
9+
IAgeClient CreateClient();
10+
ValueTask<IAgeClientScope> CreateScopeAsync(CancellationToken cancellationToken = default);
911
}
1012

1113
internal sealed class AgeClientFactory([FromKeyedServices] IAgeConnectionManager connectionManager, ILoggerFactory loggerFactory) : IAgeClientFactory
1214
{
1315
private readonly IAgeConnectionManager _connectionManager = connectionManager ?? throw new ArgumentNullException(nameof(connectionManager));
1416
private readonly ILoggerFactory _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
17+
private readonly AsyncLocal<AgeClientScopeState?> _currentScope = new();
1518

16-
public AgeClient CreateClient()
19+
public IAgeClient CreateClient()
20+
{
21+
if (_currentScope.Value is { } scopeState)
22+
{
23+
return scopeState.CreateLease();
24+
}
25+
26+
return CreatePhysicalClient();
27+
}
28+
29+
public ValueTask<IAgeClientScope> CreateScopeAsync(CancellationToken cancellationToken = default)
30+
{
31+
cancellationToken.ThrowIfCancellationRequested();
32+
33+
if (_currentScope.Value is not null)
34+
{
35+
throw new InvalidOperationException("An AGE client scope is already active for this asynchronous context.");
36+
}
37+
38+
var scopeClient = CreatePhysicalClient();
39+
var scopeState = new AgeClientScopeState(scopeClient);
40+
_currentScope.Value = scopeState;
41+
42+
return new ValueTask<IAgeClientScope>(new ActiveAgeClientScope(this, scopeState));
43+
}
44+
45+
private AgeClient CreatePhysicalClient()
1746
{
1847
var logger = _loggerFactory.CreateLogger<AgeClient>();
1948
return new AgeClient(_connectionManager, logger);
2049
}
50+
51+
private void ClearScope(AgeClientScopeState state)
52+
{
53+
if (!ReferenceEquals(_currentScope.Value, state))
54+
{
55+
return;
56+
}
57+
58+
_currentScope.Value = null;
59+
}
60+
61+
private sealed class ActiveAgeClientScope(AgeClientFactory factory, AgeClientFactory.AgeClientScopeState state) : IAgeClientScope
62+
{
63+
private readonly AgeClientFactory _factory = factory;
64+
private readonly AgeClientScopeState _state = state;
65+
private bool _disposed;
66+
67+
public async ValueTask DisposeAsync()
68+
{
69+
if (_disposed)
70+
{
71+
return;
72+
}
73+
74+
if (_state.HasActiveLease)
75+
{
76+
throw new InvalidOperationException("Cannot dispose an AGE client scope while an operation is still running.");
77+
}
78+
79+
_disposed = true;
80+
_factory.ClearScope(_state);
81+
await _state.Client.CloseConnectionAsync().ConfigureAwait(false);
82+
await _state.Client.DisposeAsync().ConfigureAwait(false);
83+
}
84+
}
85+
86+
private sealed class AgeClientScopeState(AgeClient client)
87+
{
88+
private int _leaseActive;
89+
90+
public AgeClient Client { get; } = client;
91+
92+
public IAgeClient CreateLease()
93+
{
94+
if (Interlocked.CompareExchange(ref _leaseActive, 1, 0) == 1)
95+
{
96+
throw new InvalidOperationException("An AGE client scope cannot be used concurrently. Await ongoing operations before issuing new ones within the same scope.");
97+
}
98+
99+
return new ScopedAgeClient(this);
100+
}
101+
102+
public void ReleaseLease()
103+
{
104+
Volatile.Write(ref _leaseActive, 0);
105+
}
106+
107+
public bool HasActiveLease => Volatile.Read(ref _leaseActive) == 1;
108+
}
109+
110+
private sealed class ScopedAgeClient(AgeClientFactory.AgeClientScopeState state) : IAgeClient
111+
{
112+
private readonly AgeClientScopeState _state = state;
113+
private bool _disposed;
114+
115+
public bool IsConnected => _state.Client.IsConnected;
116+
117+
public NpgsqlConnection Connection => _state.Client.Connection;
118+
119+
public Task OpenConnectionAsync(CancellationToken cancellationToken = default) =>
120+
_state.Client.OpenConnectionAsync(cancellationToken);
121+
122+
public Task CreateGraphAsync(string graphName, CancellationToken cancellationToken = default) =>
123+
_state.Client.CreateGraphAsync(graphName, cancellationToken);
124+
125+
public Task DropGraphAsync(string graphName, bool cascade = false, CancellationToken cancellationToken = default) =>
126+
_state.Client.DropGraphAsync(graphName, cascade, cancellationToken);
127+
128+
public Task ExecuteCypherAsync(string graph, string cypher, CancellationToken cancellationToken = default) =>
129+
_state.Client.ExecuteCypherAsync(graph, cypher, cancellationToken);
130+
131+
public Task<AgeDataReader> ExecuteQueryAsync(string query, CancellationToken cancellationToken = default, params object?[] parameters) =>
132+
_state.Client.ExecuteQueryAsync(query, cancellationToken, parameters);
133+
134+
public Task CloseConnectionAsync(CancellationToken cancellationToken = default) =>
135+
Task.CompletedTask;
136+
137+
public Task<bool> GraphExistsAsync(string graphName, CancellationToken cancellationToken = default) =>
138+
_state.Client.GraphExistsAsync(graphName, cancellationToken);
139+
140+
public ValueTask DisposeAsync()
141+
{
142+
if (_disposed)
143+
{
144+
return ValueTask.CompletedTask;
145+
}
146+
147+
_disposed = true;
148+
_state.ReleaseLease();
149+
return ValueTask.CompletedTask;
150+
}
151+
}
21152
}

src/ManagedCode.GraphRag.Postgres/ApacheAge/AgeConnectionManager.cs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ public interface IAgeConnectionManager : IAsyncDisposable, IDisposable
1414

1515
public sealed class AgeConnectionManager : IAgeConnectionManager
1616
{
17+
private const int ConnectionLimitMaxAttempts = 3;
18+
private static readonly TimeSpan ConnectionLimitBaseDelay = TimeSpan.FromMilliseconds(200);
19+
private static readonly TimeSpan ConnectionLimitMaxDelay = TimeSpan.FromSeconds(2);
20+
1721
private readonly NpgsqlDataSource _dataSource;
1822
private readonly ILogger<AgeConnectionManager> _logger;
1923
private volatile bool _extensionEnsured;
@@ -50,7 +54,7 @@ public async Task<NpgsqlConnection> OpenConnectionAsync(CancellationToken cancel
5054
ThrowIfDisposed();
5155

5256
await EnsureExtensionCreatedAsync(cancellationToken).ConfigureAwait(false);
53-
var connection = await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
57+
var connection = await OpenDataSourceConnectionAsync(cancellationToken).ConfigureAwait(false);
5458
await LoadAgeAsync(connection, cancellationToken).ConfigureAwait(false);
5559
await SetSearchPathAsync(connection, cancellationToken).ConfigureAwait(false);
5660
return connection;
@@ -104,8 +108,7 @@ private async Task EnsureExtensionCreatedAsync(CancellationToken cancellationTok
104108
return;
105109
}
106110

107-
await using var connection = new NpgsqlConnection(ConnectionString);
108-
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
111+
await using var connection = await OpenDataSourceConnectionAsync(cancellationToken).ConfigureAwait(false);
109112
await using var command = connection.CreateCommand();
110113
command.CommandText = "CREATE EXTENSION IF NOT EXISTS age;";
111114
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
@@ -159,4 +162,37 @@ private async Task SetSearchPathAsync(NpgsqlConnection connection, CancellationT
159162

160163
private void ThrowIfDisposed() =>
161164
ObjectDisposedException.ThrowIf(_disposed, nameof(AgeConnectionManager));
165+
166+
private async Task<NpgsqlConnection> OpenDataSourceConnectionAsync(CancellationToken cancellationToken)
167+
{
168+
var attempt = 0;
169+
170+
while (true)
171+
{
172+
cancellationToken.ThrowIfCancellationRequested();
173+
attempt++;
174+
175+
try
176+
{
177+
return await _dataSource.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);
178+
}
179+
catch (PostgresException ex) when (ShouldRetry(ex, attempt))
180+
{
181+
var delay = GetRetryDelay(attempt);
182+
LogMessages.ConnectionRetrying(_logger, ConnectionString, attempt, delay, ex.MessageText);
183+
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
184+
}
185+
}
186+
}
187+
188+
private static bool ShouldRetry(PostgresException ex, int attempt) =>
189+
ex.SqlState == PostgresErrorCodes.TooManyConnections && attempt < ConnectionLimitMaxAttempts;
190+
191+
private static TimeSpan GetRetryDelay(int attempt)
192+
{
193+
var delayMillis = Math.Min(
194+
ConnectionLimitBaseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1),
195+
ConnectionLimitMaxDelay.TotalMilliseconds);
196+
return TimeSpan.FromMilliseconds(delayMillis);
197+
}
162198
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace GraphRag.Storage.Postgres.ApacheAge;
2+
3+
/// <summary>
4+
/// Represents a scope that keeps an AGE connection alive for the lifetime of the scope.
5+
/// </summary>
6+
public interface IAgeClientScope : IAsyncDisposable
7+
{
8+
}

src/ManagedCode.GraphRag.Postgres/ApacheAge/LogMessages.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,17 @@ public static partial void ConnectionClosed(
2222
ILogger logger,
2323
string connectionString);
2424

25+
[LoggerMessage(
26+
EventId = AgeClientEventId.CONNECTION_RETRYING,
27+
Level = LogLevel.Warning,
28+
Message = "Connection limit reached for {connectionString}. Attempt {attempt} failed; retrying after {delay}. Reason: {reason}")]
29+
public static partial void ConnectionRetrying(
30+
ILogger logger,
31+
string connectionString,
32+
int attempt,
33+
TimeSpan delay,
34+
string reason);
35+
2536
[LoggerMessage(
2637
EventId = AgeClientEventId.NULL_CONNECTION_WARNING,
2738
Level = LogLevel.Warning,

0 commit comments

Comments
 (0)