Skip to content

Commit 1641342

Browse files
authored
feat(streams/unstable): add AbortStream (#6708)
1 parent 79c2bd5 commit 1641342

File tree

3 files changed

+80
-0
lines changed

3 files changed

+80
-0
lines changed

streams/deno.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"version": "1.0.9",
44
"exports": {
55
".": "./mod.ts",
6+
"./unstable-abort-stream": "./unstable_abort_stream.ts",
67
"./buffer": "./buffer.ts",
78
"./byte-slice-stream": "./byte_slice_stream.ts",
89
"./concat-readable-streams": "./concat_readable_streams.ts",

streams/unstable_abort_stream.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2018-2025 the Deno authors. MIT license.
2+
// This module is browser compatible.
3+
4+
/**
5+
* A transform stream that accepts a {@linkcode AbortSignal} to easily abort a
6+
* stream pipeThrough.
7+
*
8+
* @experimental **UNSTABLE**: New API, yet to be vetted.
9+
*
10+
* @typeparam T The type of the chunks passing through the AbortStream.
11+
*
12+
* @example Usage
13+
* ```ts
14+
* import { AbortStream } from "@std/streams/unstable-abort-stream";
15+
* import { assertRejects } from "@std/assert/rejects";
16+
*
17+
* const controller = new AbortController();
18+
* controller.abort(new Error("STOP"));
19+
*
20+
* await assertRejects(
21+
* async function () {
22+
* await new Response(
23+
* (await Deno.open("./deno.json"))
24+
* .readable
25+
* .pipeThrough(new AbortStream(controller.signal)),
26+
* )
27+
* .bytes();
28+
* },
29+
* Error,
30+
* "STOP",
31+
* );
32+
* ```
33+
*/
34+
export class AbortStream<T> extends TransformStream<T, T> {
35+
/**
36+
* Constructs a new instance.
37+
*
38+
* @param signal The {@linkcode AbortSignal}.
39+
*/
40+
constructor(signal: AbortSignal) {
41+
super({
42+
transform(chunk, controller) {
43+
if (signal.aborted) controller.error(signal.reason);
44+
else controller.enqueue(chunk);
45+
},
46+
});
47+
}
48+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2018-2025 the Deno authors. MIT license.
2+
3+
import { assertRejects } from "@std/assert";
4+
import { FixedChunkStream } from "@std/streams/unstable-fixed-chunk-stream";
5+
import { AbortStream } from "./unstable_abort_stream.ts";
6+
7+
Deno.test("AbortStream", async () => {
8+
const controller = new AbortController();
9+
10+
const file = "./deno.json";
11+
const chunkSize = (await Deno.stat(file)).size / 4;
12+
const promise = new Response(
13+
(await Deno.open(file))
14+
.readable
15+
.pipeThrough(new FixedChunkStream(chunkSize)) // To make sure we aren't only given 1 chunk of the entire contents.
16+
.pipeThrough( // To slow the process down for the controller.abort()
17+
new TransformStream({
18+
async transform(chunk, controller) {
19+
await new Promise((a) => setTimeout(a, 50));
20+
controller.enqueue(chunk);
21+
},
22+
}),
23+
)
24+
.pipeThrough(new AbortStream(controller.signal)),
25+
)
26+
.bytes();
27+
28+
setTimeout(() => controller.abort(), 75);
29+
30+
await assertRejects(() => promise);
31+
});

0 commit comments

Comments
 (0)