Skip to content

Commit e17bb56

Browse files
committed
refactor(core): pipeline: call isReleaseRequested in the calling scope
1 parent b8e3f58 commit e17bb56

File tree

1 file changed

+87
-74
lines changed

1 file changed

+87
-74
lines changed

core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/StreamerPipeline.kt

Lines changed: 87 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -383,23 +383,23 @@ open class StreamerPipeline(
383383
endpointFactory: IEndpointInternal.Factory = DynamicEndpointFactory(),
384384
@RotationValue targetRotation: Int = context.displayRotation,
385385
coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default
386-
): IConfigurableAudioVideoEncodingPipelineOutput =
387-
withContext(this@StreamerPipeline.coroutineDispatcher) {
388-
if (isReleaseRequested.get()) {
389-
throw IllegalStateException("Pipeline is released")
390-
}
391-
require(withAudio || withVideo) { "At least one of audio or video must be set" }
392-
val withAudioCorrected = if (this@StreamerPipeline.withAudio) {
393-
withAudio
394-
} else {
395-
false
396-
}
397-
val withVideoCorrected = if (this@StreamerPipeline.withVideo) {
398-
withVideo
399-
} else {
400-
false
401-
}
386+
): IConfigurableAudioVideoEncodingPipelineOutput {
387+
if (isReleaseRequested.get()) {
388+
throw IllegalStateException("Pipeline is released")
389+
}
390+
require(withAudio || withVideo) { "At least one of audio or video must be set" }
391+
val withAudioCorrected = if (this@StreamerPipeline.withAudio) {
392+
withAudio
393+
} else {
394+
false
395+
}
396+
val withVideoCorrected = if (this@StreamerPipeline.withVideo) {
397+
withVideo
398+
} else {
399+
false
400+
}
402401

402+
return withContext(this@StreamerPipeline.coroutineDispatcher) {
403403
val output =
404404
EncodingPipelineOutput(
405405
context,
@@ -411,6 +411,7 @@ open class StreamerPipeline(
411411
)
412412
addOutput(output)
413413
}
414+
}
414415

415416
/**
416417
* Adds an output.
@@ -428,26 +429,32 @@ open class StreamerPipeline(
428429
"Output must be an audio or video output"
429430
}
430431

431-
if (safeOutputCall { outputs -> outputs.contains(output) }) {
432-
throw IllegalStateException("Output $output already added")
433-
}
434-
require(!output.isStreaming) { "Output $output is already streaming" }
435-
436-
try {
437-
val jobs = addOutputImpl(output, coroutineScope)
438-
safeOutputCall {
439-
outputsToJobsMap[output] = jobs
432+
return withContext(coroutineDispatcher) {
433+
if (safeOutputCall { outputs -> outputs.contains(output) }) {
434+
throw IllegalStateException("Output $output already added")
440435
}
441-
} catch (t: Throwable) {
442-
removeOutput(output)
436+
require(!output.isStreaming) { "Output $output is already streaming" }
437+
443438
try {
444-
output.release()
445-
} catch (t2: Throwable) {
446-
Logger.e(TAG, "Error while releasing output $output after a failure to add it", t2)
439+
val jobs = addOutputImpl(output, coroutineScope)
440+
safeOutputCall {
441+
outputsToJobsMap[output] = jobs
442+
}
443+
} catch (t: Throwable) {
444+
removeOutput(output)
445+
try {
446+
output.release()
447+
} catch (t2: Throwable) {
448+
Logger.e(
449+
TAG,
450+
"Error while releasing output $output after a failure to add it",
451+
t2
452+
)
453+
}
454+
throw t
447455
}
448-
throw t
456+
output
449457
}
450-
return output
451458
}
452459

453460
/**
@@ -777,32 +784,34 @@ open class StreamerPipeline(
777784
* If an [IEncodingPipelineOutput] is not opened, it won't start the stream and will throw an
778785
* exception. But the other outputs will be started.
779786
*/
780-
override suspend fun startStream() = withContext(coroutineDispatcher) {
787+
override suspend fun startStream() {
781788
if (isReleaseRequested.get()) {
782789
throw IllegalStateException("Pipeline is released")
783790
}
784791

785-
val jobs = mutableListOf<Job>()
786-
val exceptions = mutableListOf<Throwable>()
787-
safeOutputCall { outputs ->
788-
outputs.keys.forEach { output ->
789-
jobs += coroutineScope.launch {
790-
try {
791-
output.startStream()
792-
} catch (t: Throwable) {
793-
exceptions += t
794-
Logger.w(TAG, "startStream: Can't start output $output: ${t.message}")
792+
withContext(coroutineDispatcher) {
793+
val jobs = mutableListOf<Job>()
794+
val exceptions = mutableListOf<Throwable>()
795+
safeOutputCall { outputs ->
796+
outputs.keys.forEach { output ->
797+
jobs += coroutineScope.launch {
798+
try {
799+
output.startStream()
800+
} catch (t: Throwable) {
801+
exceptions += t
802+
Logger.w(TAG, "startStream: Can't start output $output: ${t.message}")
803+
}
795804
}
796805
}
797806
}
798-
}
799-
jobs.joinAll()
807+
jobs.joinAll()
800808

801-
if (exceptions.isNotEmpty()) {
802-
if (exceptions.size == 1) {
803-
throw exceptions.first()
804-
} else {
805-
throw MultiException(exceptions)
809+
if (exceptions.isNotEmpty()) {
810+
if (exceptions.size == 1) {
811+
throw exceptions.first()
812+
} else {
813+
throw MultiException(exceptions)
814+
}
806815
}
807816
}
808817
}
@@ -904,16 +913,18 @@ open class StreamerPipeline(
904913
*
905914
* It stops audio and video sources and calls [IPipelineOutput.stopStream] on all outputs.
906915
*/
907-
override suspend fun stopStream() = withContext(coroutineDispatcher) {
916+
override suspend fun stopStream() {
908917
if (isReleaseRequested.get()) {
909918
throw IllegalStateException("Pipeline is released")
910919
}
920+
withContext(coroutineDispatcher) {
911921

912-
inputMutex.withLock {
913-
stopStreamInputsUnsafe()
914-
}
922+
inputMutex.withLock {
923+
stopStreamInputsUnsafe()
924+
}
915925

916-
stopStreamOutputs()
926+
stopStreamOutputs()
927+
}
917928
}
918929

919930
private suspend fun releaseSourcesUnsafe() {
@@ -955,27 +966,28 @@ open class StreamerPipeline(
955966
* It releases the audio and video sources and the processors.
956967
* It also calls [IPipelineOutput.release] on all outputs.
957968
*/
958-
override suspend fun release() = withContext(coroutineDispatcher) {
969+
override suspend fun release() {
959970
if (isReleaseRequested.getAndSet(true)) {
960971
Logger.w(TAG, "Already released")
961-
return@withContext
972+
return
962973
}
963974
Logger.d(TAG, "Releasing pipeline")
964-
965-
// Sources
966-
inputMutex.withLock {
967-
try {
968-
releaseSourcesUnsafe()
969-
} catch (t: Throwable) {
970-
Logger.w(TAG, "release: Can't release sources: ${t.message}")
975+
withContext(coroutineDispatcher) {
976+
// Sources
977+
inputMutex.withLock {
978+
try {
979+
releaseSourcesUnsafe()
980+
} catch (t: Throwable) {
981+
Logger.w(TAG, "release: Can't release sources: ${t.message}")
982+
}
971983
}
972-
}
973-
Logger.d(TAG, "Sources released")
984+
Logger.d(TAG, "Sources released")
974985

975-
// Outputs
976-
releaseOutputs()
986+
// Outputs
987+
releaseOutputs()
977988

978-
coroutineScope.cancel()
989+
coroutineScope.cancel()
990+
}
979991
}
980992

981993
private suspend fun <T> safeOutputCall(block: suspend (MutableMap<IPipelineOutput, List<Job>>) -> T) =
@@ -994,15 +1006,16 @@ open class StreamerPipeline(
9941006
/**
9951007
* Executes a block with the [coroutineDispatcher] and the [inputMutex] locked.
9961008
*/
997-
private suspend fun <T> withContextInputMutex(block: suspend () -> T): T =
998-
withContext(coroutineDispatcher) {
999-
if (isReleaseRequested.get()) {
1000-
throw IllegalStateException("Pipeline is released")
1001-
}
1009+
private suspend fun <T> withContextInputMutex(block: suspend () -> T): T {
1010+
if (isReleaseRequested.get()) {
1011+
throw IllegalStateException("Pipeline is released")
1012+
}
1013+
return withContext(coroutineDispatcher) {
10021014
inputMutex.withLock {
10031015
block()
10041016
}
10051017
}
1018+
}
10061019

10071020
companion object {
10081021
private const val TAG = "StreamerPipeline"

0 commit comments

Comments
 (0)