Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkgs/cronet_http/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 1.3.4-wip

* Cancel requests when the response stream is cancelled.

## 1.3.3

* Throw `ClientException` if `CronetClient.send` runs out of Java heap while
Expand Down
49 changes: 49 additions & 0 deletions pkgs/cronet_http/example/integration_test/client_profile_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,55 @@ void main() {
});
});

group('cancel streaming GET response', () {
late HttpServer successServer;
late Uri successServerUri;
late HttpClientRequestProfile profile;
late List<int> receivedData;

setUpAll(() async {
successServer = (await HttpServer.bind('localhost', 0))
..listen((request) async {
await request.drain<void>();
request.response.headers.set('Content-Type', 'text/plain');
while (true) {
request.response.write('Hello World');
await request.response.flush();
await Future<void>.delayed(const Duration(seconds: 0));
}
});
final cancelCompleter = Completer<void>();
successServerUri = Uri.http('localhost:${successServer.port}');
final client = CronetClientWithProfile.defaultCronetEngine();
final request = StreamedRequest('GET', successServerUri);
unawaited(request.sink.close());
final response = await client.send(request);

var i = 0;
late final StreamSubscription<List<int>> s;
receivedData = [];
s = response.stream.listen((d) {
receivedData += d;
if (++i == 1000) {
s.cancel();
cancelCompleter.complete();
}
});
await cancelCompleter.future;
profile = client.profile!;
});
tearDownAll(() {
successServer.close();
});

test('request attributes', () async {
expect(profile.requestData.contentLength, isNull);
expect(profile.requestData.startTime, isNotNull);
expect(profile.requestData.endTime, isNotNull);
expect(profile.responseData.bodyBytes, receivedData);
});
});

group('redirects', () {
late HttpServer successServer;
late Uri successServerUri;
Expand Down
19 changes: 18 additions & 1 deletion pkgs/cronet_http/lib/src/cronet_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,24 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks(
StreamController<List<int>>? responseStream;
JByteBuffer? jByteBuffer;
var numRedirects = 0;
var done = false;

// The order of callbacks generated by Cronet is documented here:
// https://developer.android.com/guide/topics/connectivity/cronet/lifecycle
return jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface.implement(
jb.$UrlRequestCallbackProxy_UrlRequestCallbackInterface(
onResponseStarted: (urlRequest, responseInfo) {
responseStream = StreamController();
///
responseStream = StreamController(onCancel: () {
// The user did `response.stream.cancel()`. We can just pretend that
// the response completed normally.
if (done) return;
done = true;
urlRequest.cancel();
responseStream!.sink.close();
jByteBuffer?.release();
profile?.responseData.close();
});
final responseHeaders =
_cronetToClientHeaders(responseInfo.getAllHeaders());
int? contentLength;
Expand Down Expand Up @@ -203,6 +214,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks(
urlRequest.read(jByteBuffer!);
},
onRedirectReceived: (urlRequest, responseInfo, newLocationUrl) {
if (done) return;
final responseHeaders =
_cronetToClientHeaders(responseInfo.getAllHeaders());

Expand Down Expand Up @@ -247,6 +259,7 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks(
}
},
onReadCompleted: (urlRequest, responseInfo, byteBuffer) {
if (done) return;
byteBuffer.flip();
final data = jByteBuffer!.asUint8List().sublist(0, byteBuffer.remaining);
responseStream!.add(data);
Expand All @@ -256,11 +269,15 @@ jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks(
urlRequest.read(byteBuffer);
},
onSucceeded: (urlRequest, responseInfo) {
if (done) return;
done = true;
responseStream!.sink.close();
jByteBuffer?.release();
profile?.responseData.close();
},
onFailed: (urlRequest, responseInfo, cronetException) {
if (done) return;
done = true;
final error = ClientException(
'Cronet exception: ${cronetException.toString()}', request.url);
if (responseStream == null) {
Expand Down
2 changes: 1 addition & 1 deletion pkgs/cronet_http/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: cronet_http
version: 1.3.3
version: 1.3.4-wip
description: >-
An Android Flutter plugin that provides access to the Cronet HTTP client.
repository: https://github.com/dart-lang/http/tree/master/pkgs/cronet_http
Expand Down
4 changes: 4 additions & 0 deletions pkgs/cupertino_http/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 2.1.2-wip

* Cancel requests when the response stream is cancelled.

## 2.1.1

* Support `package:web_socket` 1.0.0.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,59 @@ void main() {
});
});

group('cancel streaming GET response', () {
late HttpServer successServer;
late Uri successServerUri;
late HttpClientRequestProfile profile;
late List<int> receivedData;

setUpAll(() async {
successServer = (await HttpServer.bind('localhost', 0))
..listen((request) async {
await request.drain<void>();
request.response.headers.set('Content-Type', 'text/plain');
while (true) {
request.response.write('Hello World');
await request.response.flush();
await Future<void>.delayed(const Duration(seconds: 0));
}
});
final cancelCompleter = Completer<void>();
successServerUri = Uri.http('localhost:${successServer.port}');
final client = CupertinoClientWithProfile.defaultSessionConfiguration();
final request = StreamedRequest('GET', successServerUri);
unawaited(request.sink.close());
final response = await client.send(request);

var i = 0;
late final StreamSubscription<List<int>> s;
receivedData = [];
s = response.stream.listen((d) {
receivedData += d;
if (++i == 1000) {
s.cancel();
cancelCompleter.complete();
}
});
await cancelCompleter.future;
profile = client.profile!;
});
tearDownAll(() {
successServer.close();
});

test('request attributes', () async {
expect(profile.requestData.contentLength, isNull);
expect(profile.requestData.startTime, isNotNull);
expect(profile.requestData.endTime, isNotNull);
// Extra data could be received before the cancel event is dispatched
// by the url loading framework so check that
// `profile.responseData.bodyBytes` starts with `receivedData`.
expect(profile.responseData.bodyBytes.sublist(0, receivedData.length),
receivedData);
});
});

group('redirects', () {
late HttpServer successServer;
late Uri successServerUri;
Expand Down
19 changes: 15 additions & 4 deletions pkgs/cupertino_http/lib/src/cupertino_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import 'cupertino_api.dart';

final _digitRegex = RegExp(r'^\d+$');

const _nsurlErrorCancelled = -999;

/// This class can be removed when `package:http` v2 is released.
class _StreamedResponseWithUrl extends StreamedResponse
implements BaseResponseWithUrl {
Expand All @@ -33,12 +35,12 @@ class _StreamedResponseWithUrl extends StreamedResponse
class _TaskTracker {
final responseCompleter = Completer<URLResponse>();
final BaseRequest request;
final responseController = StreamController<Uint8List>();
final StreamController<Uint8List> responseController;
final HttpClientRequestProfile? profile;
int numRedirects = 0;
Uri? lastUrl; // The last URL redirected to.

_TaskTracker(this.request, this.profile);
_TaskTracker(this.request, this.responseController, this.profile);

void close() {
responseController.close();
Expand Down Expand Up @@ -167,7 +169,13 @@ class CupertinoClient extends BaseClient {
static void _onComplete(
URLSession session, URLSessionTask task, NSError? error) {
final taskTracker = _tracker(task);
if (error != null) {
// The task will only be cancelled if the user calls
// `StreamedResponse.stream.cancel()`, which can only happen if the response
// has already been received. Therefore, it is safe to handle task
// cancellation errors as if the response completed normally.
if (error != null &&
!(error.domain.toDartString() == 'NSURLErrorDomain' &&
error.code == _nsurlErrorCancelled)) {
final exception = ClientException(
error.localizedDescription.toDartString(), taskTracker.request.url);
if (taskTracker.profile != null &&
Expand Down Expand Up @@ -338,7 +346,10 @@ class CupertinoClient extends BaseClient {
// This will preserve Apple default headers - is that what we want?
request.headers.forEach(urlRequest.setValueForHttpHeaderField);
final task = urlSession.dataTaskWithRequest(urlRequest);
final taskTracker = _TaskTracker(request, profile);
final subscription = StreamController<Uint8List>(onCancel: () {
task.cancel();
});
final taskTracker = _TaskTracker(request, subscription, profile);
_tasks[task] = taskTracker;
task.resume();

Expand Down
2 changes: 1 addition & 1 deletion pkgs/cupertino_http/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: cupertino_http
version: 2.1.1
version: 2.1.2-wip
description: >-
A macOS/iOS Flutter plugin that provides access to the Foundation URL
Loading System.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';

import 'package:async/async.dart';
Expand All @@ -21,16 +22,16 @@ import 'response_body_streamed_server_vm.dart'
void testResponseBodyStreamed(Client client,
{bool canStreamResponseBody = true}) async {
group('streamed response body', () {
late final String host;
late final StreamChannel<Object?> httpServerChannel;
late final StreamQueue<Object?> httpServerQueue;
late String host;
late StreamChannel<Object?> httpServerChannel;
late StreamQueue<Object?> httpServerQueue;

setUpAll(() async {
setUp(() async {
httpServerChannel = await startServer();
httpServerQueue = StreamQueue(httpServerChannel.stream);
host = 'localhost:${await httpServerQueue.nextAsInt}';
});
tearDownAll(() => httpServerChannel.sink.add(null));
tearDown(() => httpServerChannel.sink.add(null));

test('large response streamed without content length', () async {
// The server continuously streams data to the client until
Expand All @@ -56,6 +57,25 @@ void testResponseBodyStreamed(Client client,
expect(response.reasonPhrase, 'OK');
expect(response.request!.method, 'GET');
expect(response.statusCode, 200);
}, skip: canStreamResponseBody ? false : 'does not stream response bodies');
});
});

test('cancel streamed response', () async {
final request = Request('GET', Uri.http(host, ''));
final response = await client.send(request);
final cancelled = Completer<void>();
expect(response.reasonPhrase, 'OK');
expect(response.statusCode, 200);
late StreamSubscription<String> subscription;
subscription = const LineSplitter()
.bind(const Utf8Decoder().bind(response.stream))
.listen((s) async {
final lastReceived = int.parse(s.trim());
if (lastReceived == 1000) {
unawaited(subscription.cancel());
cancelled.complete();
}
});
await cancelled.future;
});
}, skip: canStreamResponseBody ? false : 'does not stream response bodies');
}
Loading