Skip to content

Commit 6a503c0

Browse files
author
Datadog Syncup Service
committed
Merge branch 'upstream-master'
2 parents 3a35f54 + 34f76e3 commit 6a503c0

23 files changed

+794
-482
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.concurrent.ConcurrentLinkedDeque;
4040
import java.util.concurrent.Executor;
4141
import java.util.concurrent.Flow;
42+
import java.util.concurrent.Flow.Subscription;
4243

4344
import jdk.internal.net.http.common.Demand;
4445
import jdk.internal.net.http.common.HttpBodySubscriberWrapper;
@@ -207,14 +208,27 @@ static final class Http1ResponseBodySubscriber<U> extends HttpBodySubscriberWrap
207208
this.exchange = exchange;
208209
}
209210

211+
@Override
212+
protected void onSubscribed() {
213+
exchange.registerResponseSubscriber(this);
214+
}
215+
210216
@Override
211217
protected void complete(Throwable t) {
212218
try {
213-
exchange.responseSubscriberCompleted(this);
219+
exchange.unregisterResponseSubscriber(this);
214220
} finally {
215221
super.complete(t);
216222
}
217223
}
224+
225+
@Override
226+
protected void onCancel() {
227+
// If the subscription is cancelled the
228+
// subscriber may or may not get completed.
229+
// Therefore we need to unregister it
230+
exchange.unregisterResponseSubscriber(this);
231+
}
218232
}
219233

220234
@Override
@@ -264,7 +278,7 @@ private void connectFlows(HttpConnection connection) {
264278
// The Http1ResponseBodySubscriber is registered with the HttpClient
265279
// to ensure that it gets completed if the SelectorManager aborts due
266280
// to unexpected exceptions.
267-
void registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
281+
private void registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
268282
Throwable failed = null;
269283
synchronized (lock) {
270284
failed = this.failed;
@@ -279,8 +293,8 @@ void registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
279293
}
280294
}
281295

282-
void responseSubscriberCompleted(HttpBodySubscriberWrapper<T> subscriber) {
283-
client.subscriberCompleted(subscriber);
296+
private void unregisterResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
297+
client.unregisterSubscriber(subscriber);
284298
}
285299

286300
@Override
@@ -450,7 +464,6 @@ Http1ResponseBodySubscriber<T> createResponseSubscriber(BodyHandler<T> handler,
450464
BodySubscriber<T> subscriber = handler.apply(response);
451465
Http1ResponseBodySubscriber<T> bs =
452466
new Http1ResponseBodySubscriber<T>(subscriber, this);
453-
registerResponseSubscriber(bs);
454467
return bs;
455468
}
456469

src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,7 @@ static void abortPendingRequests(HttpClientImpl client, Throwable reason) {
396396
private final AtomicLong pendingHttpRequestCount = new AtomicLong();
397397
private final AtomicLong pendingHttp2StreamCount = new AtomicLong();
398398
private final AtomicLong pendingTCPConnectionCount = new AtomicLong();
399+
private final AtomicLong pendingSubscribersCount = new AtomicLong();
399400
private final AtomicBoolean isAlive = new AtomicBoolean();
400401

401402
/** A Set of, deadline first, ordered timeout events. */
@@ -537,16 +538,26 @@ public void registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
537538
if (!selmgr.isClosed()) {
538539
synchronized (selmgr) {
539540
if (!selmgr.isClosed()) {
540-
subscribers.add(subscriber);
541+
if (subscribers.add(subscriber)) {
542+
long count = pendingSubscribersCount.incrementAndGet();
543+
if (debug.on()) {
544+
debug.log("body subscriber registered: " + count);
545+
}
546+
}
541547
return;
542548
}
543549
}
544550
}
545551
subscriber.onError(selmgr.selectorClosedException());
546552
}
547553

548-
public void subscriberCompleted(HttpBodySubscriberWrapper<?> subscriber) {
549-
subscribers.remove(subscriber);
554+
public void unregisterSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
555+
if (subscribers.remove(subscriber)) {
556+
long count = pendingSubscribersCount.decrementAndGet();
557+
if (debug.on()) {
558+
debug.log("body subscriber unregistered: " + count);
559+
}
560+
}
550561
}
551562

552563
private void closeConnection(HttpConnection conn) {
@@ -616,7 +627,7 @@ final long unreference() {
616627
final long httpCount = pendingHttpOperationsCount.decrementAndGet();
617628
final long http2Count = pendingHttp2StreamCount.get();
618629
final long webSocketCount = pendingWebSocketCount.get();
619-
if (count == 0 && facade() == null) {
630+
if (count == 0 && facadeRef.refersTo(null)) {
620631
selmgr.wakeupSelector();
621632
}
622633
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
@@ -638,7 +649,7 @@ final long streamUnreference() {
638649
final long http2Count = pendingHttp2StreamCount.decrementAndGet();
639650
final long httpCount = pendingHttpOperationsCount.get();
640651
final long webSocketCount = pendingWebSocketCount.get();
641-
if (count == 0 && facade() == null) {
652+
if (count == 0 && facadeRef.refersTo(null)) {
642653
selmgr.wakeupSelector();
643654
}
644655
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
@@ -660,7 +671,7 @@ final long webSocketClose() {
660671
final long webSocketCount = pendingWebSocketCount.decrementAndGet();
661672
final long httpCount = pendingHttpOperationsCount.get();
662673
final long http2Count = pendingHttp2StreamCount.get();
663-
if (count == 0 && facade() == null) {
674+
if (count == 0 && facadeRef.refersTo(null)) {
664675
selmgr.wakeupSelector();
665676
}
666677
assert httpCount >= 0 : "count of HTTP/1.1 operations < 0";
@@ -686,6 +697,7 @@ final static class HttpClientTracker implements Tracker {
686697
final AtomicLong websocketCount;
687698
final AtomicLong operationsCount;
688699
final AtomicLong connnectionsCount;
700+
final AtomicLong subscribersCount;
689701
final Reference<?> reference;
690702
final AtomicBoolean isAlive;
691703
final String name;
@@ -695,6 +707,7 @@ final static class HttpClientTracker implements Tracker {
695707
AtomicLong ws,
696708
AtomicLong ops,
697709
AtomicLong conns,
710+
AtomicLong subscribers,
698711
Reference<?> ref,
699712
AtomicBoolean isAlive,
700713
String name) {
@@ -704,11 +717,16 @@ final static class HttpClientTracker implements Tracker {
704717
this.websocketCount = ws;
705718
this.operationsCount = ops;
706719
this.connnectionsCount = conns;
720+
this.subscribersCount = subscribers;
707721
this.reference = ref;
708722
this.isAlive = isAlive;
709723
this.name = name;
710724
}
711725
@Override
726+
public long getOutstandingSubscribers() {
727+
return subscribersCount.get();
728+
}
729+
@Override
712730
public long getOutstandingOperations() {
713731
return operationsCount.get();
714732
}
@@ -748,6 +766,7 @@ public Tracker getOperationsTracker() {
748766
pendingWebSocketCount,
749767
pendingOperationCount,
750768
pendingTCPConnectionCount,
769+
pendingSubscribersCount,
751770
facadeRef,
752771
isAlive,
753772
dbgTag);

src/java.net.http/share/classes/jdk/internal/net/http/Stream.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,6 @@ CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
346346
Http2StreamResponseSubscriber<T> createResponseSubscriber(BodyHandler<T> handler, ResponseInfo response) {
347347
Http2StreamResponseSubscriber<T> subscriber =
348348
new Http2StreamResponseSubscriber<>(handler.apply(response));
349-
registerResponseSubscriber(subscriber);
350349
return subscriber;
351350
}
352351

@@ -357,8 +356,8 @@ private void registerResponseSubscriber(Http2StreamResponseSubscriber<?> subscri
357356
client().registerSubscriber(subscriber);
358357
}
359358

360-
private void subscriberCompleted(Http2StreamResponseSubscriber<?> subscriber) {
361-
client().subscriberCompleted(subscriber);
359+
private void unregisterResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
360+
client().unregisterSubscriber(subscriber);
362361
}
363362

364363
@Override
@@ -1658,14 +1657,23 @@ final class Http2StreamResponseSubscriber<U> extends HttpBodySubscriberWrapper<U
16581657
super(subscriber);
16591658
}
16601659

1660+
@Override
1661+
protected void onSubscribed() {
1662+
registerResponseSubscriber(this);
1663+
}
1664+
16611665
@Override
16621666
protected void complete(Throwable t) {
16631667
try {
1664-
Stream.this.subscriberCompleted(this);
1668+
unregisterResponseSubscriber(this);
16651669
} finally {
16661670
super.complete(t);
16671671
}
16681672
}
1673+
@Override
1674+
protected void onCancel() {
1675+
unregisterResponseSubscriber(this);
1676+
}
16691677
}
16701678

16711679
private static final VarHandle STREAM_STATE;

src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,14 @@
2929
import java.nio.ByteBuffer;
3030
import java.util.Comparator;
3131
import java.util.List;
32+
import java.util.Objects;
33+
import java.util.concurrent.CompletableFuture;
3234
import java.util.concurrent.CompletionStage;
3335
import java.util.concurrent.Flow;
3436
import java.util.concurrent.Flow.Subscription;
3537
import java.util.concurrent.atomic.AtomicBoolean;
3638
import java.util.concurrent.atomic.AtomicLong;
39+
import java.util.concurrent.locks.ReentrantLock;
3740

3841
import jdk.internal.net.http.ResponseSubscribers.TrustedSubscriber;
3942

@@ -61,12 +64,34 @@ public void cancel() { }
6164
final BodySubscriber<T> userSubscriber;
6265
final AtomicBoolean completed = new AtomicBoolean();
6366
final AtomicBoolean subscribed = new AtomicBoolean();
64-
volatile Subscription subscription;
67+
final ReentrantLock subscriptionLock = new ReentrantLock();
68+
volatile SubscriptionWrapper subscription;
6569
volatile Throwable withError;
6670
public HttpBodySubscriberWrapper(BodySubscriber<T> userSubscriber) {
6771
this.userSubscriber = userSubscriber;
6872
}
6973

74+
private class SubscriptionWrapper implements Subscription {
75+
final Subscription subscription;
76+
SubscriptionWrapper(Subscription s) {
77+
this.subscription = Objects.requireNonNull(s);
78+
}
79+
@Override
80+
public void request(long n) {
81+
subscription.request(n);
82+
}
83+
84+
@Override
85+
public void cancel() {
86+
try {
87+
subscription.cancel();
88+
onCancel();
89+
} catch (Throwable t) {
90+
onError(t);
91+
}
92+
}
93+
}
94+
7095
final long id() { return id; }
7196

7297
@Override
@@ -78,16 +103,20 @@ public boolean needsExecutor() {
78103
// subscribed yet.
79104
private void propagateError(Throwable t) {
80105
assert t != null;
106+
assert completed.get();
81107
try {
82108
// if unsubscribed at this point, it will not
83109
// get subscribed later - so do it now and
84110
// propagate the error
85111
// Race condition with onSubscribe: we need to wait until
86112
// subscription is finished before calling onError;
87-
synchronized (this) {
113+
subscriptionLock.lock();
114+
try {
88115
if (subscribed.compareAndSet(false, true)) {
89116
userSubscriber.onSubscribe(NOP);
90117
}
118+
} finally {
119+
subscriptionLock.unlock();
91120
}
92121
} finally {
93122
// if onError throws then there is nothing to do
@@ -97,6 +126,23 @@ private void propagateError(Throwable t) {
97126
}
98127
}
99128

129+
/**
130+
* Called when the subscriber cancels its subscription.
131+
* @apiNote
132+
* This method may be used by subclasses to perform cleanup
133+
* actions after a subscription has been cancelled.
134+
*/
135+
protected void onCancel() { }
136+
137+
/**
138+
* Called right before the userSubscriber::onSubscribe is called.
139+
* @apiNote
140+
* This method may be used by subclasses to perform cleanup
141+
* related actions after a subscription has been succesfully
142+
* accepted.
143+
*/
144+
protected void onSubscribed() { }
145+
100146
/**
101147
* Complete the subscriber, either normally or exceptionally
102148
* ensure that the subscriber is completed only once.
@@ -137,27 +183,29 @@ public CompletionStage<T> getBody() {
137183

138184
@Override
139185
public void onSubscribe(Flow.Subscription subscription) {
140-
this.subscription = subscription;
141186
// race condition with propagateError: we need to wait until
142187
// subscription is finished before calling onError;
143-
synchronized (this) {
188+
subscriptionLock.lock();
189+
try {
144190
if (subscribed.compareAndSet(false, true)) {
145-
userSubscriber.onSubscribe(subscription);
191+
onSubscribed();
192+
SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription);
193+
userSubscriber.onSubscribe(this.subscription = wrapped);
146194
} else {
147-
// could be already subscribed and completed
148-
// if an unexpected error occurred before the actual
149-
// subscription - though that's not supposed
150-
// happen.
151-
assert completed.get();
195+
subscription.cancel();
152196
}
197+
} finally {
198+
subscriptionLock.unlock();
153199
}
154200
}
155201

156202
@Override
157203
public void onNext(List<ByteBuffer> item) {
204+
assert subscribed.get();
158205
if (completed.get()) {
206+
SubscriptionWrapper subscription = this.subscription;
159207
if (subscription != null) {
160-
subscription.cancel();
208+
subscription.subscription.cancel();
161209
}
162210
} else {
163211
userSubscriber.onNext(item);

src/java.net.http/share/classes/jdk/internal/net/http/common/OperationTrackers.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ public interface Tracker {
5959
long getOutstandingWebSocketOperations();
6060
// number of TCP connections still opened
6161
long getOutstandingTcpConnections();
62+
// number of body subscribers not yet completed or canceled
63+
long getOutstandingSubscribers();
6264
// Whether the facade returned to the
6365
// user is still referenced
6466
boolean isFacadeReferenced();

0 commit comments

Comments
 (0)