Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkgs/http/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* **Breaking** Change the behavior of `Request.body` so that a charset
parameter is only added for text and XML media types. This brings the
behavior of `package:http` in line with RFC-8259.
* On the web, fix cancellations for `StreamSubscription`s of response bodies
waiting for the next chunk.
* Export `MediaType` from `package:http_parser`.
* Added a section on testing to `README.md`.

Expand Down
125 changes: 90 additions & 35 deletions pkgs/http/lib/src/browser_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import 'package:web/web.dart'
DOMException,
HeadersInit,
ReadableStreamDefaultReader,
ReadableStreamReadResult,
RequestInfo,
RequestInit,
Response;
Expand Down Expand Up @@ -116,7 +117,7 @@ class BrowserClient extends BaseClient {
}.toJS);

return StreamedResponseV2(
_readBody(request, response),
_bodyToStream(request, response),
response.status,
headers: headers,
request: request,
Expand Down Expand Up @@ -144,9 +145,9 @@ class BrowserClient extends BaseClient {
}
}

Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
if (e case DOMException(:final name) when name == 'AbortError') {
Error.throwWithStackTrace(RequestAbortedException(request.url), st);
Object _toClientException(Object e, BaseRequest request) {
if (e case DOMException(name: 'AbortError')) {
return RequestAbortedException(request.url);
}
if (e is! ClientException) {
var message = e.toString();
Expand All @@ -155,49 +156,103 @@ Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
}
e = ClientException(message, request.url);
}
Error.throwWithStackTrace(e, st);
return e;
}

Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
Error.throwWithStackTrace(_toClientException(e, request), st);
}

Stream<List<int>> _readBody(BaseRequest request, Response response) async* {
final bodyStreamReader =
response.body?.getReader() as ReadableStreamDefaultReader?;
Stream<List<int>> _bodyToStream(BaseRequest request, Response response) =>
Stream.multi(
isBroadcast: false,
(listener) => _readStreamBody(request, response, listener),
);

if (bodyStreamReader == null) {
Future<void> _readStreamBody(BaseRequest request, Response response,
MultiStreamController<List<int>> controller) async {
final reader = response.body?.getReader() as ReadableStreamDefaultReader?;
if (reader == null) {
// No response? Treat that as an empty stream.
await controller.close();
return;
}

var isDone = false, isError = false;
try {
while (true) {
final chunk = await bodyStreamReader.read().toDart;
if (chunk.done) {
isDone = true;
break;
Completer<void>? resumeSignal;
var cancelled = false;
var hadError = false;
controller
..onResume = () {
if (resumeSignal case final resume?) {
resumeSignal = null;
resume.complete();
}
yield (chunk.value! as JSUint8Array).toDart;
}
} catch (e, st) {
isError = true;
_rethrowAsClientException(e, st, request);
} finally {
if (!isDone) {
..onCancel = () async {
try {
// catchError here is a temporary workaround for
// http://dartbug.com/57046: an exception from cancel() will
// clobber an exception which is currently in flight.
await bodyStreamReader
.cancel()
.toDart
.catchError((_) => null, test: (_) => isError);
} catch (e, st) {
// If we have already encountered an error swallow the
// error from cancel and simply let the original error to be
// rethrown.
if (!isError) {
_rethrowAsClientException(e, st, request);
cancelled = true;
// We only cancel the reader when the subscription is cancelled - we
// don't need to do that for normal done events because the stream is in
// a completed state at that point.
await reader.cancel().toDart;
} catch (e, s) {
// It is possible for reader.cancel() to throw. This happens either
// because the stream has already been in an error state (in which case
// we would have called addErrorSync() before and don't need to re-
// report the error here), or because of an issue here (MDN says the
// method can throw if "The source object is not a
// ReadableStreamDefaultReader, or the stream has no owner."). Both of
// these don't look applicable here, but we want to ensure a new error
// in cancel() is surfaced to the caller.
if (!hadError) {
_rethrowAsClientException(e, s, request);
}
}
};

// Async loop reading chunks from `bodyStreamReader` and sending them to
// `controller`.
// Checks for pause/cancel after delivering each event.
// Exits if stream closes or becomes an error, or if cancelled.
while (true) {
final ReadableStreamReadResult chunk;
try {
chunk = await reader.read().toDart;
} catch (e, s) {
// After a stream was cancelled, adding error events would result in
// unhandled async errors. This is most likely an AbortError anyway, so
// not really an exceptional state. We report errors of .cancel() in
// onCancel, that should cover this case.
if (!cancelled) {
hadError = true;
controller.addErrorSync(_toClientException(e, request), s);
await controller.close();
}

break;
}

if (chunk.done) {
// Sync because we're forwarding an async event.
controller.closeSync();
break;
} else {
// Handle chunk whether paused, cancelled or not.
// If subscription is cancelled, it's a no-op to add events.
// If subscription is paused, events will be buffered until resumed,
// which is what we need.
// We can use addSync here because we're only forwarding this async
// event.
controller.addSync((chunk.value! as JSUint8Array).toDart);
}

// Check pause/cancel state immediately *after* delivering event,
// listener might have paused or cancelled.
if (controller.isPaused) {
// Will never complete if cancelled before resumed.
await (resumeSignal ??= Completer<void>()).future;
}
if (!controller.hasListener) break; // Is cancelled.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,25 @@ void hybridMain(StreamChannel<Object?> channel) async {
late HttpServer server;
server = (await HttpServer.bind('localhost', 0))
..listen((request) async {
var path = request.uri.pathSegments;
// Slow down lines if requested, e.g. GET /1000 would send a line every
// second. This is used to test cancellations.
var delayBetweenLines = switch (path) {
[var delayMs] => Duration(milliseconds: int.parse(delayMs)),
_ => Duration.zero,
};

await request.drain<void>();
request.response.headers.set('Access-Control-Allow-Origin', '*');
request.response.headers.set('Content-Type', 'text/plain');
request.response.bufferOutput = false;

serverWriting = true;
for (var i = 0; serverWriting; ++i) {
request.response.write('$i\n');
await request.response.flush();
// Let the event loop run.
await Future(() {});
await Future<void>.delayed(delayBetweenLines);
}
await request.response.close();
unawaited(server.close());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,118 @@ void testResponseBodyStreamed(Client client,
expect(response.statusCode, 200);
});

test('pausing response stream after events', () async {
final response = await client.send(Request('GET', Uri.http(host, '')));
expect(response.reasonPhrase, 'OK');
expect(response.statusCode, 200);

// The server responds with a streamed response of lines containing
// incrementing integers. Verify that pausing the stream after each one
// does not cause any missed lines.
final stream = response.stream
.transform(const Utf8Decoder())
.transform(const LineSplitter())
.map(int.parse);
var expectedLine = 0;
final cancelCompleter = Completer<void>();
late StreamSubscription<int> subscription;

subscription = stream.listen((line) async {
expect(line, expectedLine);
expectedLine++;

if (expectedLine == 10) {
subscription.pause();
Future.delayed(
const Duration(seconds: 1), () => subscription.resume());
}

if (expectedLine == 100) {
cancelCompleter.complete(subscription.cancel());
}
await pumpEventQueue();
});

await cancelCompleter.future;
expect(expectedLine, 100);
});

test('pausing response stream asynchronously', () async {
final response = await client.send(Request('GET', Uri.http(host, '')));
expect(response.reasonPhrase, 'OK');
expect(response.statusCode, 200);

final originalSubscription = response.stream
.transform(const Utf8Decoder())
.transform(const LineSplitter())
.map(int.parse)
.listen(null);
var expectedLine = 0;
await for (final line in SubscriptionStream(originalSubscription)) {
expect(line, expectedLine);
expectedLine++;
if (expectedLine == 100) {
break;
}

// Instead of pausing the subscription in response to an event, pause it
// after the event has already been delivered.
Timer.run(() {
originalSubscription.pause(Future(pumpEventQueue));
});
}
});

test('cancel paused stream', () async {
final response = await client.send(Request('GET', Uri.http(host, '')));
expect(response.reasonPhrase, 'OK');
expect(response.statusCode, 200);

final completer = Completer<void>();
late StreamSubscription<String> subscription;
subscription = response.stream
.transform(const Utf8Decoder())
.transform(const LineSplitter())
.listen((line) async {
subscription.pause();

completer.complete(Future(() async {
await pumpEventQueue();
await subscription.cancel();
}));
});

await completer.future;
});

test('cancel paused stream via abortable request', () async {
final abortTrigger = Completer<void>();
final response = await client.send(AbortableRequest(
'GET', Uri.http(host, ''),
abortTrigger: abortTrigger.future));
expect(response.reasonPhrase, 'OK');
expect(response.statusCode, 200);

late StreamSubscription<String> subscription;
subscription = response.stream
.transform(const Utf8Decoder())
.transform(const LineSplitter())
.listen((line) {
if (!abortTrigger.isCompleted) {
abortTrigger.complete();
}
});

final aborted = expectLater(subscription.asFuture<void>(),
throwsA(isA<RequestAbortedException>()));
await abortTrigger.future;

// We need to resume the subscription after the response has been
// cancelled to record that error event.
subscription.resume();
await aborted;
});

test('cancel streamed response', () async {
final request = Request('GET', Uri.http(host, ''));
final response = await client.send(request);
Expand All @@ -77,5 +189,63 @@ void testResponseBodyStreamed(Client client,
});
await cancelled.future;
});

test('cancelling stream subscription after chunk', () async {
// Request a 10s delay between subsequent lines.
const delayMillis = 10000;
final request = Request('GET', Uri.http(host, '$delayMillis'));
final response = await client.send(request);
expect(response.reasonPhrase, 'OK');
expect(response.statusCode, 200);

final cancelled = Completer<void>();
var stopwatch = Stopwatch();
final subscription = response.stream
.transform(const Utf8Decoder())
.transform(const LineSplitter())
.listen(null);
subscription.onData((line) {
stopwatch.start();
cancelled.complete(subscription.cancel());
expect(line, '0');
});

await cancelled.future;
stopwatch.stop();

// Receiving the first line and cancelling the stream should not wait for
// the second line, which is sent much later.
expect(stopwatch.elapsed.inMilliseconds, lessThan(delayMillis));
});

test('cancelling stream subscription after chunk with delay', () async {
// Request a 10s delay between subsequent lines.
const delayMillis = 10000;
final request = Request('GET', Uri.http(host, '$delayMillis'));
final response = await client.send(request);
expect(response.reasonPhrase, 'OK');
expect(response.statusCode, 200);

var stopwatch = Stopwatch()..start();
final done = Completer<void>();
late StreamSubscription<String> sub;
sub = response.stream
.transform(utf8.decoder)
.transform(const LineSplitter())
.listen((line) {
// Don't cancel in direct response to event, we want to test cancelling
// while the client is actively waiting for data.
Timer.run(() {
stopwatch.start();
done.complete(sub.cancel());
});
});

await done.future;
stopwatch.stop();
// Receiving the first line and cancelling the stream should not wait for
// the second line, which is sent much later.
expect(stopwatch.elapsed.inMilliseconds, lessThan(delayMillis));
});
}, skip: canStreamResponseBody ? false : 'does not stream response bodies');
}
Loading