Skip to content

Commit 5edc0ce

Browse files
committed
refactor(core): endpoint: add a way to returns asynchronous throwable that happens on stream
1 parent 5437c4b commit 5edc0ce

File tree

7 files changed

+46
-10
lines changed

7 files changed

+46
-10
lines changed

core/src/androidTest/java/io/github/thibaultbee/streampack/core/elements/endpoints/DummyEndpoint.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
2323
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
2424
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
2525
import kotlinx.coroutines.flow.MutableStateFlow
26+
import kotlinx.coroutines.flow.StateFlow
2627
import kotlinx.coroutines.flow.asStateFlow
2728
import kotlinx.coroutines.runBlocking
2829

@@ -55,6 +56,7 @@ class DummyEndpoint : IEndpointInternal {
5556

5657
override val metrics: Any
5758
get() = TODO("Not yet implemented")
59+
override val throwableFlow: StateFlow<Throwable?> = MutableStateFlow(null).asStateFlow()
5860

5961
override suspend fun open(descriptor: MediaDescriptor) {
6062
_isOpenFlow.emit(true)

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import kotlinx.coroutines.cancelChildren
3232
import kotlinx.coroutines.flow.SharingStarted
3333
import kotlinx.coroutines.flow.StateFlow
3434
import kotlinx.coroutines.flow.combineTransform
35+
import kotlinx.coroutines.flow.merge
3536
import kotlinx.coroutines.flow.stateIn
3637
import kotlinx.coroutines.launch
3738

@@ -91,6 +92,14 @@ open class CombineEndpoint(
9192
false
9293
)
9394

95+
override val throwableFlow: StateFlow<Throwable?> = merge(
96+
*endpointInternals.map { it.throwableFlow }.toTypedArray()
97+
).stateIn(
98+
coroutineScope,
99+
started = SharingStarted.Eagerly,
100+
initialValue = null
101+
)
102+
94103
/**
95104
* The union of all endpoints' [IEndpoint.IEndpointInfo].
96105
*/

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ import kotlinx.coroutines.CoroutineDispatcher
3232
import kotlinx.coroutines.CoroutineScope
3333
import kotlinx.coroutines.cancelChildren
3434
import kotlinx.coroutines.flow.MutableStateFlow
35+
import kotlinx.coroutines.flow.SharingStarted
3536
import kotlinx.coroutines.flow.StateFlow
3637
import kotlinx.coroutines.flow.asStateFlow
3738
import kotlinx.coroutines.flow.map
39+
import kotlinx.coroutines.flow.stateIn
3840
import kotlinx.coroutines.launch
3941
import kotlinx.coroutines.runBlocking
4042
import kotlinx.coroutines.sync.Mutex
@@ -71,6 +73,13 @@ open class DynamicEndpoint(
7173
private val _isOpenFlow = MutableStateFlow(false)
7274
override val isOpenFlow: StateFlow<Boolean> = _isOpenFlow.asStateFlow()
7375

76+
private val throwableFlows = endpointFlow.map { it?.throwableFlow }
77+
override val throwableFlow: StateFlow<Throwable?> = throwableFlows.map { it?.value }.stateIn(
78+
coroutineScope,
79+
started = SharingStarted.Eagerly,
80+
initialValue = null
81+
)
82+
7483
/**
7584
* Only available when the endpoint is opened.
7685
*/
@@ -244,7 +253,7 @@ open class DynamicEndpoint(
244253

245254
private fun getRtmpEndpoint(): IEndpointInternal {
246255
if (rtmpEndpoint == null) {
247-
rtmpEndpoint = Endpoints.createRtmpEndpoint(ioDispatcher)
256+
rtmpEndpoint = Endpoints.createRtmpEndpoint(defaultDispatcher, ioDispatcher)
248257
}
249258
return rtmpEndpoint!!
250259
}

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ package io.github.thibaultbee.streampack.core.elements.endpoints
1717

1818
import android.content.Context
1919
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
20-
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
2120
import io.github.thibaultbee.streampack.core.elements.data.Frame
21+
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
2222
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
2323
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.FileSink
2424
import io.github.thibaultbee.streampack.core.elements.interfaces.Releasable
@@ -30,6 +30,13 @@ import kotlinx.coroutines.runBlocking
3030

3131
interface IEndpointInternal : IEndpoint, SuspendStreamable,
3232
SuspendCloseable, Releasable {
33+
34+
/**
35+
* An asynchronous error that occurred during streaming.
36+
* Synchronous errors must be thrown directly by the functions.
37+
*/
38+
val throwableFlow: StateFlow<Throwable?>
39+
3340
/**
3441
* Opens the endpoint.
3542
* The endpoint must check if the [MediaDescriptor] is supported and if it is not already opened.

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import io.github.thibaultbee.streampack.core.logger.Logger
3030
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
3131
import kotlinx.coroutines.CoroutineDispatcher
3232
import kotlinx.coroutines.flow.MutableStateFlow
33+
import kotlinx.coroutines.flow.StateFlow
3334
import kotlinx.coroutines.flow.asStateFlow
3435
import kotlinx.coroutines.runBlocking
3536
import kotlinx.coroutines.sync.Mutex
@@ -70,6 +71,8 @@ class MediaMuxerEndpoint(
7071
private val _isOpenFlow = MutableStateFlow(false)
7172
override val isOpenFlow = _isOpenFlow.asStateFlow()
7273

74+
override val throwableFlow: StateFlow<Throwable?> = MutableStateFlow(null).asStateFlow()
75+
7376
override suspend fun open(descriptor: MediaDescriptor) = mutex.withLock {
7477
when (state) {
7578
State.PENDING_START, State.STARTED -> {

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer
2727
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ISinkInternal
2828
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.SinkConfiguration
2929
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
30+
import kotlinx.coroutines.flow.MutableStateFlow
3031
import kotlinx.coroutines.flow.StateFlow
32+
import kotlinx.coroutines.flow.asStateFlow
3133
import kotlinx.coroutines.runBlocking
3234
import kotlinx.coroutines.sync.Mutex
3335
import kotlinx.coroutines.sync.withLock
@@ -66,6 +68,8 @@ class CompositeEndpoint(
6668
override val isOpenFlow: StateFlow<Boolean>
6769
get() = sink.isOpenFlow
6870

71+
override val throwableFlow: StateFlow<Throwable?> = MutableStateFlow(null).asStateFlow()
72+
6973
override suspend fun open(descriptor: MediaDescriptor) {
7074
sink.open(descriptor)
7175
}

core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/outputs/encoding/EncodingPipelineOutput.kt

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -259,10 +259,7 @@ internal class EncodingPipelineOutput(
259259
audioEncoderListener.outputChannel.consumeEach { closeableFrame ->
260260
try {
261261
audioStreamId?.let {
262-
endpointInternal.write(
263-
closeableFrame,
264-
it
265-
)
262+
endpointInternal.write(closeableFrame, it)
266263
} ?: Logger.w(TAG, "Audio frame received but audio stream is not set")
267264
} catch (t: Throwable) {
268265
onInternalError(t)
@@ -276,17 +273,22 @@ internal class EncodingPipelineOutput(
276273
videoEncoderListener.outputChannel.consumeEach { closeableFrame ->
277274
try {
278275
videoStreamId?.let {
279-
endpointInternal.write(
280-
closeableFrame,
281-
it
282-
)
276+
endpointInternal.write(closeableFrame, it)
283277
} ?: Logger.w(TAG, "Video frame received but video stream is not set")
284278
} catch (t: Throwable) {
285279
onInternalError(t)
286280
}
287281
}
288282
}
289283
}
284+
285+
coroutineScope.launch {
286+
endpointInternal.throwableFlow.collect { t ->
287+
t?.let {
288+
onInternalError(it)
289+
}
290+
}
291+
}
290292
}
291293

292294
private val _audioCodecConfigFlow = MutableStateFlow<AudioCodecConfig?>(null)

0 commit comments

Comments
 (0)