Skip to content

Commit 29916aa

Browse files
committed
fix(core/event-streams): event stream payload handling
1 parent 2cadea1 commit 29916aa

File tree

2 files changed

+178
-28
lines changed

2 files changed

+178
-28
lines changed

packages/core/src/submodules/event-streams/EventStreamSerde.spec.ts

Lines changed: 129 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ import { EventStreamMarshaller } from "@smithy/eventstream-serde-node";
44
import { HttpResponse } from "@smithy/protocol-http";
55
import type {
66
BlobSchema,
7+
BooleanSchema,
78
Message as EventMessage,
9+
NumericSchema,
810
StaticSimpleSchema,
911
StaticStructureSchema,
10-
StreamingBlobSchema,
1112
StringSchema,
1213
TimestampEpochSecondsSchema,
1314
} from "@smithy/types";
15+
import { Uint8ArrayBlobAdapter } from "@smithy/util-stream";
1416
import { fromUtf8, toUtf8 } from "@smithy/util-utf8";
1517
import { describe, expect, test as it } from "vitest";
1618

@@ -63,15 +65,7 @@ describe(EventStreamSerde.name, () => {
6365
"Payload",
6466
0,
6567
["payload"],
66-
[
67-
[
68-
0,
69-
"ns",
70-
"StreamingBlobPayload",
71-
{ eventPayload: 1 },
72-
42 satisfies StreamingBlobSchema,
73-
] satisfies StaticSimpleSchema,
74-
],
68+
[[0, "ns", "BlobPayload", { eventPayload: 1 }, 21 satisfies BlobSchema] satisfies StaticSimpleSchema],
7569
],
7670
[
7771
3,
@@ -86,10 +80,21 @@ describe(EventStreamSerde.name, () => {
8680
"ns",
8781
"CustomHeaders",
8882
0,
89-
["header1", "header2"],
83+
["header1", "header2", "header-date", "header-number", "header-boolean", "header-blob"],
9084
[
9185
[0, "ns", "EventHeader", { eventHeader: 1 }, 0 satisfies StringSchema] satisfies StaticSimpleSchema,
9286
[0, "ns", "EventHeader", { eventHeader: 1 }, 0 satisfies StringSchema] satisfies StaticSimpleSchema,
87+
88+
[
89+
0,
90+
"ns",
91+
"EventHeader",
92+
{ eventHeader: 1 },
93+
7 satisfies TimestampEpochSecondsSchema,
94+
] satisfies StaticSimpleSchema,
95+
[0, "ns", "EventHeader", { eventHeader: 1 }, 1 satisfies NumericSchema] satisfies StaticSimpleSchema,
96+
[0, "ns", "EventHeader", { eventHeader: 1 }, 2 satisfies BooleanSchema] satisfies StaticSimpleSchema,
97+
[0, "ns", "EventHeader", { eventHeader: 1 }, 21 satisfies BlobSchema] satisfies StaticSimpleSchema,
9398
],
9499
],
95100
],
@@ -119,7 +124,16 @@ describe(EventStreamSerde.name, () => {
119124
yield { $unknown: ["D", { name: "d" }] };
120125
yield { Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } };
121126
yield { TextPayload: { payload: "beep boop" } };
122-
yield { CustomHeaders: { header1: "h1", header2: "h2" } };
127+
yield {
128+
CustomHeaders: {
129+
header1: "h1",
130+
header2: "h2",
131+
"header-date": new Date(0),
132+
"header-number": -2,
133+
"header-boolean": false,
134+
"header-blob": new Uint8Array([0, 1, 2, 3]),
135+
},
136+
};
123137
},
124138
};
125139

@@ -179,6 +193,22 @@ describe(EventStreamSerde.name, () => {
179193
":content-type": { type: "string", value: "application/cbor" },
180194
header1: { type: "string", value: "h1" },
181195
header2: { type: "string", value: "h2" },
196+
"header-boolean": {
197+
type: "boolean",
198+
value: false,
199+
},
200+
"header-date": {
201+
type: "timestamp",
202+
value: new Date(0),
203+
},
204+
"header-number": {
205+
type: "integer",
206+
value: -2,
207+
},
208+
"header-blob": {
209+
type: "binary",
210+
value: new Uint8Array([0, 1, 2, 3]),
211+
},
182212
},
183213
body: {},
184214
},
@@ -248,13 +278,60 @@ describe(EventStreamSerde.name, () => {
248278
*/
249279
function messageSerializer(event: any): EventMessage {
250280
const eventType = Object.keys(event)[0];
281+
const data = event[eventType];
282+
283+
const headerKeys = Object.keys(data).filter((k) => k.startsWith("header"));
284+
const headers = {
285+
":message-type": { type: "string", value: "event" },
286+
":event-type": { type: "string", value: eventType },
287+
":content-type": { type: "string", value: "application/cbor" },
288+
} as any;
289+
290+
for (const key of headerKeys) {
291+
const v = data[key];
292+
if (v instanceof Date) {
293+
headers[key] = {
294+
type: "timestamp",
295+
value: data[key],
296+
};
297+
} else if (typeof v === "boolean") {
298+
headers[key] = {
299+
type: "boolean",
300+
value: data[key],
301+
};
302+
} else if (typeof v === "string") {
303+
headers[key] = {
304+
type: "string",
305+
value: data[key],
306+
};
307+
} else if (typeof v === "number") {
308+
headers[key] = {
309+
type: "integer",
310+
value: v,
311+
};
312+
} else if (v instanceof Uint8Array) {
313+
headers[key] = {
314+
type: "binary",
315+
value: v,
316+
};
317+
} else {
318+
throw new Error("unhandled type");
319+
}
320+
321+
delete data[key];
322+
}
323+
324+
const payload = data.payload;
325+
if (payload) {
326+
return {
327+
headers,
328+
body: typeof payload === "string" ? fromUtf8(payload) : payload,
329+
};
330+
}
331+
251332
return {
252-
headers: {
253-
":message-type": { type: "string", value: "event" },
254-
":event-type": { type: "string", value: eventType },
255-
":content-type": { type: "string", value: "application/cbor" },
256-
},
257-
body: cbor.serialize(event[eventType]),
333+
headers,
334+
body: cbor.serialize(data),
258335
};
259336
}
260337

@@ -266,9 +343,18 @@ describe(EventStreamSerde.name, () => {
266343
yield { B: { name: "b" } };
267344
yield { C: { name: "c" } };
268345
yield { D: { name: "d" } };
269-
yield { Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } };
346+
yield { Payload: { payload: new Uint8ArrayBlobAdapter([0, 1, 2, 3, 4, 5, 6]) } };
270347
yield { TextPayload: { payload: "boop beep" } };
271-
yield { CustomHeaders: { header1: "h1", header2: "h2" } };
348+
yield {
349+
CustomHeaders: {
350+
header1: "h1",
351+
header2: "h2",
352+
"header-date": new Date(0),
353+
"header-number": -2,
354+
"header-boolean": false,
355+
"header-blob": new Uint8Array([0, 1, 2, 3]),
356+
},
357+
};
272358
},
273359
};
274360

@@ -280,7 +366,7 @@ describe(EventStreamSerde.name, () => {
280366
":event-type": { type: "string", value: "D" },
281367
":content-type": { type: "string", value: "application/cbor" },
282368
},
283-
body: Uint8Array.from(cbor.serialize({ name: "d" })),
369+
body: Uint8ArrayBlobAdapter.from(cbor.serialize({ name: "d" })),
284370
},
285371
},
286372
};
@@ -311,9 +397,18 @@ describe(EventStreamSerde.name, () => {
311397
{ C: { name: `c` } },
312398
// todo(schema) getMessageUnmarshaller.ts must be patched to return unknown events.
313399
// $unknownEvent,
314-
{ Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } },
400+
{ Payload: { payload: new Uint8ArrayBlobAdapter([0, 1, 2, 3, 4, 5, 6]) } },
315401
{ TextPayload: { payload: "boop beep" } },
316-
{ CustomHeaders: { header1: "h1", header2: "h2" } },
402+
{
403+
CustomHeaders: {
404+
header1: "h1",
405+
header2: "h2",
406+
"header-boolean": false,
407+
"header-date": new Date(0),
408+
"header-number": -2,
409+
"header-blob": new Uint8Array([0, 1, 2, 3]),
410+
},
411+
},
317412
]);
318413
});
319414

@@ -349,9 +444,18 @@ describe(EventStreamSerde.name, () => {
349444
{ C: { name: `c` } },
350445
// todo(schema) getMessageUnmarshaller.ts must be patched to return unknown events.
351446
// $unknownEvent,
352-
{ Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } },
447+
{ Payload: { payload: new Uint8ArrayBlobAdapter([0, 1, 2, 3, 4, 5, 6]) } },
353448
{ TextPayload: { payload: "boop beep" } },
354-
{ CustomHeaders: { header1: "h1", header2: "h2" } },
449+
{
450+
CustomHeaders: {
451+
header1: "h1",
452+
header2: "h2",
453+
"header-boolean": false,
454+
"header-date": new Date(0),
455+
"header-number": -2,
456+
"header-blob": new Uint8Array([0, 1, 2, 3]),
457+
},
458+
},
355459
]);
356460

357461
expect(initialResponseContainer).toEqual({

packages/core/src/submodules/event-streams/EventStreamSerde.ts

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type {
44
EventStreamMarshaller,
55
HttpRequest as IHttpRequest,
66
HttpResponse as IHttpResponse,
7+
Int64,
78
Message as EventStreamMessage,
89
MessageHeaders,
910
MessageHeaderValue,
@@ -12,7 +13,8 @@ import type {
1213
ShapeSerializer,
1314
StaticStructureSchema,
1415
} from "@smithy/types";
15-
import { fromUtf8 } from "@smithy/util-utf8";
16+
import { Uint8ArrayBlobAdapter } from "@smithy/util-stream";
17+
import { fromUtf8, toUtf8 } from "@smithy/util-utf8";
1618

1719
/**
1820
* Separated module for async mixin of EventStream serde capability.
@@ -159,17 +161,61 @@ export class EventStreamSerde {
159161
return key !== "__type";
160162
}) ?? "";
161163

164+
const body = event[unionMember].body;
165+
162166
if (unionMember === "initial-response") {
163-
const dataObject = await this.deserializer.read(responseSchema, event[unionMember].body);
167+
const dataObject = await this.deserializer.read(responseSchema, body);
164168
delete dataObject[eventStreamMember];
165169
return {
166170
[initialResponseMarker]: true,
167171
...dataObject,
168172
};
169173
} else if (unionMember in memberSchemas) {
170174
const eventStreamSchema = memberSchemas[unionMember];
175+
176+
if (eventStreamSchema.isStructSchema()) {
177+
// check for event stream bindings
178+
const out = {} as any;
179+
let hasBindings = false;
180+
181+
for (const [name, member] of eventStreamSchema.structIterator()) {
182+
const { eventHeader, eventPayload } = member.getMergedTraits();
183+
hasBindings = hasBindings || Boolean(eventHeader || eventPayload);
184+
if (eventPayload) {
185+
// https://smithy.io/2.0/spec/streaming.html#eventpayload-trait
186+
// structure > :test(member > :test(blob, string, structure, union))
187+
if (member.isBlobSchema()) {
188+
out[name] = Uint8ArrayBlobAdapter.mutate(body);
189+
} else if (member.isStringSchema()) {
190+
out[name] = (this.serdeContext?.utf8Encoder ?? toUtf8)(body);
191+
} else if (member.isStructSchema()) {
192+
out[name] = await this.deserializer.read(member, body);
193+
}
194+
} else if (eventHeader) {
195+
const value = event[unionMember].headers[name]?.value;
196+
if (value != null) {
197+
if (member.isNumericSchema()) {
198+
if (value && typeof value === "object" && "bytes" in (value as Int64)) {
199+
out[name] = BigInt(value.toString());
200+
} else {
201+
out[name] = Number(value);
202+
}
203+
} else {
204+
out[name] = value;
205+
}
206+
}
207+
}
208+
}
209+
210+
if (hasBindings) {
211+
return {
212+
[unionMember]: out,
213+
};
214+
}
215+
}
216+
171217
return {
172-
[unionMember]: await this.deserializer.read(eventStreamSchema, event[unionMember].body),
218+
[unionMember]: await this.deserializer.read(eventStreamSchema, body),
173219
};
174220
} else {
175221
// todo(schema): This union convention is ignored by the event stream marshaller.

0 commit comments

Comments
 (0)