Skip to content

Commit 4a3c2a6

Browse files
committed
refactor(core): pipeline: call isReleaseRequested in the calling scope
1 parent 819d339 commit 4a3c2a6

File tree

1 file changed

+118
-82
lines changed

1 file changed

+118
-82
lines changed

core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/StreamerPipeline.kt

Lines changed: 118 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -397,23 +397,23 @@ open class StreamerPipeline(
397397
withVideo: Boolean = this.withVideo,
398398
endpointFactory: IEndpointInternal.Factory = DynamicEndpointFactory(),
399399
@RotationValue targetRotation: Int = context.displayRotation
400-
): IConfigurableAudioVideoEncodingPipelineOutput =
401-
withContext(dispatcherProvider.default) {
402-
if (isReleaseRequested.get()) {
403-
throw IllegalStateException("Pipeline is released")
404-
}
405-
require(withAudio || withVideo) { "At least one of audio or video must be set" }
406-
val withAudioCorrected = if (this@StreamerPipeline.withAudio) {
407-
withAudio
408-
} else {
409-
false
410-
}
411-
val withVideoCorrected = if (this@StreamerPipeline.withVideo) {
412-
withVideo
413-
} else {
414-
false
415-
}
400+
): IConfigurableAudioVideoEncodingPipelineOutput {
401+
if (isReleaseRequested.get()) {
402+
throw IllegalStateException("Pipeline is released")
403+
}
404+
require(withAudio || withVideo) { "At least one of audio or video must be set" }
405+
val withAudioCorrected = if (this@StreamerPipeline.withAudio) {
406+
withAudio
407+
} else {
408+
false
409+
}
410+
val withVideoCorrected = if (this@StreamerPipeline.withVideo) {
411+
withVideo
412+
} else {
413+
false
414+
}
416415

416+
return withContext(dispatcherProvider.default) {
417417
val output =
418418
EncodingPipelineOutput(
419419
context,
@@ -425,6 +425,7 @@ open class StreamerPipeline(
425425
)
426426
addOutput(output)
427427
}
428+
}
428429

429430
/**
430431
* Adds an output.
@@ -442,26 +443,32 @@ open class StreamerPipeline(
442443
"Output must be an audio or video output"
443444
}
444445

445-
if (safeOutputCall { outputs -> outputs.contains(output) }) {
446-
throw IllegalStateException("Output $output already added")
447-
}
448-
require(!output.isStreaming) { "Output $output is already streaming" }
449-
450-
try {
451-
val jobs = addOutputImpl(output, coroutineScope)
452-
safeOutputCall {
453-
outputsToJobsMap[output] = jobs
446+
return withContext(dispatcherProvider.default) {
447+
if (safeOutputCall { outputs -> outputs.contains(output) }) {
448+
throw IllegalStateException("Output $output already added")
454449
}
455-
} catch (t: Throwable) {
456-
removeOutput(output)
450+
require(!output.isStreaming) { "Output $output is already streaming" }
451+
457452
try {
458-
output.release()
459-
} catch (t2: Throwable) {
460-
Logger.e(TAG, "Error while releasing output $output after a failure to add it", t2)
453+
val jobs = addOutputImpl(output, coroutineScope)
454+
safeOutputCall {
455+
outputsToJobsMap[output] = jobs
456+
}
457+
} catch (t: Throwable) {
458+
removeOutput(output)
459+
try {
460+
output.release()
461+
} catch (t2: Throwable) {
462+
Logger.e(
463+
TAG,
464+
"Error while releasing output $output after a failure to add it",
465+
t2
466+
)
467+
}
468+
throw t
461469
}
462-
throw t
470+
output
463471
}
464-
return output
465472
}
466473

467474
/**
@@ -801,27 +808,32 @@ open class StreamerPipeline(
801808
throw IllegalStateException("Pipeline is released")
802809
}
803810

804-
val jobs = mutableListOf<Job>()
805-
val exceptions = mutableListOf<Throwable>()
806-
safeOutputCall { outputs ->
807-
outputs.keys.forEach { output ->
808-
jobs += coroutineScope.launch {
809-
try {
810-
output.startStream()
811-
} catch (t: Throwable) {
812-
exceptions += t
813-
Logger.w(TAG, "startStream: Can't start output $output: ${t.message}")
811+
withContext(dispatcherProvider.default) {
812+
val jobs = mutableListOf<Job>()
813+
val exceptions = mutableListOf<Throwable>()
814+
safeOutputCall { outputs ->
815+
outputs.keys.forEach { output ->
816+
jobs += coroutineScope.launch {
817+
try {
818+
output.startStream()
819+
} catch (t: Throwable) {
820+
exceptions += t
821+
Logger.w(
822+
TAG,
823+
"startStream: Can't start output $output: ${t.message}"
824+
)
825+
}
814826
}
815827
}
816828
}
817-
}
818-
jobs.joinAll()
829+
jobs.joinAll()
819830

820-
if (exceptions.isNotEmpty()) {
821-
if (exceptions.size == 1) {
822-
throw exceptions.first()
823-
} else {
824-
throw MultiThrowable(exceptions)
831+
if (exceptions.isNotEmpty()) {
832+
if (exceptions.size == 1) {
833+
throw exceptions.first()
834+
} else {
835+
throw MultiThrowable(exceptions)
836+
}
825837
}
826838
}
827839
}
@@ -851,8 +863,10 @@ open class StreamerPipeline(
851863
*/
852864
private suspend fun stopStreamInputsIfNeededUnsafe(output: IPipelineOutput) {
853865
// If sources are not streaming, do nothing
854-
var isAudioSourceStreaming = _audioInput != null && _audioInput.isStreamingFlow.value
855-
var isVideoSourceStreaming = _videoInput != null && _videoInput.isStreamingFlow.value
866+
var isAudioSourceStreaming =
867+
_audioInput != null && _audioInput.isStreamingFlow.value
868+
var isVideoSourceStreaming =
869+
_videoInput != null && _videoInput.isStreamingFlow.value
856870
if (!isAudioSourceStreaming && !isVideoSourceStreaming) {
857871
return
858872
}
@@ -865,7 +879,10 @@ open class StreamerPipeline(
865879
try {
866880
it.stopStream()
867881
} catch (t: Throwable) {
868-
Logger.w(TAG, "stopStream: Can't stop audio input: ${t.message}")
882+
Logger.w(
883+
TAG,
884+
"stopStream: Can't stop audio input: ${t.message}"
885+
)
869886
}
870887
}
871888
}
@@ -879,7 +896,10 @@ open class StreamerPipeline(
879896
try {
880897
it.stopStream()
881898
} catch (t: Throwable) {
882-
Logger.w(TAG, "stopStream: Can't stop video input: ${t.message}")
899+
Logger.w(
900+
TAG,
901+
"stopStream: Can't stop video input: ${t.message}"
902+
)
883903
}
884904
}
885905
}
@@ -889,8 +909,10 @@ open class StreamerPipeline(
889909
videoJob?.join()
890910

891911
// set isStreamingFlow to false if no more inputs are streaming
892-
isAudioSourceStreaming = _audioInput != null && _audioInput.isStreamingFlow.value
893-
isVideoSourceStreaming = _videoInput != null && _videoInput.isStreamingFlow.value
912+
isAudioSourceStreaming =
913+
_audioInput != null && _audioInput.isStreamingFlow.value
914+
isVideoSourceStreaming =
915+
_videoInput != null && _videoInput.isStreamingFlow.value
894916
if (!isAudioSourceStreaming && !isVideoSourceStreaming) {
895917
_isStreamingFlow.emit(false)
896918
}
@@ -923,16 +945,18 @@ open class StreamerPipeline(
923945
*
924946
* It stops audio and video sources and calls [IPipelineOutput.stopStream] on all outputs.
925947
*/
926-
override suspend fun stopStream() = withContext(dispatcherProvider.default) {
948+
override suspend fun stopStream() {
927949
if (isReleaseRequested.get()) {
928950
throw IllegalStateException("Pipeline is released")
929951
}
952+
withContext(dispatcherProvider.default) {
930953

931-
inputMutex.withLock {
932-
stopStreamInputsUnsafe()
933-
}
954+
inputMutex.withLock {
955+
stopStreamInputsUnsafe()
956+
}
934957

935-
stopStreamOutputs()
958+
stopStreamOutputs()
959+
}
936960
}
937961

938962
private suspend fun releaseSourcesUnsafe() {
@@ -956,12 +980,18 @@ open class StreamerPipeline(
956980
try {
957981
detachOutput(output)
958982
} catch (t: Throwable) {
959-
Logger.w(TAG, "release: Can't detach output $output: ${t.message}")
983+
Logger.w(
984+
TAG,
985+
"release: Can't detach output $output: ${t.message}"
986+
)
960987
}
961988
try {
962989
output.release()
963990
} catch (t: Throwable) {
964-
Logger.w(TAG, "release: Can't release output $output: ${t.message}")
991+
Logger.w(
992+
TAG,
993+
"release: Can't release output $output: ${t.message}"
994+
)
965995
}
966996
}
967997
outputs.clear()
@@ -974,27 +1004,31 @@ open class StreamerPipeline(
9741004
* It releases the audio and video sources and the processors.
9751005
* It also calls [IPipelineOutput.release] on all outputs.
9761006
*/
977-
override suspend fun release() = withContext(dispatcherProvider.default) {
1007+
override suspend fun release() {
9781008
if (isReleaseRequested.getAndSet(true)) {
9791009
Logger.w(TAG, "Already released")
980-
return@withContext
1010+
return
9811011
}
9821012
Logger.d(TAG, "Releasing pipeline")
983-
984-
// Sources
985-
inputMutex.withLock {
986-
try {
987-
releaseSourcesUnsafe()
988-
} catch (t: Throwable) {
989-
Logger.w(TAG, "release: Can't release sources: ${t.message}")
1013+
withContext(dispatcherProvider.default) {
1014+
// Sources
1015+
inputMutex.withLock {
1016+
try {
1017+
releaseSourcesUnsafe()
1018+
} catch (t: Throwable) {
1019+
Logger.w(
1020+
TAG,
1021+
"release: Can't release sources: ${t.message}"
1022+
)
1023+
}
9901024
}
991-
}
992-
Logger.d(TAG, "Sources released")
1025+
Logger.d(TAG, "Sources released")
9931026

994-
// Outputs
995-
releaseOutputs()
1027+
// Outputs
1028+
releaseOutputs()
9961029

997-
coroutineScope.cancel()
1030+
coroutineScope.cancel()
1031+
}
9981032
}
9991033

10001034
private suspend fun <T> safeOutputCall(block: suspend (MutableMap<IPipelineOutput, List<Job>>) -> T) =
@@ -1006,22 +1040,24 @@ open class StreamerPipeline(
10061040

10071041
private suspend fun <T> safeStreamingOutputCall(block: suspend (Map<IPipelineOutput, List<Job>>) -> T) =
10081042
safeOutputCall { outputs ->
1009-
val streamingOutputs = outputs.filter { it.key.isStreamingFlow.value }
1043+
val streamingOutputs =
1044+
outputs.filter { it.key.isStreamingFlow.value }
10101045
block(streamingOutputs)
10111046
}
10121047

10131048
/**
10141049
* Executes a block with the [coroutineDispatcher] and the [inputMutex] locked.
10151050
*/
1016-
private suspend fun <T> withContextInputMutex(block: suspend () -> T): T =
1017-
withContext(dispatcherProvider.default) {
1018-
if (isReleaseRequested.get()) {
1019-
throw IllegalStateException("Pipeline is released")
1020-
}
1051+
private suspend fun <T> withContextInputMutex(block: suspend () -> T): T {
1052+
if (isReleaseRequested.get()) {
1053+
throw IllegalStateException("Pipeline is released")
1054+
}
1055+
return withContext(dispatcherProvider.default) {
10211056
inputMutex.withLock {
10221057
block()
10231058
}
10241059
}
1060+
}
10251061

10261062
companion object {
10271063
private const val TAG = "StreamerPipeline"

0 commit comments

Comments
 (0)