Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkgs/http/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* **Breaking** Change the behavior of `Request.body` so that a charset
parameter is only added for text and XML media types. This brings the
behavior of `package:http` in line with RFC-8259.
* On the web, fix cancellations for `StreamSubscription`s of response bodies
waiting for the next chunk.

## 1.5.0

Expand Down
120 changes: 85 additions & 35 deletions pkgs/http/lib/src/browser_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import 'package:web/web.dart'
DOMException,
HeadersInit,
ReadableStreamDefaultReader,
ReadableStreamReadResult,
RequestInfo,
RequestInit,
Response;
Expand Down Expand Up @@ -116,7 +117,7 @@ class BrowserClient extends BaseClient {
}.toJS);

return StreamedResponseV2(
_readBody(request, response),
_bodyToStream(request, response),
response.status,
headers: headers,
request: request,
Expand Down Expand Up @@ -144,9 +145,9 @@ class BrowserClient extends BaseClient {
}
}

Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
if (e case DOMException(:final name) when name == 'AbortError') {
Error.throwWithStackTrace(RequestAbortedException(request.url), st);
Object _toClientException(Object e, BaseRequest request) {
if (e case DOMException(name: 'AbortError')) {
return RequestAbortedException(request.url);
}
if (e is! ClientException) {
var message = e.toString();
Expand All @@ -155,49 +156,98 @@ Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
}
e = ClientException(message, request.url);
}
Error.throwWithStackTrace(e, st);
return e;
}

Never _rethrowAsClientException(Object e, StackTrace st, BaseRequest request) {
Error.throwWithStackTrace(_toClientException(e, request), st);
}

Stream<List<int>> _readBody(BaseRequest request, Response response) async* {
final bodyStreamReader =
response.body?.getReader() as ReadableStreamDefaultReader?;
Stream<List<int>> _bodyToStream(BaseRequest request, Response response) =>
Stream.multi(
isBroadcast: false,
(listener) => _readStreamBody(request, response, listener),
);

if (bodyStreamReader == null) {
Future<void> _readStreamBody(BaseRequest request, Response response,
MultiStreamController<List<int>> controller) async {
final reader = response.body?.getReader() as ReadableStreamDefaultReader?;
if (reader == null) {
// No response? Treat that as an empty stream.
await controller.close();
return;
}

var isDone = false, isError = false;
try {
while (true) {
final chunk = await bodyStreamReader.read().toDart;
if (chunk.done) {
isDone = true;
break;
Completer<void>? resumeSignal;
var cancelled = false;
var hadError = false;
controller
..onResume = () {
if (resumeSignal case final resume?) {
resumeSignal = null;
resume.complete();
}
yield (chunk.value! as JSUint8Array).toDart;
}
} catch (e, st) {
isError = true;
_rethrowAsClientException(e, st, request);
} finally {
if (!isDone) {
..onCancel = () async {
try {
// catchError here is a temporary workaround for
// http://dartbug.com/57046: an exception from cancel() will
// clobber an exception which is currently in flight.
await bodyStreamReader
.cancel()
.toDart
.catchError((_) => null, test: (_) => isError);
} catch (e, st) {
// If we have already encountered an error swallow the
// error from cancel and simply let the original error to be
// rethrown.
if (!isError) {
_rethrowAsClientException(e, st, request);
cancelled = true;
// We only cancel the reader when the subscription is cancelled - we
// don't need to do that for normal done events because the stream is in
// a completed state at that point.
await reader.cancel().toDart;
} catch (e, s) {
if (!hadError) {
// If the stream enters an error state, .cancel() will rethrow that
// error. We want to ignore that since we would have added it to the
// stream already.
_rethrowAsClientException(e, s, request);
}
}
};

// Async loop reading chunks from `bodyStreamReader` and sending them to
// `controller`.
// Checks for pause/cancel after delivering each event.
// Exits if stream closes or becomes an error, or if cancelled.
while (true) {
final ReadableStreamReadResult chunk;
try {
chunk = await reader.read().toDart;
} catch (e, s) {
// After a stream was cancelled, adding error events would result in
// unhandled async errors. This is most likely an AbortError anyway, so
// not really an exceptional state. We report errors of .cancel() in
// onCancel, that should cover this case.
if (!cancelled) {
hadError = true;
controller.addErrorSync(_toClientException(e, request), s);
await controller.close();
}

break;
}

if (chunk.done) {
// Sync because we're forwarding an async event.
controller.closeSync();
break;
} else {
// Handle chunk whether paused, cancelled or not.
// If subscription is cancelled, it's a no-op to add events.
// If subscription is paused, events will be buffered until resumed,
// which is what we need.
// We can use addSync here because we're only forwarding this async
// event.
controller.addSync((chunk.value! as JSUint8Array).toDart);
}

// Check pause/cancel state immediately *after* delivering event,
// listener might have paused or cancelled.
if (controller.isPaused) {
// Will never complete if cancelled before resumed.
await (resumeSignal ??= Completer<void>()).future;
}
if (!controller.hasListener) break; // Is cancelled.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,26 @@ void hybridMain(StreamChannel<Object?> channel) async {
late HttpServer server;
server = (await HttpServer.bind('localhost', 0))
..listen((request) async {
var path = request.uri.pathSegments;
// Slow down lines if requested, e.g. GET /1000 would send a line every
// second. This is used to test cancellations.
var delayBetweenLines = switch (path) {
[var delayMs] => Duration(milliseconds: int.parse(delayMs)),
_ => Duration.zero,
};

await request.drain<void>();
request.response.headers.set('Access-Control-Allow-Origin', '*');
request.response.headers.set('Content-Type', 'text/plain');
if (delayBetweenLines > Duration.zero) {
request.response.bufferOutput = false;
}
serverWriting = true;
for (var i = 0; serverWriting; ++i) {
request.response.write('$i\n');
await request.response.flush();
// Let the event loop run.
await Future(() {});
await Future<void>.delayed(delayBetweenLines);
}
await request.response.close();
unawaited(server.close());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,63 @@ void testResponseBodyStreamed(Client client,
});
await cancelled.future;
});

test('cancelling stream subscription after chunk', () async {
// Request a 10s delay between subsequent lines.
const delayMillis = 10000;
final request = Request('GET', Uri.http(host, '$delayMillis'));
final response = await client.send(request);
expect(response.reasonPhrase, 'OK');
expect(response.statusCode, 200);

final cancelled = Completer<void>();
var stopwatch = Stopwatch();
final subscription = response.stream
.transform(const Utf8Decoder())
.transform(const LineSplitter())
.listen(null);
subscription.onData((line) {
stopwatch.start();
cancelled.complete(subscription.cancel());
expect(line, '0');
});

await cancelled.future;
stopwatch.stop();

// Receiving the first line and cancelling the stream should not wait for
// the second line, which is sent much later.
expect(stopwatch.elapsed.inMilliseconds, lessThan(delayMillis));
});

test('cancelling stream subscription after chunk with delay', () async {
// Request a 10s delay between subsequent lines.
const delayMillis = 10000;
final request = Request('GET', Uri.http(host, '$delayMillis'));
final response = await client.send(request);
expect(response.reasonPhrase, 'OK');
expect(response.statusCode, 200);

var stopwatch = Stopwatch()..start();
final done = Completer<void>();
late StreamSubscription<String> sub;
sub = response.stream
.transform(utf8.decoder)
.transform(const LineSplitter())
.listen((line) {
// Don't cancel in direct response to event, we want to test cancelling
// while the client is actively waiting for data.
Timer.run(() {
stopwatch.start();
done.complete(sub.cancel());
});
});

await done.future;
stopwatch.stop();
// Receiving the first line and cancelling the stream should not wait for
// the second line, which is sent much later.
expect(stopwatch.elapsed.inMilliseconds, lessThan(delayMillis));
});
}, skip: canStreamResponseBody ? false : 'does not stream response bodies');
}
Loading