Skip to content
This repository was archived by the owner on Mar 20, 2025. It is now read-only.

Commit 27e271c

Browse files
Merge pull request #54 from AkkaNetContrib/dev
Akka.Persistence.PostgreSql v1.3.8 release
2 parents 9c2b695 + 195b938 commit 27e271c

File tree

12 files changed

+82
-38
lines changed

12 files changed

+82
-38
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ CREATE TABLE {your_journal_table_name} (
9191
manifest VARCHAR(500) NOT NULL,
9292
payload BYTEA NOT NULL,
9393
tags VARCHAR(100) NULL,
94+
serializer_id INTEGER NULL,
9495
CONSTRAINT {your_journal_table_name}_uq UNIQUE (persistence_id, sequence_nr)
9596
);
9697

@@ -99,7 +100,8 @@ CREATE TABLE {your_snapshot_table_name} (
99100
sequence_nr BIGINT NOT NULL,
100101
created_at BIGINT NOT NULL,
101102
manifest VARCHAR(500) NOT NULL,
102-
snapshot BYTEA NOT NULL,
103+
payload BYTEA NOT NULL,
104+
serializer_id INTEGER NULL,
103105
CONSTRAINT {your_snapshot_table_name}_pk PRIMARY KEY (persistence_id, sequence_nr)
104106
);
105107

@@ -155,4 +157,4 @@ or run postgres in docker
155157

156158
```
157159
docker run -d --rm --name=akka-postgres-db -p 5432:5432 -l deployer=akkadotnet -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres postgres:9.6
158-
```
160+
```

RELEASE_NOTES.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,17 @@
1+
#### 1.3.8 July 6 2018 ####
2+
Upgraded to support Akka.NET 1.3.8 and to take advantage of some performance improvements that have been added to Akka.Persistence for loading large snapshots, which you can read more about here: https://github.com/akkadotnet/akka.net/issues/3422
3+
4+
Note that this feature is currently disabled by default in Akka.Persistence.PostgreSql due to https://github.com/AkkaNetContrib/Akka.Persistence.PostgreSql/issues/53
5+
16
#### 1.3.1 September 11 2017 ####
7+
Support for Akka.NET 1.3, .NET Standard 1.6, and the first stable RTM release of Akka.Persistence.
8+
9+
Migration from 1.1.0-beta Up**
10+
The event journal and snapshot store schema has changed with this release. In order to keep existing stores compatible with this release, you **must** add a column to both stores for `SerializerId` like so:
11+
```sql
12+
ALTER TABLE {your_journal_table_name} ADD COLUMN SerializerId INTEGER NULL
13+
ALTER TABLE {your_snapshot_table_name} ADD COLUMN SerializerId INTEGER NULL
14+
```
215

316
#### 1.1.2 January 2017 ####
417

src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,9 @@
77
</PropertyGroup>
88

99
<ItemGroup>
10-
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.3.0-preview-20170425-07" />
11-
<PackageReference Include="Akka" Version="1.3.1" />
12-
<PackageReference Include="Akka.Persistence" Version="1.3.1" />
13-
<PackageReference Include="Akka.Persistence.Sql.Common" Version="1.3.1" />
14-
<PackageReference Include="Akka.Persistence.Sql.TestKit" Version="1.3.1" />
15-
<PackageReference Include="Akka.TestKit" Version="1.3.1" />
10+
<PackageReference Include="Akka.Persistence.Sql.Common" Version="1.3.8" />
11+
<PackageReference Include="Akka.Persistence.Sql.TestKit" Version="1.3.8" />
12+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.3.0" />
1613
<PackageReference Include="Microsoft.Extensions.Configuration" Version="1.1.2" />
1714
<PackageReference Include="Microsoft.Extensions.Configuration.Xml" Version="1.1.2" />
1815
<PackageReference Include="Npgsql" Version="3.2.5" />

src/Akka.Persistence.PostgreSql.Tests/PostgreSqlConfigSpec.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public void Should_PostgreSql_journal_has_default_config()
3131
Assert.Equal("metadata", config.GetString("metadata-table-name"));
3232
Assert.False(config.GetBoolean("auto-initialize"));
3333
Assert.Equal("Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common", config.GetString("timestamp-provider"));
34+
Assert.False(config.GetBoolean("sequential-access"));
3435
}
3536

3637
[Fact]
@@ -49,6 +50,7 @@ public void Should_PostgreSql_snapshot_has_default_config()
4950
Assert.Equal("public", config.GetString("schema-name"));
5051
Assert.Equal("snapshot_store", config.GetString("table-name"));
5152
Assert.False(config.GetBoolean("auto-initialize"));
53+
Assert.False(config.GetBoolean("sequential-access"));
5254
}
5355
}
5456
}

src/Akka.Persistence.PostgreSql.Tests/app.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22
<configuration>
33
<connectionStrings>
44
<add name="TestDb" connectionString="Server=db;Port=5432;Database=postgres;User Id=postgres;Password=postgres;" />
5+
<!--<add name="TestDb" connectionString="Server=127.0.0.1;Port=5432;Database=postgres;User Id=postgres;Password=postgres;" />-->
56
</connectionStrings>
67
</configuration>

src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020
</ItemGroup>
2121

2222
<ItemGroup>
23-
<PackageReference Include="Akka" Version="1.3.1" />
24-
<PackageReference Include="Akka.Persistence" Version="1.3.1" />
25-
<PackageReference Include="Akka.Persistence.Sql.Common" Version="1.3.1" />
26-
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" />
23+
<PackageReference Include="Akka.Persistence.Sql.Common" Version="1.3.8" />
2724
<PackageReference Include="Npgsql" Version="3.2.5" />
2825
</ItemGroup>
2926

src/Akka.Persistence.PostgreSql/Journal/PostgreSqlJournal.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ public PostgreSqlJournal(Config journalConfig) : base(journalConfig)
4747
serializerIdColumnName: "serializer_id",
4848
timeout: config.GetTimeSpan("connection-timeout"),
4949
storedAs: storedAs,
50-
defaultSerializer: config.GetString("serializer")),
50+
defaultSerializer: config.GetString("serializer"),
51+
useSequentialAccess: config.GetBoolean("sequential-access")),
5152
Context.System.Serialization,
5253
GetTimestampProvider(config.GetString("timestamp-provider")));
5354

src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,11 @@ public PostgreSqlQueryConfiguration(
191191
TimeSpan timeout,
192192
StoredAsType storedAs,
193193
string defaultSerializer,
194-
JsonSerializerSettings jsonSerializerSettings = null)
194+
JsonSerializerSettings jsonSerializerSettings = null,
195+
bool useSequentialAccess = true)
195196
: base(schemaName, journalEventsTableName, metaTableName, persistenceIdColumnName, sequenceNrColumnName,
196-
payloadColumnName, manifestColumnName, timestampColumnName, isDeletedColumnName, tagsColumnName, orderingColumn, serializerIdColumnName, timeout, defaultSerializer)
197+
payloadColumnName, manifestColumnName, timestampColumnName, isDeletedColumnName, tagsColumnName, orderingColumn,
198+
serializerIdColumnName, timeout, defaultSerializer, useSequentialAccess)
197199
{
198200
StoredAs = storedAs;
199201
JsonSerializerSettings = jsonSerializerSettings ?? new JsonSerializerSettings

src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
//-----------------------------------------------------------------------
77

88
using Akka.Persistence.Sql.Common.Snapshot;
9+
using Akka.Serialization;
10+
using Akka.Util;
911
using Newtonsoft.Json;
1012
using Npgsql;
1113
using NpgsqlTypes;
@@ -99,7 +101,7 @@ WITH upsert AS (
99101

100102
protected override DbCommand CreateCommand(DbConnection connection)
101103
{
102-
return ((NpgsqlConnection) connection).CreateCommand();
104+
return ((NpgsqlConnection)connection).CreateCommand();
103105
}
104106

105107
protected override void SetTimestampParameter(DateTime timestamp, DbCommand command) => AddParameter(command, "@Timestamp", DbType.Int64, timestamp.Ticks);
@@ -109,6 +111,25 @@ protected override void SetPayloadParameter(object snapshot, DbCommand command)
109111
command.Parameters.Add(new NpgsqlParameter("@Payload", serializationResult.DbType) { Value = serializationResult.Payload });
110112
}
111113

114+
protected override void SetManifestParameters(object snapshot, DbCommand command)
115+
{
116+
var snapshotType = snapshot.GetType();
117+
var serializer = Serialization.FindSerializerForType(snapshotType, Configuration.DefaultSerializer);
118+
119+
string manifest = "";
120+
if (serializer is SerializerWithStringManifest)
121+
{
122+
manifest = ((SerializerWithStringManifest)serializer).Manifest(snapshot);
123+
}
124+
else if (!serializer.IncludeManifest)
125+
{
126+
manifest = snapshotType.TypeQualifiedName();
127+
}
128+
129+
AddParameter(command, "@Manifest", DbType.String, manifest);
130+
AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
131+
}
132+
112133
protected override SelectedSnapshot ReadSnapshot(DbDataReader reader)
113134
{
114135
var persistenceId = reader.GetString(0);
@@ -118,14 +139,12 @@ protected override SelectedSnapshot ReadSnapshot(DbDataReader reader)
118139

119140
int? serializerId = null;
120141
Type type = null;
121-
if (reader.IsDBNull(5))
122-
{
123-
type = Type.GetType(manifest, true);
124-
}
125-
else
126-
{
142+
143+
if (!string.IsNullOrEmpty(manifest))
144+
type = Type.GetType(manifest, throwOnError: true);
145+
146+
if (!reader.IsDBNull(5))
127147
serializerId = reader.GetInt32(5);
128-
}
129148

130149
var snapshot = _deserialize(type, reader[4], manifest, serializerId);
131150

@@ -143,19 +162,21 @@ public class PostgreSqlQueryConfiguration : QueryConfiguration
143162
public readonly JsonSerializerSettings JsonSerializerSettings;
144163

145164
public PostgreSqlQueryConfiguration(
146-
string schemaName,
147-
string snapshotTableName,
148-
string persistenceIdColumnName,
149-
string sequenceNrColumnName,
150-
string payloadColumnName,
151-
string manifestColumnName,
152-
string timestampColumnName,
165+
string schemaName,
166+
string snapshotTableName,
167+
string persistenceIdColumnName,
168+
string sequenceNrColumnName,
169+
string payloadColumnName,
170+
string manifestColumnName,
171+
string timestampColumnName,
153172
string serializerIdColumnName,
154-
TimeSpan timeout,
173+
TimeSpan timeout,
155174
StoredAsType storedAs,
156175
string defaultSerializer,
157-
JsonSerializerSettings jsonSerializerSettings = null)
158-
: base(schemaName, snapshotTableName, persistenceIdColumnName, sequenceNrColumnName, payloadColumnName, manifestColumnName, timestampColumnName, serializerIdColumnName, timeout, defaultSerializer)
176+
JsonSerializerSettings jsonSerializerSettings = null,
177+
bool useSequentialAccess = true)
178+
: base(schemaName, snapshotTableName, persistenceIdColumnName, sequenceNrColumnName, payloadColumnName,
179+
manifestColumnName, timestampColumnName, serializerIdColumnName, timeout, defaultSerializer, useSequentialAccess)
159180
{
160181
StoredAs = storedAs;
161182
JsonSerializerSettings = jsonSerializerSettings ?? new JsonSerializerSettings

src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlSnapshotStore.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public PostgreSqlSnapshotStore(Config snapshotConfig) : base(snapshotConfig)
4141
serializerIdColumnName: "serializer_id",
4242
timeout: config.GetTimeSpan("connection-timeout"),
4343
storedAs: storedAs,
44-
defaultSerializer: config.GetString("serializer")),
44+
defaultSerializer: config.GetString("serializer"),
45+
useSequentialAccess: config.GetBoolean("sequential-access")),
4546
Context.System.Serialization);
4647

4748
SnapshotSettings = new PostgreSqlSnapshotStoreSettings(config);

0 commit comments

Comments
 (0)