Skip to content

[BUG] OkHttpFluxRequestBody does not correctly propagate IOExceptions #47169

@mlichtblau

Description

@mlichtblau

Describe the bug

We are seeing unhandled exception logs when a java.net.SocketException occurs during connection creation.
I believe the issues is that here in the implementation of the okhttp RequestBody it is expected to throw an IOException.
There are various points in the stack trace where okhttp can gracefully handle IOExceptions, see for example the RetryAndFollowUpInterceptor.
However, the custom OkHttpFluxRequestBody is propagating the SocketException wrapped in a reactor.core.Exceptions$ReactiveException so this is not handled correctly.

I would like to contribute a PR where the affected code path is wrapped in a try catch block and in case the ReactiveException is caused by an IOException then the IOException is thrown.

Exception or Stack Trace

Exception in thread "OkHttp Dispatcher" reactor.core.Exceptions$ReactiveException: java.net.SocketException: Connection reset by peer
at reactor.core.Exceptions.propagate(Exceptions.java:410)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:102)
at reactor.core.publisher.Mono.block(Mono.java:1779)
at com.azure.core.http.okhttp.implementation.OkHttpFluxRequestBody.writeTo(OkHttpFluxRequestBody.java:94)
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.kt:62)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:34)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:95)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:76)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at datadog.trace.instrumentation.okhttp3.TracingInterceptor.intercept(TracingInterceptor.java:39)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:517)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104)
... 19 more
Caused by: java.net.SocketException: Connection reset by peer
at java.base/sun.nio.ch.SocketDispatcher.write0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:62)
at java.base/sun.nio.ch.NioSocketImpl.tryWrite(NioSocketImpl.java:394)
at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:410)
at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:440)
at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:819)
at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1205)
at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345)
at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1304)
at okio.OutputStreamSink.write(JvmOkio.kt:56)
at okio.AsyncTimeout$sink$1.write(AsyncTimeout.kt:102)
at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
at okhttp3.internal.http1.Http1ExchangeCodec$KnownLengthSink.write(Http1ExchangeCodec.kt:279)
at okio.ForwardingSink.write(ForwardingSink.kt:29)
at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
at okio.RealBufferedSink.write(RealBufferedSink.kt:89)
at com.azure.core.http.okhttp.implementation.OkHttpFluxRequestBody.writeBuffer(OkHttpFluxRequestBody.java:105)
at com.azure.core.http.okhttp.implementation.OkHttpFluxRequestBody.lambda$writeTo$1(OkHttpFluxRequestBody.java:81)
at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.onNext(FluxMergeSequential.java:208)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:180)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:335)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:294)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:256)
at reactor.core.publisher.FluxHandle$HandleSubscriber.request(FluxHandle.java:272)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
at reactor.core.publisher.FluxMergeSequential$MergeSequentialMain.onSubscribe(FluxMergeSequential.java:198)
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onSubscribe(FluxHandle.java:90)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onSubscribe(FluxConcatArray.java:166)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
at reactor.core.publisher.Flux.subscribe(Flux.java:8891)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:238)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:79)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)
at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.Mono.block(Mono.java:1778)
... 18 more"

Expected behavior
The original IOException is propagated correctly

Information Checklist
Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added

Metadata

Metadata

Labels

Azure.Coreazure-corebugThis issue requires a change to an existing behavior in the product in order to be resolved.customer-reportedIssues that are reported by GitHub users external to the Azure organization.needs-team-attentionWorkflow: This issue needs attention from Azure service team or SDK team

Type

No type

Projects

Status

No status

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions