Skip to content
This repository was archived by the owner on Mar 20, 2025. It is now read-only.

Commit 25c6173

Browse files
JimmyHannonAaronontheweb
authored andcommitted
added support for SerializerWithStringManifest (#40)
* added support for SerializerWithStringManifest * bugfix get type when manifest is no qualified name (serializerId is set) * bugfix, same as for journal
1 parent fe6f9bf commit 25c6173

File tree

9 files changed

+154
-42
lines changed

9 files changed

+154
-42
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ CREATE TABLE {your_metadata_table_name} (
112112

113113
### Migration
114114

115+
#### From 1.1.0 to 1.3.1
116+
```SQL
117+
ALTER TABLE {your_journal_table_name} ADD COLUMN serializer_id INTEGER NULL;
118+
ALTER TABLE {your_snapshot_table_name} ADD COLUMN serializer_id INTEGER NULL;
119+
```
120+
115121
#### From 1.0.6 to 1.1.0
116122
```SQL
117123
CREATE TABLE {your_metadata_table_name} (

src/Akka.Persistence.PostgreSql.Tests/Akka.Persistence.PostgreSql.Tests.csproj

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88

99
<ItemGroup>
1010
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.3.0-preview-20170425-07" />
11-
<PackageReference Include="Akka" Version="1.3.0" />
12-
<PackageReference Include="Akka.Persistence" Version="1.3.0" />
13-
<PackageReference Include="Akka.Persistence.Sql.Common" Version="1.3.0" />
14-
<PackageReference Include="Akka.Persistence.Sql.TestKit" Version="1.3.0" />
15-
<PackageReference Include="Akka.TestKit" Version="1.3.0" />
11+
<PackageReference Include="Akka" Version="1.3.1" />
12+
<PackageReference Include="Akka.Persistence" Version="1.3.1" />
13+
<PackageReference Include="Akka.Persistence.Sql.Common" Version="1.3.1" />
14+
<PackageReference Include="Akka.Persistence.Sql.TestKit" Version="1.3.1" />
15+
<PackageReference Include="Akka.TestKit" Version="1.3.1" />
1616
<PackageReference Include="Microsoft.Extensions.Configuration" Version="1.1.2" />
1717
<PackageReference Include="Microsoft.Extensions.Configuration.Xml" Version="1.1.2" />
1818
<PackageReference Include="Npgsql" Version="3.2.5" />

src/Akka.Persistence.PostgreSql/Akka.Persistence.PostgreSql.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
</ItemGroup>
2121

2222
<ItemGroup>
23-
<PackageReference Include="Akka" Version="1.3.0" />
24-
<PackageReference Include="Akka.Persistence" Version="1.3.0" />
25-
<PackageReference Include="Akka.Persistence.Sql.Common" Version="1.3.0" />
23+
<PackageReference Include="Akka" Version="1.3.1" />
24+
<PackageReference Include="Akka.Persistence" Version="1.3.1" />
25+
<PackageReference Include="Akka.Persistence.Sql.Common" Version="1.3.1" />
2626
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" />
2727
<PackageReference Include="Npgsql" Version="3.2.5" />
2828
</ItemGroup>

src/Akka.Persistence.PostgreSql/Journal/PostgreSqlJournal.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public PostgreSqlJournal(Config journalConfig) : base(journalConfig)
4444
isDeletedColumnName: "is_deleted",
4545
tagsColumnName: "tags",
4646
orderingColumn: "ordering",
47+
serializerIdColumnName: "serializer_id",
4748
timeout: config.GetTimeSpan("connection-timeout"),
4849
storedAs: storedAs,
4950
defaultSerializer: config.GetString("serializer")),

src/Akka.Persistence.PostgreSql/Journal/PostgreSqlQueryExecutor.cs

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77

88
using Akka.Actor;
99
using Akka.Persistence.Sql.Common.Journal;
10+
using Akka.Serialization;
1011
using Akka.Util;
1112
using Newtonsoft.Json;
1213
using Npgsql;
1314
using NpgsqlTypes;
1415
using System;
15-
using System.Collections.Generic;
1616
using System.Collections.Immutable;
1717
using System.Data;
1818
using System.Data.Common;
@@ -24,8 +24,8 @@ namespace Akka.Persistence.PostgreSql.Journal
2424
public class PostgreSqlQueryExecutor : AbstractQueryExecutor
2525
{
2626
private readonly PostgreSqlQueryConfiguration _configuration;
27-
private readonly Func<IPersistentRepresentation, KeyValuePair<NpgsqlDbType, object>> _serialize;
28-
private readonly Func<Type, object, object> _deserialize;
27+
private readonly Func<IPersistentRepresentation, SerializationResult> _serialize;
28+
private readonly Func<Type, object, string, int?, object> _deserialize;
2929

3030
public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.Serialization.Serialization serialization, ITimestampProvider timestampProvider)
3131
: base(configuration, serialization, timestampProvider)
@@ -43,6 +43,7 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.
4343
{Configuration.ManifestColumnName} VARCHAR(500) NOT NULL,
4444
{Configuration.PayloadColumnName} {storedAs} NOT NULL,
4545
{Configuration.TagsColumnName} VARCHAR(100) NULL,
46+
{Configuration.SerializerIdColumnName} INTEGER NULL,
4647
CONSTRAINT {Configuration.JournalEventsTableName}_uq UNIQUE ({Configuration.PersistenceIdColumnName}, {Configuration.SequenceNrColumnName})
4748
);
4849
";
@@ -57,16 +58,32 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.
5758
switch (_configuration.StoredAs)
5859
{
5960
case StoredAsType.ByteA:
60-
_serialize = e => new KeyValuePair<NpgsqlDbType, object>(NpgsqlDbType.Bytea, Serialization.FindSerializerFor(e.Payload).ToBinary(e.Payload));
61-
_deserialize = (type, serialized) => Serialization.FindSerializerForType(type).FromBinary((byte[])serialized, type);
61+
_serialize = e =>
62+
{
63+
var serializer = Serialization.FindSerializerFor(e.Payload);
64+
return new SerializationResult(NpgsqlDbType.Bytea, serializer.ToBinary(e.Payload), serializer);
65+
};
66+
_deserialize = (type, serialized, manifest, serializerId) =>
67+
{
68+
if (serializerId.HasValue)
69+
{
70+
return Serialization.Deserialize((byte[])serialized, serializerId.Value, manifest);
71+
}
72+
else
73+
{
74+
// Support old writes that did not set the serializer id
75+
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
76+
return deserializer.FromBinary((byte[])serialized, type);
77+
}
78+
};
6279
break;
6380
case StoredAsType.JsonB:
64-
_serialize = e => new KeyValuePair<NpgsqlDbType, object>(NpgsqlDbType.Jsonb, JsonConvert.SerializeObject(e.Payload, _configuration.JsonSerializerSettings));
65-
_deserialize = (type, serialized) => JsonConvert.DeserializeObject((string)serialized, type, _configuration.JsonSerializerSettings);
81+
_serialize = e => new SerializationResult(NpgsqlDbType.Jsonb, JsonConvert.SerializeObject(e.Payload, _configuration.JsonSerializerSettings), null);
82+
_deserialize = (type, serialized, manifest, serializerId) => JsonConvert.DeserializeObject((string)serialized, type, _configuration.JsonSerializerSettings);
6683
break;
6784
case StoredAsType.Json:
68-
_serialize = e => new KeyValuePair<NpgsqlDbType, object>(NpgsqlDbType.Json, JsonConvert.SerializeObject(e.Payload, _configuration.JsonSerializerSettings));
69-
_deserialize = (type, serialized) => JsonConvert.DeserializeObject((string)serialized, type, _configuration.JsonSerializerSettings);
85+
_serialize = e => new SerializationResult(NpgsqlDbType.Json, JsonConvert.SerializeObject(e.Payload, _configuration.JsonSerializerSettings), null);
86+
_deserialize = (type, serialized, manifest, serializerId) => JsonConvert.DeserializeObject((string)serialized, type, _configuration.JsonSerializerSettings);
7087
break;
7188
default:
7289
throw new NotSupportedException($"{_configuration.StoredAs} is not supported Db type for a payload");
@@ -79,15 +96,34 @@ public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.
7996

8097
protected override void WriteEvent(DbCommand command, IPersistentRepresentation e, IImmutableSet<string> tags)
8198
{
82-
var manifest = string.IsNullOrEmpty(e.Manifest) ? QualifiedName(e) : e.Manifest;
83-
var t = _serialize(e);
99+
var serializationResult = _serialize(e);
100+
var serializer = serializationResult.Serializer;
101+
var hasSerializer = serializer != null;
102+
103+
string manifest = "";
104+
if (hasSerializer && serializer is SerializerWithStringManifest)
105+
manifest = ((SerializerWithStringManifest)serializer).Manifest(e.Payload);
106+
else if (hasSerializer && serializer.IncludeManifest)
107+
manifest = QualifiedName(e);
108+
else
109+
manifest = string.IsNullOrEmpty(e.Manifest) ? QualifiedName(e) : e.Manifest;
84110

85111
AddParameter(command, "@PersistenceId", DbType.String, e.PersistenceId);
86112
AddParameter(command, "@SequenceNr", DbType.Int64, e.SequenceNr);
87113
AddParameter(command, "@Timestamp", DbType.Int64, TimestampProvider.GenerateTimestamp(e));
88114
AddParameter(command, "@IsDeleted", DbType.Boolean, false);
89115
AddParameter(command, "@Manifest", DbType.String, manifest);
90-
command.Parameters.Add(new NpgsqlParameter("@Payload", t.Key) { Value = t.Value });
116+
117+
if (hasSerializer)
118+
{
119+
AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
120+
}
121+
else
122+
{
123+
AddParameter(command, "@SerializerId", DbType.Int32, DBNull.Value);
124+
}
125+
126+
command.Parameters.Add(new NpgsqlParameter("@Payload", serializationResult.DbType) { Value = serializationResult.Payload });
91127

92128
if (tags.Count != 0)
93129
{
@@ -116,9 +152,19 @@ protected override IPersistentRepresentation ReadEvent(DbDataReader reader)
116152
var isDeleted = reader.GetBoolean(IsDeletedIndex);
117153
var manifest = reader.GetString(ManifestIndex);
118154
var raw = reader[PayloadIndex];
119-
var type = Type.GetType(manifest, true);
120155

121-
var deserialized = _deserialize(type, raw);
156+
int? serializerId = null;
157+
Type type = null;
158+
if (reader.IsDBNull(SerializerIdIndex))
159+
{
160+
type = Type.GetType(manifest, true);
161+
}
162+
else
163+
{
164+
serializerId = reader.GetInt32(SerializerIdIndex);
165+
}
166+
167+
var deserialized = _deserialize(type, raw, manifest, serializerId);
122168

123169
return new Persistent(deserialized, sequenceNr, persistenceId, manifest, isDeleted, ActorRefs.NoSender, null);
124170
}
@@ -141,12 +187,13 @@ public PostgreSqlQueryConfiguration(
141187
string isDeletedColumnName,
142188
string tagsColumnName,
143189
string orderingColumn,
190+
string serializerIdColumnName,
144191
TimeSpan timeout,
145192
StoredAsType storedAs,
146193
string defaultSerializer,
147194
JsonSerializerSettings jsonSerializerSettings = null)
148195
: base(schemaName, journalEventsTableName, metaTableName, persistenceIdColumnName, sequenceNrColumnName,
149-
payloadColumnName, manifestColumnName, timestampColumnName, isDeletedColumnName, tagsColumnName, orderingColumn, timeout, defaultSerializer)
196+
payloadColumnName, manifestColumnName, timestampColumnName, isDeletedColumnName, tagsColumnName, orderingColumn, serializerIdColumnName, timeout, defaultSerializer)
150197
{
151198
StoredAs = storedAs;
152199
JsonSerializerSettings = jsonSerializerSettings ?? new JsonSerializerSettings
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
using Akka.Serialization;
2+
using NpgsqlTypes;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Linq;
6+
using System.Text;
7+
using System.Threading.Tasks;
8+
9+
namespace Akka.Persistence.PostgreSql
10+
{
11+
internal class SerializationResult
12+
{
13+
public SerializationResult(NpgsqlDbType dbType, object payload, Serializer serializer)
14+
{
15+
DbType = dbType;
16+
Payload = payload;
17+
Serializer = serializer;
18+
}
19+
20+
public NpgsqlDbType DbType { get; private set; }
21+
public object Payload { get; private set; }
22+
public Serializer Serializer { get; private set; }
23+
24+
}
25+
}
26+

src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,15 @@
1010
using Npgsql;
1111
using NpgsqlTypes;
1212
using System;
13-
using System.Collections.Generic;
1413
using System.Data;
1514
using System.Data.Common;
1615

1716
namespace Akka.Persistence.PostgreSql.Snapshot
1817
{
1918
public class PostgreSqlQueryExecutor : AbstractQueryExecutor
2019
{
21-
private readonly Func<object, KeyValuePair<NpgsqlDbType, object>> _serialize;
22-
private readonly Func<Type, object, object> _deserialize;
20+
private readonly Func<object, SerializationResult> _serialize;
21+
private readonly Func<Type, object, string, int?, object> _deserialize;
2322
public PostgreSqlQueryExecutor(PostgreSqlQueryConfiguration configuration, Akka.Serialization.Serialization serialization) : base(configuration, serialization)
2423
{
2524
CreateSnapshotTableSql = $@"
@@ -33,6 +32,7 @@ IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{Co
3332
{Configuration.TimestampColumnName} BIGINT NOT NULL,
3433
{Configuration.ManifestColumnName} VARCHAR(500) NOT NULL,
3534
{Configuration.PayloadColumnName} {configuration.StoredAs.ToString().ToUpperInvariant()} NOT NULL,
35+
{Configuration.SerializerIdColumnName} INTEGER NULL,
3636
CONSTRAINT {Configuration.SnapshotTableName}_pk PRIMARY KEY ({Configuration.PersistenceIdColumnName}, {Configuration.SequenceNrColumnName})
3737
);
3838
CREATE INDEX {Configuration.SnapshotTableName}_{Configuration.SequenceNrColumnName}_idx ON {Configuration.FullSnapshotTableName}({Configuration.SequenceNrColumnName});
@@ -55,23 +55,40 @@ WITH upsert AS (
5555
{Configuration.SequenceNrColumnName},
5656
{Configuration.TimestampColumnName},
5757
{Configuration.ManifestColumnName},
58-
{Configuration.PayloadColumnName})
59-
SELECT @PersistenceId, @SequenceNr, @Timestamp, @Manifest, @Payload
58+
{Configuration.PayloadColumnName},
59+
{Configuration.SerializerIdColumnName})
60+
SELECT @PersistenceId, @SequenceNr, @Timestamp, @Manifest, @Payload, @SerializerId
6061
WHERE NOT EXISTS (SELECT * FROM upsert)";
6162

6263
switch (configuration.StoredAs)
6364
{
6465
case StoredAsType.ByteA:
65-
_serialize = e => new KeyValuePair<NpgsqlDbType, object>(NpgsqlDbType.Bytea, serialization.FindSerializerFor(e).ToBinary(e));
66-
_deserialize = (type, serialized) => serialization.FindSerializerForType(type).FromBinary((byte[])serialized, type);
66+
_serialize = ss =>
67+
{
68+
var serializer = Serialization.FindSerializerFor(ss);
69+
return new SerializationResult(NpgsqlDbType.Bytea, serializer.ToBinary(ss), serializer);
70+
};
71+
_deserialize = (type, serialized, manifest, serializerId) =>
72+
{
73+
if (serializerId.HasValue)
74+
{
75+
return Serialization.Deserialize((byte[])serialized, serializerId.Value, manifest);
76+
}
77+
else
78+
{
79+
// Support old writes that did not set the serializer id
80+
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
81+
return deserializer.FromBinary((byte[])serialized, type);
82+
}
83+
};
6784
break;
6885
case StoredAsType.JsonB:
69-
_serialize = e => new KeyValuePair<NpgsqlDbType, object>(NpgsqlDbType.Jsonb, JsonConvert.SerializeObject(e, configuration.JsonSerializerSettings));
70-
_deserialize = (type, serialized) => JsonConvert.DeserializeObject((string)serialized, type, configuration.JsonSerializerSettings);
86+
_serialize = ss => new SerializationResult(NpgsqlDbType.Jsonb, JsonConvert.SerializeObject(ss, configuration.JsonSerializerSettings), null);
87+
_deserialize = (type, serialized, manifest, serializerId) => JsonConvert.DeserializeObject((string)serialized, type, configuration.JsonSerializerSettings);
7188
break;
7289
case StoredAsType.Json:
73-
_serialize = e => new KeyValuePair<NpgsqlDbType, object>(NpgsqlDbType.Json, JsonConvert.SerializeObject(e, configuration.JsonSerializerSettings));
74-
_deserialize = (type, serialized) => JsonConvert.DeserializeObject((string)serialized, type, configuration.JsonSerializerSettings);
90+
_serialize = ss => new SerializationResult(NpgsqlDbType.Json, JsonConvert.SerializeObject(ss, configuration.JsonSerializerSettings), null);
91+
_deserialize = (type, serialized, manifest, serializerId) => JsonConvert.DeserializeObject((string)serialized, type, configuration.JsonSerializerSettings);
7592
break;
7693
default:
7794
throw new NotSupportedException($"{configuration.StoredAs} is not supported Db type for a payload");
@@ -88,19 +105,32 @@ protected override DbCommand CreateCommand(DbConnection connection)
88105
protected override void SetTimestampParameter(DateTime timestamp, DbCommand command) => AddParameter(command, "@Timestamp", DbType.Int64, timestamp.Ticks);
89106
protected override void SetPayloadParameter(object snapshot, DbCommand command)
90107
{
91-
var t = _serialize(snapshot);
92-
command.Parameters.Add(new NpgsqlParameter("@Payload", t.Key) { Value = t.Value });
108+
var serializationResult = _serialize(snapshot);
109+
command.Parameters.Add(new NpgsqlParameter("@Payload", serializationResult.DbType) { Value = serializationResult.Payload });
93110
}
94111

95112
protected override SelectedSnapshot ReadSnapshot(DbDataReader reader)
96113
{
97114
var persistenceId = reader.GetString(0);
98115
var sequenceNr = reader.GetInt64(1);
99116
var timestamp = new DateTime(reader.GetInt64(2));
100-
var type = Type.GetType(reader.GetString(3), true);
101-
var snapshot = _deserialize(type, reader[4]);
102-
103-
return new SelectedSnapshot(new SnapshotMetadata(persistenceId, sequenceNr, timestamp), snapshot);
117+
var manifest = reader.GetString(3);
118+
119+
int? serializerId = null;
120+
Type type = null;
121+
if (reader.IsDBNull(5))
122+
{
123+
type = Type.GetType(manifest, true);
124+
}
125+
else
126+
{
127+
serializerId = reader.GetInt32(5);
128+
}
129+
130+
var snapshot = _deserialize(type, reader[4], manifest, serializerId);
131+
132+
var metadata = new SnapshotMetadata(persistenceId, sequenceNr, timestamp);
133+
return new SelectedSnapshot(metadata, snapshot);
104134
}
105135

106136
protected override string CreateSnapshotTableSql { get; }
@@ -120,11 +150,12 @@ public PostgreSqlQueryConfiguration(
120150
string payloadColumnName,
121151
string manifestColumnName,
122152
string timestampColumnName,
153+
string serializerIdColumnName,
123154
TimeSpan timeout,
124155
StoredAsType storedAs,
125156
string defaultSerializer,
126157
JsonSerializerSettings jsonSerializerSettings = null)
127-
: base(schemaName, snapshotTableName, persistenceIdColumnName, sequenceNrColumnName, payloadColumnName, manifestColumnName, timestampColumnName, timeout, defaultSerializer)
158+
: base(schemaName, snapshotTableName, persistenceIdColumnName, sequenceNrColumnName, payloadColumnName, manifestColumnName, timestampColumnName, serializerIdColumnName, timeout, defaultSerializer)
128159
{
129160
StoredAs = storedAs;
130161
JsonSerializerSettings = jsonSerializerSettings ?? new JsonSerializerSettings

0 commit comments

Comments
 (0)