Skip to content

Commit d3d319c

Browse files
authored
Fixing goavro bug due to codec state mutation (#4070)
Signed-off-by: Patrick Assuied <[email protected]>
1 parent 89181b7 commit d3d319c

File tree

7 files changed

+99
-22
lines changed

7 files changed

+99
-22
lines changed

common/component/kafka/kafka.go

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ type Kafka struct {
7272
latestSchemaCacheWriteLock sync.RWMutex
7373
latestSchemaCacheReadLock sync.Mutex
7474

75+
// Whether to encode/decode Avro into Avro JSON or standard JSON
76+
useAvroJSON bool
77+
7578
// used for background logic that cannot use the context passed to the Init function
7679
internalContext context.Context
7780
internalContextCancel func()
@@ -228,6 +231,7 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
228231
k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
229232
k.srClient.CodecCreationEnabled(true)
230233
k.srClient.CodecJsonEnabled(!meta.UseAvroJSON)
234+
k.useAvroJSON = meta.UseAvroJSON
231235
// Empty password is a possibility
232236
if meta.SchemaRegistryAPIKey != "" {
233237
k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
@@ -405,18 +409,27 @@ func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec,
405409
if errSchema != nil {
406410
return nil, nil, errSchema
407411
}
408-
codec := schema.Codec()
409412

413+
codec, errCodec := k.getCodec(schema)
414+
if errCodec != nil {
415+
return nil, nil, errCodec
416+
}
417+
defer k.latestSchemaCacheWriteLock.Unlock()
410418
k.latestSchemaCacheWriteLock.Lock()
411419
k.latestSchemaCache[subject] = SchemaCacheEntry{schema: schema, codec: codec, expirationTime: time.Now().Add(k.latestSchemaCacheTTL)}
412-
k.latestSchemaCacheWriteLock.Unlock()
420+
413421
return schema, codec, nil
414422
}
415423
schema, err := srClient.GetLatestSchema(getSchemaSubject(topic))
416424
if err != nil {
417425
return nil, nil, err
418426
}
419-
return schema, schema.Codec(), nil
427+
codec, errCodec := k.getCodec(schema)
428+
if errCodec != nil {
429+
return nil, nil, errCodec
430+
}
431+
432+
return schema, codec, nil
420433
}
421434

422435
func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error) {
@@ -427,6 +440,17 @@ func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error)
427440
return k.srClient, nil
428441
}
429442

443+
func (k *Kafka) getCodec(schema *srclient.Schema) (*goavro.Codec, error) {
444+
// The data coming through is either Avro JSON or standard JSON.
445+
// Force creation of a new codec instance for serialization and deserialization to avoid state mutation issues.
446+
// https://github.com/linkedin/goavro/issues/299
447+
// Once the bug is fixed, we can remove this and use the codec directly from schema.Codec()
448+
if k.useAvroJSON {
449+
return goavro.NewCodec(schema.Schema())
450+
}
451+
return goavro.NewCodecForStandardJSONFull(schema.Schema())
452+
}
453+
430454
func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) {
431455
// Null Data is valid and a tombstone record.
432456
// It should be converted to NULL and not go through schema validation & encoding

common/component/kafka/kafka_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func TestDeserializeValue(t *testing.T) {
8080
logger: logger.NewLogger("kafka_test"),
8181
}
8282
kJSON.srClient.CodecJsonEnabled(true)
83+
kJSON.useAvroJSON = false
8384
schemaJSON, _ := registryJSON.CreateSchema("my-topic-value", testSchema1, srclient.Avro)
8485

8586
// set up for Standard JSON
@@ -194,6 +195,41 @@ func TestDeserializeValue(t *testing.T) {
194195
_, err := kInv.DeserializeValue(&msg, handlerConfig)
195196
require.Error(t, err, "schema registry details not set")
196197
})
198+
199+
t.Run("verifying issue with union types due to codec state mutation is fixed", func(t *testing.T) {
200+
// Arrange
201+
testSchemaUnion := `["null", "long"]`
202+
203+
// In happy path, codec is initialized and NativeFromBinary is called first, which sets the states of the codec
204+
codecCard1, err := goavro.NewCodecForStandardJSONFull(testSchemaUnion)
205+
require.NoError(t, err)
206+
207+
datum1, _, err := codecCard1.NativeFromBinary([]byte{0x02, 0x06})
208+
require.NoError(t, err)
209+
210+
// As expected, the datum is a long with value 3
211+
require.Equal(t, int64(3), datum1.(map[string]any)["long"])
212+
213+
// Reproducing the error when NativeFromTextual is called before NativeFromBinary, which changes the states of the codec
214+
codecCard2, err := goavro.NewCodecForStandardJSONFull(testSchemaUnion)
215+
require.NoError(t, err)
216+
217+
// Trigger textual path that mutates states
218+
codecCard2.NativeFromTextual([]byte("1"))
219+
220+
// Binary for union index 1 (long) with value 3: 0x02 0x06
221+
datum, _, err := codecCard2.NativeFromBinary([]byte{0x02, 0x06})
222+
require.NoError(t, err)
223+
224+
// Prior to bug fix, the datum would be returned as a {"null", 3} but should return '{"long":3}'!
225+
require.Nil(t, datum.(map[string]any)["null"])
226+
require.Equal(t, int64(3), datum.(map[string]any)["long"])
227+
228+
// As a result, next call to TextualFromNative would fail with "Cannot encode textual union: cannot encode textual null: expected: Go nil; received: int64"
229+
act, err := codecCard2.TextualFromNative(nil, datum)
230+
require.NoError(t, err)
231+
require.Equal(t, []byte("3"), act)
232+
})
197233
}
198234

199235
func formatByteRecord(schemaID int, valueBytes []byte) []byte {
@@ -249,12 +285,14 @@ func TestSerializeValueCachingDisabled(t *testing.T) {
249285
srClient: registryJSON,
250286
schemaCachingEnabled: false,
251287
logger: logger.NewLogger("kafka_test"),
288+
useAvroJSON: false,
252289
}
253290

254291
kAvroJSON := Kafka{
255292
srClient: registryAvroJSON,
256293
schemaCachingEnabled: false,
257294
logger: logger.NewLogger("kafka_test"),
295+
useAvroJSON: true,
258296
}
259297

260298
t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) {
@@ -327,6 +365,7 @@ func TestSerializeValueCachingEnabled(t *testing.T) {
327365
latestSchemaCache: make(map[string]SchemaCacheEntry),
328366
latestSchemaCacheTTL: time.Minute * 5,
329367
logger: logger.NewLogger("kafka_test"),
368+
useAvroJSON: false,
330369
}
331370

332371
t.Run("valueSchemaType not set, leave value as is", func(t *testing.T) {

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ require (
9292
github.com/labd/commercetools-go-sdk v1.3.1
9393
github.com/lestrrat-go/httprc v1.0.5
9494
github.com/lestrrat-go/jwx/v2 v2.0.21
95-
github.com/linkedin/goavro/v2 v2.14.0
95+
github.com/linkedin/goavro/v2 v2.14.1
9696
github.com/machinebox/graphql v0.2.2
9797
github.com/matoous/go-nanoid/v2 v2.0.0
9898
github.com/microsoft/go-mssqldb v1.6.0
@@ -111,7 +111,7 @@ require (
111111
github.com/puzpuzpuz/xsync/v3 v3.0.0
112112
github.com/rabbitmq/amqp091-go v1.9.0
113113
github.com/redis/go-redis/v9 v9.6.3
114-
github.com/riferrei/srclient v0.7.2
114+
github.com/riferrei/srclient v0.7.3
115115
github.com/sendgrid/sendgrid-go v3.13.0+incompatible
116116
github.com/sijms/go-ora/v2 v2.8.22
117117
github.com/spf13/cast v1.8.0

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1187,8 +1187,8 @@ github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPp
11871187
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
11881188
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
11891189
github.com/linkedin/goavro/v2 v2.13.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
1190-
github.com/linkedin/goavro/v2 v2.14.0 h1:aNO/js65U+Mwq4yB5f1h01c3wiM458qtRad1DN0CMUI=
1191-
github.com/linkedin/goavro/v2 v2.14.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
1190+
github.com/linkedin/goavro/v2 v2.14.1 h1:/8VjDpd38PRsy02JS0jflAu7JZPfJcGTwqWgMkFS2iI=
1191+
github.com/linkedin/goavro/v2 v2.14.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
11921192
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
11931193
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
11941194
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
@@ -1517,8 +1517,8 @@ github.com/redis/go-redis/v9 v9.6.3/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJ
15171517
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
15181518
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
15191519
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
1520-
github.com/riferrei/srclient v0.7.2 h1:Gc1juajxHs9L1LYy+W6Iy7RDVBZkgCdKl/dxb3/c2xE=
1521-
github.com/riferrei/srclient v0.7.2/go.mod h1:byIzLF4UNZzclmzQXXr++Oe1GEH/hNFahUOSTXc7uSc=
1520+
github.com/riferrei/srclient v0.7.3 h1:JRR6jgfINWUcYZhBRHEg/NAFv7giVmjkoouRbWbakgw=
1521+
github.com/riferrei/srclient v0.7.3/go.mod h1:byIzLF4UNZzclmzQXXr++Oe1GEH/hNFahUOSTXc7uSc=
15221522
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
15231523
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
15241524
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=

middleware/http/wasm/internal/e2e_test.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,23 @@ func Test_EndToEnd(t *testing.T) {
8080
// init (main) and the request[0-9] funcs to info level.
8181
//
8282
// Then, we expect to see stdout and stderr from both scopes
83-
// at debug level.
83+
// at debug level. Allow duplicates from multi-module pools by
84+
// checking substrings instead of exact combined lines.
8485
for _, s := range []string{
8586
`level=info msg="main ConsoleLog"`,
8687
`level=info msg="request[0] ConsoleLog"`,
87-
`level=debug msg="wasm stdout: main Stdout\nrequest[0] Stdout\n"`,
88-
`level=debug msg="wasm stderr: main Stderr\nrequest[0] Stderr\n"`,
8988
} {
9089
require.Contains(t, log.String(), s)
9190
}
91+
92+
// stdout
93+
require.Contains(t, log.String(), `level=debug msg="wasm stdout:`)
94+
require.Contains(t, log.String(), "main Stdout")
95+
require.Contains(t, log.String(), "request[0] Stdout")
96+
// stderr
97+
require.Contains(t, log.String(), `level=debug msg="wasm stderr:`)
98+
require.Contains(t, log.String(), "main Stderr")
99+
require.Contains(t, log.String(), "request[0] Stderr")
92100
},
93101
},
94102
{
@@ -108,14 +116,20 @@ func Test_EndToEnd(t *testing.T) {
108116
for _, s := range []string{
109117
`level=info msg="main ConsoleLog"`,
110118
`level=info msg="request[0] ConsoleLog"`,
111-
`level=debug msg="wasm stdout: main Stdout\nrequest[0] Stdout\n"`,
112-
`level=debug msg="wasm stderr: main Stderr\nrequest[0] Stderr\n"`,
113119
`level=info msg="request[1] ConsoleLog"`,
114-
`level=debug msg="wasm stdout: request[1] Stdout\n"`,
115-
`level=debug msg="wasm stderr: request[1] Stderr\n"`,
116120
} {
117121
require.Contains(t, log.String(), s)
118122
}
123+
// Allow duplicates for main/request[0] stdout/stderr across modules.
124+
require.Contains(t, log.String(), `level=debug msg="wasm stdout:`)
125+
require.Contains(t, log.String(), "main Stdout")
126+
require.Contains(t, log.String(), "request[0] Stdout")
127+
require.Contains(t, log.String(), `level=debug msg="wasm stderr:`)
128+
require.Contains(t, log.String(), "main Stderr")
129+
require.Contains(t, log.String(), "request[0] Stderr")
130+
// And ensure request[1] appears in stdout/stderr logs too.
131+
require.Contains(t, log.String(), "request[1] Stdout")
132+
require.Contains(t, log.String(), "request[1] Stderr")
119133
},
120134
},
121135
{

tests/certification/go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ require (
3333
github.com/lestrrat-go/jwx/v2 v2.0.21
3434
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
3535
github.com/rabbitmq/amqp091-go v1.9.0
36-
github.com/riferrei/srclient v0.7.2
36+
github.com/riferrei/srclient v0.7.3
3737
github.com/stretchr/testify v1.10.0
3838
github.com/tylertreat/comcast v1.0.1
3939
go.mongodb.org/mongo-driver v1.14.0
@@ -228,7 +228,7 @@ require (
228228
github.com/lestrrat-go/httprc v1.0.5 // indirect
229229
github.com/lestrrat-go/iter v1.0.2 // indirect
230230
github.com/lestrrat-go/option v1.0.1 // indirect
231-
github.com/linkedin/goavro/v2 v2.14.0 // indirect
231+
github.com/linkedin/goavro/v2 v2.14.1 // indirect
232232
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
233233
github.com/magiconair/properties v1.8.7 // indirect
234234
github.com/mailru/easyjson v0.7.7 // indirect

tests/certification/go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,8 +1010,8 @@ github.com/lestrrat/go-strftime v0.0.0-20180220042222-ba3bf9c1d042/go.mod h1:TPp
10101010
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
10111011
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
10121012
github.com/linkedin/goavro/v2 v2.13.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
1013-
github.com/linkedin/goavro/v2 v2.14.0 h1:aNO/js65U+Mwq4yB5f1h01c3wiM458qtRad1DN0CMUI=
1014-
github.com/linkedin/goavro/v2 v2.14.0/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
1013+
github.com/linkedin/goavro/v2 v2.14.1 h1:/8VjDpd38PRsy02JS0jflAu7JZPfJcGTwqWgMkFS2iI=
1014+
github.com/linkedin/goavro/v2 v2.14.1/go.mod h1:KXx+erlq+RPlGSPmLF7xGo6SAbh8sCQ53x064+ioxhk=
10151015
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
10161016
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
10171017
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
@@ -1278,8 +1278,8 @@ github.com/redis/go-redis/v9 v9.6.3/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJ
12781278
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
12791279
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
12801280
github.com/rhnvrm/simples3 v0.6.1/go.mod h1:Y+3vYm2V7Y4VijFoJHHTrja6OgPrJ2cBti8dPGkC3sA=
1281-
github.com/riferrei/srclient v0.7.2 h1:Gc1juajxHs9L1LYy+W6Iy7RDVBZkgCdKl/dxb3/c2xE=
1282-
github.com/riferrei/srclient v0.7.2/go.mod h1:byIzLF4UNZzclmzQXXr++Oe1GEH/hNFahUOSTXc7uSc=
1281+
github.com/riferrei/srclient v0.7.3 h1:JRR6jgfINWUcYZhBRHEg/NAFv7giVmjkoouRbWbakgw=
1282+
github.com/riferrei/srclient v0.7.3/go.mod h1:byIzLF4UNZzclmzQXXr++Oe1GEH/hNFahUOSTXc7uSc=
12831283
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
12841284
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
12851285
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=

0 commit comments

Comments
 (0)