From e24be41c1cc6bcb168f6a2c46e88e1fc770b4f67 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Thu, 9 Oct 2025 11:42:32 +0200 Subject: [PATCH 1/3] feat(core): add an API to take a snapshot --- .../video/DefaultSurfaceProcessor.kt | 101 ++++++++++++++++- .../processing/video/ISurfaceProcessor.kt | 7 +- .../core/pipelines/inputs/VideoInput.kt | 106 ++++++++++++++++-- 3 files changed, 200 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/DefaultSurfaceProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/DefaultSurfaceProcessor.kt index b282ba93d..2e9556fb6 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/DefaultSurfaceProcessor.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/DefaultSurfaceProcessor.kt @@ -16,18 +16,24 @@ */ package io.github.thibaultbee.streampack.core.elements.processing.video +import android.graphics.Bitmap import android.graphics.SurfaceTexture import android.util.Size import android.view.Surface +import androidx.annotation.IntRange import androidx.concurrent.futures.CallbackToFutureAdapter import com.google.common.util.concurrent.ListenableFuture import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.ISurfaceOutput import io.github.thibaultbee.streampack.core.elements.processing.video.utils.GLUtils +import io.github.thibaultbee.streampack.core.elements.processing.video.utils.extensions.preRotate +import io.github.thibaultbee.streampack.core.elements.processing.video.utils.extensions.preVerticalFlip import io.github.thibaultbee.streampack.core.elements.utils.av.video.DynamicRangeProfile +import io.github.thibaultbee.streampack.core.elements.utils.extensions.rotate import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.core.pipelines.DispatcherProvider.Companion.THREAD_NAME_GL import io.github.thibaultbee.streampack.core.pipelines.IVideoDispatcherProvider import io.github.thibaultbee.streampack.core.pipelines.utils.HandlerThreadExecutor +import java.io.IOException import java.util.concurrent.atomic.AtomicBoolean @@ -37,6 +43,8 @@ private class DefaultSurfaceProcessor( ) : ISurfaceProcessorInternal, SurfaceTexture.OnFrameAvailableListener { private val renderer = OpenGlRenderer() + private val glHandler = glThread.handler + private val isReleaseRequested = AtomicBoolean(false) private var isReleased = false @@ -47,7 +55,7 @@ private class DefaultSurfaceProcessor( private val surfaceInputs: MutableList = mutableListOf() private val surfaceInputsTimestampInNsMap: MutableMap = hashMapOf() - private val glHandler = glThread.handler + private val pendingSnapshots = mutableListOf() init { val future = submitSafely { @@ -193,6 +201,19 @@ private class DefaultSurfaceProcessor( } } + override fun snapshot( + @IntRange(from = 0, to = 359) rotationDegrees: Int + ): ListenableFuture { + if (isReleaseRequested.get()) { + throw IllegalStateException("SurfaceProcessor is released") + } + return CallbackToFutureAdapter.getFuture { completer -> + executeSafely { + pendingSnapshots.add(PendingSnapshot(rotationDegrees, completer)) + } + } + } + // Executed on GL thread override fun onFrameAvailable(surfaceTexture: SurfaceTexture) { if (isReleaseRequested.get()) { @@ -217,6 +238,78 @@ private class DefaultSurfaceProcessor( Logger.e(TAG, "Error while rendering frame", t) } } + + // Surface, size and transform matrix for JPEG Surface if exists + if (pendingSnapshots.isNotEmpty()) { + try { + val first = surfaceOutputs.first() + val snapshotOutput = Pair( + first.descriptor.resolution, + surfaceOutputMatrix.clone() + ) + + // Execute all pending snapshots. + takeSnapshot(snapshotOutput) + } catch (e: RuntimeException) { + // Propagates error back to the app if failed to take snapshot. + failAllPendingSnapshots(e) + } + } + } + + /** + * Takes a snapshot of the current frame and draws it to given JPEG surface. + * + * @param snapshotOutput The pair for drawing. + */ + private fun takeSnapshot(snapshotOutput: Pair) { + if (pendingSnapshots.isEmpty()) { + // No pending snapshot requests, do nothing. + return + } + + // Write to JPEG surface, once for each snapshot request. + try { + for (pendingSnapshot in pendingSnapshots) { + val (size, transform) = snapshotOutput + + // Take a snapshot of the current frame. + val bitmap = getBitmap(size, transform, pendingSnapshot.rotationDegrees) + + // Complete the snapshot request. + pendingSnapshot.completer.set(bitmap) + } + pendingSnapshots.clear() + } catch (e: IOException) { + failAllPendingSnapshots(e) + } + } + + private fun failAllPendingSnapshots(throwable: Throwable) { + for (pendingSnapshot in pendingSnapshots) { + pendingSnapshot.completer.setException(throwable) + } + pendingSnapshots.clear() + } + + private fun getBitmap( + size: Size, + textureTransform: FloatArray, + rotationDegrees: Int + ): Bitmap { + val snapshotTransform = textureTransform.clone() + + // Rotate the output if requested. + snapshotTransform.preRotate(rotationDegrees.toFloat(), 0.5f, 0.5f) + + // Flip the snapshot. This is for reverting the GL transform added in SurfaceOutputImpl. + snapshotTransform.preVerticalFlip(0.5f) + + // Update the size based on the rotation degrees. + val rotatedSize = size.rotate(rotationDegrees) + + // Take a snapshot Bitmap and compress it to JPEG. + return renderer.snapshot(rotatedSize, snapshotTransform) } private fun executeSafely( @@ -258,6 +351,12 @@ private class DefaultSurfaceProcessor( } private data class SurfaceInput(val surface: Surface, val surfaceTexture: SurfaceTexture) + + private data class PendingSnapshot( + @IntRange(from = 0, to = 359) + val rotationDegrees: Int, + val completer: CallbackToFutureAdapter.Completer + ) } class DefaultSurfaceProcessorFactory : diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/ISurfaceProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/ISurfaceProcessor.kt index 04bbcc9ca..069e9b26f 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/ISurfaceProcessor.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/ISurfaceProcessor.kt @@ -15,8 +15,11 @@ */ package io.github.thibaultbee.streampack.core.elements.processing.video +import android.graphics.Bitmap import android.util.Size import android.view.Surface +import androidx.annotation.IntRange +import com.google.common.util.concurrent.ListenableFuture import io.github.thibaultbee.streampack.core.elements.interfaces.Releasable import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.ISurfaceOutput import io.github.thibaultbee.streampack.core.elements.utils.av.video.DynamicRangeProfile @@ -45,6 +48,8 @@ interface ISurfaceProcessorInternal : ISurfaceProcessor, Releasable { fun removeAllOutputSurfaces() + fun snapshot(@IntRange(from = 0, to = 359) rotationDegrees: Int): ListenableFuture + /** * Factory interface for creating instances of [ISurfaceProcessorInternal]. */ @@ -54,4 +59,4 @@ interface ISurfaceProcessorInternal : ISurfaceProcessor, Releasable { dispatcherProvider: IVideoDispatcherProvider ): ISurfaceProcessorInternal } -} \ No newline at end of file +} diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/VideoInput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/VideoInput.kt index e4e764f74..1969a2f74 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/VideoInput.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/VideoInput.kt @@ -16,7 +16,9 @@ package io.github.thibaultbee.streampack.core.pipelines.inputs import android.content.Context +import android.graphics.Bitmap import android.view.Surface +import androidx.annotation.IntRange import io.github.thibaultbee.streampack.core.elements.processing.video.ISurfaceProcessorInternal import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.ISurfaceOutput import io.github.thibaultbee.streampack.core.elements.processing.video.source.ISourceInfoProvider @@ -40,7 +42,12 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext +import java.io.File +import java.io.FileOutputStream +import java.io.OutputStream import java.util.concurrent.atomic.AtomicBoolean +import kotlin.coroutines.resume +import kotlin.coroutines.suspendCoroutine /** * The public interface for the video input. @@ -75,6 +82,64 @@ interface IVideoInput { * The video processor for adding effects to the video frames. */ val processor: ISurfaceProcessorInternal + + /** + * Takes a snapshot of the current video frame. + * + * The snapshot is returned as a [Bitmap]. + * + * @param rotationDegrees The rotation to apply to the snapshot, in degrees. 0 means no rotation. + * @return The snapshot as a [Bitmap]. + */ + suspend fun takeSnapshot(@IntRange(from = 0, to = 359) rotationDegrees: Int = 0): Bitmap +} + +/** + * Takes a JPEG snapshot of the current video frame. + * + * The snapshot is saved to the specified file. + * + * @param filePathString The path of the file to save the snapshot to. + * @param quality The quality of the JPEG, from 0 to 100. + * @param rotationDegrees The rotation to apply to the snapshot, in degrees. + */ +suspend fun IVideoInput.takeJpegSnapshot( + filePathString: String, + @IntRange(from = 0, to = 100) quality: Int = 100, + @IntRange(from = 0, to = 359) rotationDegrees: Int = 0 +) = takeJpegSnapshot(FileOutputStream(filePathString), quality, rotationDegrees) + + +/** + * Takes a JPEG snapshot of the current video frame. + * + * The snapshot is saved to the specified file. + * + * @param file The file to save the snapshot to. + * @param quality The quality of the JPEG, from 0 to 100. + * @param rotationDegrees The rotation to apply to the snapshot, in degrees. + */ +suspend fun IVideoInput.takeJpegSnapshot( + file: File, + @IntRange(from = 0, to = 100) quality: Int = 100, + @IntRange(from = 0, to = 359) rotationDegrees: Int = 0 +) = takeJpegSnapshot(FileOutputStream(file), quality, rotationDegrees) + +/** + * Takes a snapshot of the current video frame. + * + * The snapshot is saved as a JPEG to the specified output stream. + * @param outputStream The output stream to save the snapshot to. + * @param quality The quality of the JPEG, from 0 to 100. + * @param rotationDegrees The rotation to apply to the snapshot, in degrees. + */ +suspend fun IVideoInput.takeJpegSnapshot( + outputStream: OutputStream, + @IntRange(from = 0, to = 100) quality: Int = 100, + @IntRange(from = 0, to = 359) rotationDegrees: Int = 0 +) { + val bitmap = takeSnapshot(rotationDegrees) + bitmap.compress(Bitmap.CompressFormat.JPEG, quality, outputStream) } /** @@ -98,7 +163,7 @@ internal class VideoInput( private var isReleaseRequested = AtomicBoolean(false) - private val sourceMutex = Mutex() + private val videoSourceMutex = Mutex() override var processor: ISurfaceProcessorInternal = surfaceProcessorFactory.create(dynamicRangeProfileHint, dispatcherProvider) @@ -150,7 +215,7 @@ internal class VideoInput( } withContext(dispatcherProvider.default) { - sourceMutex.withLock { + videoSourceMutex.withLock { val previousVideoSource = sourceInternalFlow.value val isStreaming = previousVideoSource?.isStreamingFlow?.value ?: false @@ -261,13 +326,13 @@ internal class VideoInput( } - internal suspend fun setSourceConfig(newVideoSourceConfig: VideoSourceConfig) { + suspend fun setSourceConfig(newVideoSourceConfig: VideoSourceConfig) { if (isReleaseRequested.get()) { throw IllegalStateException("Input is released") } withContext(dispatcherProvider.default) { - sourceMutex.withLock { + videoSourceMutex.withLock { if (sourceConfig == newVideoSourceConfig) { Logger.i(TAG, "Video source configuration is the same, skipping configuration") return@withContext @@ -349,7 +414,24 @@ internal class VideoInput( return newSurfaceProcessor } - internal suspend fun addOutputSurface(output: ISurfaceOutput) { + override suspend fun takeSnapshot(@IntRange(from = 0, to = 359) rotationDegrees: Int): Bitmap { + if (isReleaseRequested.get()) { + throw IllegalStateException("Input is released") + } + return withContext(dispatcherProvider.default) { + suspendCoroutine { continuation -> + val listener = processor.snapshot(rotationDegrees) + try { + val bitmap = listener.get() + continuation.resume(bitmap) + } catch (e: Exception) { + continuation.resumeWith(Result.failure(e)) + } + } + } + } + + suspend fun addOutputSurface(output: ISurfaceOutput) { if (isReleaseRequested.get()) { throw IllegalStateException("Input is released") } @@ -360,7 +442,7 @@ internal class VideoInput( } } - internal suspend fun removeOutputSurface(output: Surface) { + suspend fun removeOutputSurface(output: Surface) { outputMutex.withLock { surfaceOutput.firstOrNull { it.descriptor.surface == output }?.let { surfaceOutput.remove(it) @@ -369,12 +451,12 @@ internal class VideoInput( } } - internal suspend fun startStream() { + suspend fun startStream() { if (isReleaseRequested.get()) { throw IllegalStateException("Input is released") } withContext(dispatcherProvider.default) { - sourceMutex.withLock { + videoSourceMutex.withLock { val source = requireNotNull(source) { "Video source must be set before starting stream" } if (isStreamingFlow.value) { @@ -390,12 +472,12 @@ internal class VideoInput( } } - internal suspend fun stopStream() { + suspend fun stopStream() { if (isReleaseRequested.get()) { throw IllegalStateException("Input is released") } withContext(dispatcherProvider.default) { - sourceMutex.withLock { + videoSourceMutex.withLock { _isStreamingFlow.emit(false) try { source?.stopStream() @@ -421,14 +503,14 @@ internal class VideoInput( processor.release() } - internal suspend fun release() { + suspend fun release() { if (isReleaseRequested.getAndSet(true)) { Logger.w(TAG, "Already released") return } withContext(dispatcherProvider.default) { - sourceMutex.withLock { + videoSourceMutex.withLock { _isStreamingFlow.emit(false) try { releaseSurfaceProcessor() From 1f5dc914e3f4f63bef7ae208932bcda2dd7d2677 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Thu, 9 Oct 2025 15:31:05 +0200 Subject: [PATCH 2/3] refactor(core): pipeline: call isReleaseRequested in the calling scope --- .../core/pipelines/StreamerPipeline.kt | 200 +++++++++++------- 1 file changed, 118 insertions(+), 82 deletions(-) diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/StreamerPipeline.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/StreamerPipeline.kt index c190acc48..87b795309 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/StreamerPipeline.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/StreamerPipeline.kt @@ -398,23 +398,23 @@ open class StreamerPipeline( withVideo: Boolean = this.withVideo, endpointFactory: IEndpointInternal.Factory = DynamicEndpointFactory(), @RotationValue targetRotation: Int = context.displayRotation - ): IConfigurableAudioVideoEncodingPipelineOutput = - withContext(dispatcherProvider.default) { - if (isReleaseRequested.get()) { - throw IllegalStateException("Pipeline is released") - } - require(withAudio || withVideo) { "At least one of audio or video must be set" } - val withAudioCorrected = if (this@StreamerPipeline.withAudio) { - withAudio - } else { - false - } - val withVideoCorrected = if (this@StreamerPipeline.withVideo) { - withVideo - } else { - false - } + ): IConfigurableAudioVideoEncodingPipelineOutput { + if (isReleaseRequested.get()) { + throw IllegalStateException("Pipeline is released") + } + require(withAudio || withVideo) { "At least one of audio or video must be set" } + val withAudioCorrected = if (this@StreamerPipeline.withAudio) { + withAudio + } else { + false + } + val withVideoCorrected = if (this@StreamerPipeline.withVideo) { + withVideo + } else { + false + } + return withContext(dispatcherProvider.default) { val output = EncodingPipelineOutput( context, @@ -426,6 +426,7 @@ open class StreamerPipeline( ) addOutput(output) } + } /** * Adds an output. @@ -443,26 +444,32 @@ open class StreamerPipeline( "Output must be an audio or video output" } - if (safeOutputCall { outputs -> outputs.contains(output) }) { - throw IllegalStateException("Output $output already added") - } - require(!output.isStreaming) { "Output $output is already streaming" } - - try { - val jobs = addOutputImpl(output, coroutineScope) - safeOutputCall { - outputsToJobsMap[output] = jobs + return withContext(dispatcherProvider.default) { + if (safeOutputCall { outputs -> outputs.contains(output) }) { + throw IllegalStateException("Output $output already added") } - } catch (t: Throwable) { - removeOutput(output) + require(!output.isStreaming) { "Output $output is already streaming" } + try { - output.release() - } catch (t2: Throwable) { - Logger.e(TAG, "Error while releasing output $output after a failure to add it", t2) + val jobs = addOutputImpl(output, coroutineScope) + safeOutputCall { + outputsToJobsMap[output] = jobs + } + } catch (t: Throwable) { + removeOutput(output) + try { + output.release() + } catch (t2: Throwable) { + Logger.e( + TAG, + "Error while releasing output $output after a failure to add it", + t2 + ) + } + throw t } - throw t + output } - return output } /** @@ -802,27 +809,32 @@ open class StreamerPipeline( throw IllegalStateException("Pipeline is released") } - val jobs = mutableListOf() - val exceptions = mutableListOf() - safeOutputCall { outputs -> - outputs.keys.forEach { output -> - jobs += coroutineScope.launch { - try { - output.startStream() - } catch (t: Throwable) { - exceptions += t - Logger.w(TAG, "startStream: Can't start output $output: ${t.message}") + withContext(dispatcherProvider.default) { + val jobs = mutableListOf() + val exceptions = mutableListOf() + safeOutputCall { outputs -> + outputs.keys.forEach { output -> + jobs += coroutineScope.launch { + try { + output.startStream() + } catch (t: Throwable) { + exceptions += t + Logger.w( + TAG, + "startStream: Can't start output $output: ${t.message}" + ) + } } } } - } - jobs.joinAll() + jobs.joinAll() - if (exceptions.isNotEmpty()) { - if (exceptions.size == 1) { - throw exceptions.first() - } else { - throw MultiThrowable(exceptions) + if (exceptions.isNotEmpty()) { + if (exceptions.size == 1) { + throw exceptions.first() + } else { + throw MultiThrowable(exceptions) + } } } } @@ -852,8 +864,10 @@ open class StreamerPipeline( */ private suspend fun stopStreamInputsIfNeededUnsafe(output: IPipelineOutput) { // If sources are not streaming, do nothing - var isAudioSourceStreaming = _audioInput != null && _audioInput.isStreamingFlow.value - var isVideoSourceStreaming = _videoInput != null && _videoInput.isStreamingFlow.value + var isAudioSourceStreaming = + _audioInput != null && _audioInput.isStreamingFlow.value + var isVideoSourceStreaming = + _videoInput != null && _videoInput.isStreamingFlow.value if (!isAudioSourceStreaming && !isVideoSourceStreaming) { return } @@ -866,7 +880,10 @@ open class StreamerPipeline( try { it.stopStream() } catch (t: Throwable) { - Logger.w(TAG, "stopStream: Can't stop audio input: ${t.message}") + Logger.w( + TAG, + "stopStream: Can't stop audio input: ${t.message}" + ) } } } @@ -880,7 +897,10 @@ open class StreamerPipeline( try { it.stopStream() } catch (t: Throwable) { - Logger.w(TAG, "stopStream: Can't stop video input: ${t.message}") + Logger.w( + TAG, + "stopStream: Can't stop video input: ${t.message}" + ) } } } @@ -890,8 +910,10 @@ open class StreamerPipeline( videoJob?.join() // set isStreamingFlow to false if no more inputs are streaming - isAudioSourceStreaming = _audioInput != null && _audioInput.isStreamingFlow.value - isVideoSourceStreaming = _videoInput != null && _videoInput.isStreamingFlow.value + isAudioSourceStreaming = + _audioInput != null && _audioInput.isStreamingFlow.value + isVideoSourceStreaming = + _videoInput != null && _videoInput.isStreamingFlow.value if (!isAudioSourceStreaming && !isVideoSourceStreaming) { _isStreamingFlow.emit(false) } @@ -924,16 +946,18 @@ open class StreamerPipeline( * * It stops audio and video sources and calls [IPipelineOutput.stopStream] on all outputs. */ - override suspend fun stopStream() = withContext(dispatcherProvider.default) { + override suspend fun stopStream() { if (isReleaseRequested.get()) { throw IllegalStateException("Pipeline is released") } + withContext(dispatcherProvider.default) { - inputMutex.withLock { - stopStreamInputsUnsafe() - } + inputMutex.withLock { + stopStreamInputsUnsafe() + } - stopStreamOutputs() + stopStreamOutputs() + } } private suspend fun releaseSourcesUnsafe() { @@ -957,12 +981,18 @@ open class StreamerPipeline( try { detachOutput(output) } catch (t: Throwable) { - Logger.w(TAG, "release: Can't detach output $output: ${t.message}") + Logger.w( + TAG, + "release: Can't detach output $output: ${t.message}" + ) } try { output.release() } catch (t: Throwable) { - Logger.w(TAG, "release: Can't release output $output: ${t.message}") + Logger.w( + TAG, + "release: Can't release output $output: ${t.message}" + ) } } outputs.clear() @@ -975,27 +1005,31 @@ open class StreamerPipeline( * It releases the audio and video sources and the processors. * It also calls [IPipelineOutput.release] on all outputs. */ - override suspend fun release() = withContext(dispatcherProvider.default) { + override suspend fun release() { if (isReleaseRequested.getAndSet(true)) { Logger.w(TAG, "Already released") - return@withContext + return } Logger.d(TAG, "Releasing pipeline") - - // Sources - inputMutex.withLock { - try { - releaseSourcesUnsafe() - } catch (t: Throwable) { - Logger.w(TAG, "release: Can't release sources: ${t.message}") + withContext(dispatcherProvider.default) { + // Sources + inputMutex.withLock { + try { + releaseSourcesUnsafe() + } catch (t: Throwable) { + Logger.w( + TAG, + "release: Can't release sources: ${t.message}" + ) + } } - } - Logger.d(TAG, "Sources released") + Logger.d(TAG, "Sources released") - // Outputs - releaseOutputs() + // Outputs + releaseOutputs() - coroutineScope.cancel() + coroutineScope.cancel() + } } private suspend fun safeOutputCall(block: suspend (MutableMap>) -> T) = @@ -1007,22 +1041,24 @@ open class StreamerPipeline( private suspend fun safeStreamingOutputCall(block: suspend (Map>) -> T) = safeOutputCall { outputs -> - val streamingOutputs = outputs.filter { it.key.isStreamingFlow.value } + val streamingOutputs = + outputs.filter { it.key.isStreamingFlow.value } block(streamingOutputs) } /** * Executes a block with the [coroutineDispatcher] and the [inputMutex] locked. */ - private suspend fun withContextInputMutex(block: suspend () -> T): T = - withContext(dispatcherProvider.default) { - if (isReleaseRequested.get()) { - throw IllegalStateException("Pipeline is released") - } + private suspend fun withContextInputMutex(block: suspend () -> T): T { + if (isReleaseRequested.get()) { + throw IllegalStateException("Pipeline is released") + } + return withContext(dispatcherProvider.default) { inputMutex.withLock { block() } } + } companion object { private const val TAG = "StreamerPipeline" From 890b098892d6b3cd55bcbb9cd347c202f8ce4790 Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Thu, 9 Oct 2025 15:43:04 +0200 Subject: [PATCH 3/3] feat(core): startStream to take a snapshot if needed --- .../video/DefaultSurfaceProcessor.kt | 67 +++++++------ .../video/outputs/ISurfaceOutput.kt | 15 ++- .../video/outputs/IdentitySurfaceOutput.kt | 40 -------- .../processing/video/outputs/SurfaceOutput.kt | 26 ++++- .../core/pipelines/inputs/AudioInput.kt | 44 +++++++-- .../core/pipelines/inputs/VideoInput.kt | 95 ++++++++++++++----- 6 files changed, 171 insertions(+), 116 deletions(-) delete mode 100644 core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/IdentitySurfaceOutput.kt diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/DefaultSurfaceProcessor.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/DefaultSurfaceProcessor.kt index 2e9556fb6..3fa9d43d6 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/DefaultSurfaceProcessor.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/DefaultSurfaceProcessor.kt @@ -24,6 +24,7 @@ import androidx.annotation.IntRange import androidx.concurrent.futures.CallbackToFutureAdapter import com.google.common.util.concurrent.ListenableFuture import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.ISurfaceOutput +import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.SurfaceOutput import io.github.thibaultbee.streampack.core.elements.processing.video.utils.GLUtils import io.github.thibaultbee.streampack.core.elements.processing.video.utils.extensions.preRotate import io.github.thibaultbee.streampack.core.elements.processing.video.utils.extensions.preVerticalFlip @@ -33,7 +34,6 @@ import io.github.thibaultbee.streampack.core.logger.Logger import io.github.thibaultbee.streampack.core.pipelines.DispatcherProvider.Companion.THREAD_NAME_GL import io.github.thibaultbee.streampack.core.pipelines.IVideoDispatcherProvider import io.github.thibaultbee.streampack.core.pipelines.utils.HandlerThreadExecutor -import java.io.IOException import java.util.concurrent.atomic.AtomicBoolean @@ -118,7 +118,7 @@ private class DefaultSurfaceProcessor( executeSafely { if (!surfaceOutputs.contains(surfaceOutput)) { - renderer.registerOutputSurface(surfaceOutput.descriptor.surface) + renderer.registerOutputSurface(surfaceOutput.surface) surfaceOutputs.add(surfaceOutput) } else { Logger.w(TAG, "Surface already added") @@ -128,7 +128,7 @@ private class DefaultSurfaceProcessor( private fun removeOutputSurfaceInternal(surfaceOutput: ISurfaceOutput) { if (surfaceOutputs.contains(surfaceOutput)) { - renderer.unregisterOutputSurface(surfaceOutput.descriptor.surface) + renderer.unregisterOutputSurface(surfaceOutput.surface) surfaceOutputs.remove(surfaceOutput) } else { Logger.w(TAG, "Surface not found") @@ -152,7 +152,7 @@ private class DefaultSurfaceProcessor( executeSafely { val surfaceOutput = - surfaceOutputs.firstOrNull { it.descriptor.surface == surface } + surfaceOutputs.firstOrNull { it.surface == surface } if (surfaceOutput != null) { removeOutputSurfaceInternal(surfaceOutput) } else { @@ -163,7 +163,7 @@ private class DefaultSurfaceProcessor( private fun removeAllOutputSurfacesInternal() { surfaceOutputs.forEach { surfaceOutput -> - renderer.unregisterOutputSurface(surfaceOutput.descriptor.surface) + renderer.unregisterOutputSurface(surfaceOutput.surface) } surfaceOutputs.clear() } @@ -225,15 +225,17 @@ private class DefaultSurfaceProcessor( val timestamp = surfaceTexture.timestamp + (surfaceInputsTimestampInNsMap[surfaceTexture] ?: 0L) - surfaceOutputs.filter { it.isStreaming() }.forEach { + surfaceOutputs.filterIsInstance().forEach { try { it.updateTransformMatrix(surfaceOutputMatrix, textureMatrix) - renderer.render( - timestamp, - surfaceOutputMatrix, - it.descriptor.surface, - it.viewportRect - ) + if (it.isStreaming()) { + renderer.render( + timestamp, + surfaceOutputMatrix, + it.surface, + it.viewportRect + ) + } } catch (t: Throwable) { Logger.e(TAG, "Error while rendering frame", t) } @@ -242,14 +244,14 @@ private class DefaultSurfaceProcessor( // Surface, size and transform matrix for JPEG Surface if exists if (pendingSnapshots.isNotEmpty()) { try { - val first = surfaceOutputs.first() - val snapshotOutput = Pair( - first.descriptor.resolution, - surfaceOutputMatrix.clone() - ) + val bitmapSurface = + surfaceOutputs.maxByOrNull { it.resolution.width * it.resolution.height } + ?: throw IllegalStateException( + "No output surface available for snapshot" + ) // Execute all pending snapshots. - takeSnapshot(snapshotOutput) + takeSnapshot(bitmapSurface.resolution, surfaceOutputMatrix.clone()) } catch (e: RuntimeException) { // Propagates error back to the app if failed to take snapshot. failAllPendingSnapshots(e) @@ -260,28 +262,32 @@ private class DefaultSurfaceProcessor( /** * Takes a snapshot of the current frame and draws it to given JPEG surface. * - * @param snapshotOutput The pair for drawing. + * @param snapshotSize The size of the snapshot. + * @param snapshotTransform The GL transform matrix to apply to the snapshot. */ - private fun takeSnapshot(snapshotOutput: Pair) { + private fun takeSnapshot(snapshotSize: Size, snapshotTransform: FloatArray) { if (pendingSnapshots.isEmpty()) { // No pending snapshot requests, do nothing. return } - // Write to JPEG surface, once for each snapshot request. + // Write to Bitmap, once for each snapshot request. try { for (pendingSnapshot in pendingSnapshots) { - val (size, transform) = snapshotOutput - - // Take a snapshot of the current frame. - val bitmap = getBitmap(size, transform, pendingSnapshot.rotationDegrees) - - // Complete the snapshot request. - pendingSnapshot.completer.set(bitmap) + try { + // Take a snapshot of the current frame. + val bitmap = + getBitmap(snapshotSize, snapshotTransform, pendingSnapshot.rotationDegrees) + + // Complete the snapshot request. + pendingSnapshot.completer.set(bitmap) + } catch (t: Throwable) { + // Propagate error back to the app if failed to take snapshot. + pendingSnapshot.completer.setException(t) + } } + } finally { pendingSnapshots.clear() - } catch (e: IOException) { - failAllPendingSnapshots(e) } } @@ -289,7 +295,6 @@ private class DefaultSurfaceProcessor( for (pendingSnapshot in pendingSnapshots) { pendingSnapshot.completer.setException(throwable) } - pendingSnapshots.clear() } private fun getBitmap( diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/ISurfaceOutput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/ISurfaceOutput.kt index 3d4e7bed5..2f1f9c468 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/ISurfaceOutput.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/ISurfaceOutput.kt @@ -1,12 +1,17 @@ package io.github.thibaultbee.streampack.core.elements.processing.video.outputs -import android.graphics.Rect -import io.github.thibaultbee.streampack.core.pipelines.outputs.SurfaceDescriptor +import android.util.Size +import android.view.Surface interface ISurfaceOutput { - val descriptor: SurfaceDescriptor - val isStreaming: () -> Boolean - val viewportRect: Rect + val surface: Surface + val resolution: Size + val type: OutputType fun updateTransformMatrix(output: FloatArray, input: FloatArray) + + enum class OutputType { + INTERNAL, + BITMAP + } } \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/IdentitySurfaceOutput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/IdentitySurfaceOutput.kt deleted file mode 100644 index efc1d3e95..000000000 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/IdentitySurfaceOutput.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (C) 2024 Thibault B. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.github.thibaultbee.streampack.core.elements.processing.video.outputs - -import android.graphics.Rect -import android.opengl.Matrix -import io.github.thibaultbee.streampack.core.elements.processing.video.utils.GLUtils -import io.github.thibaultbee.streampack.core.pipelines.outputs.SurfaceDescriptor - - -class IdentitySurfaceOutput( - override val descriptor: SurfaceDescriptor, - override val isStreaming: () -> Boolean, - override val viewportRect: Rect = Rect( - 0, - 0, - descriptor.resolution.width, - descriptor.resolution.height - ) -) : ISurfaceOutput { - override fun updateTransformMatrix(output: FloatArray, input: FloatArray) { - Matrix.multiplyMM( - output, 0, input, 0, GLUtils.IDENTITY_MATRIX, 0 - ) - } -} - diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/SurfaceOutput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/SurfaceOutput.kt index 73e89b4c5..0058ec234 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/SurfaceOutput.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/elements/processing/video/outputs/SurfaceOutput.kt @@ -18,7 +18,10 @@ package io.github.thibaultbee.streampack.core.elements.processing.video.outputs import android.graphics.Rect import android.graphics.RectF import android.opengl.Matrix +import android.util.Size +import android.view.Surface import androidx.annotation.IntRange +import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.SurfaceOutput.TransformationInfo import io.github.thibaultbee.streampack.core.elements.processing.video.outputs.ViewPortUtils.calculateViewportRect import io.github.thibaultbee.streampack.core.elements.processing.video.source.ISourceInfoProvider import io.github.thibaultbee.streampack.core.elements.processing.video.utils.TransformUtils @@ -29,14 +32,27 @@ import io.github.thibaultbee.streampack.core.elements.utils.RotationValue import io.github.thibaultbee.streampack.core.elements.utils.extensions.rotate import io.github.thibaultbee.streampack.core.pipelines.outputs.SurfaceDescriptor +fun SurfaceOutput( + descriptor: SurfaceDescriptor, + isStreaming: () -> Boolean, + transformationInfo: TransformationInfo +) = + SurfaceOutput( + descriptor.surface, + descriptor.resolution, + isStreaming, + transformationInfo + ) + class SurfaceOutput( - override val descriptor: SurfaceDescriptor, - override val isStreaming: () -> Boolean, + override val surface: Surface, + override val resolution: Size, + val isStreaming: () -> Boolean, private val transformationInfo: TransformationInfo ) : ISurfaceOutput { - private val resolution = descriptor.resolution - + override val type = ISurfaceOutput.OutputType.INTERNAL + private val infoProvider: ISourceInfoProvider get() = transformationInfo.infoProvider @@ -52,7 +68,7 @@ class SurfaceOutput( private val additionalTransform = FloatArray(16) private val invertedTextureTransform = FloatArray(16) - override val viewportRect = calculateViewportRect( + val viewportRect = calculateViewportRect( transformationInfo.aspectRatioMode, transformationInfo.infoProvider.getSurfaceSize(resolution), resolution diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt index 96e051cb5..b0edd5bae 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/AudioInput.kt @@ -233,7 +233,10 @@ internal class AudioInput( withContext(dispatcherProvider.default) { sourceMutex.withLock { if (sourceConfig == newAudioSourceConfig) { - Logger.i(TAG, "Audio source configuration is the same, skipping configuration") + Logger.i( + TAG, + "Audio source configuration is the same, skipping configuration" + ) return@withContext } require(!isStreamingFlow.value) { "Can't change audio source configuration while streaming" } @@ -285,7 +288,10 @@ internal class AudioInput( try { port.startStream() } catch (t: Throwable) { - Logger.w(TAG, "startStream: Can't start audio processor: ${t.message}") + Logger.w( + TAG, + "startStream: Can't start audio processor: ${t.message}" + ) source.stopStream() throw t } @@ -305,12 +311,18 @@ internal class AudioInput( try { port.stopStream() } catch (t: Throwable) { - Logger.w(TAG, "stopStream: Can't stop audio processor: ${t.message}") + Logger.w( + TAG, + "stopStream: Can't stop audio processor: ${t.message}" + ) } try { sourceInternalFlow.value?.stopStream() } catch (t: Throwable) { - Logger.w(TAG, "stopStream: Can't stop audio source: ${t.message}") + Logger.w( + TAG, + "stopStream: Can't stop audio source: ${t.message}" + ) } } } @@ -328,17 +340,26 @@ internal class AudioInput( try { port.removeInput() } catch (t: Throwable) { - Logger.w(TAG, "release: Can't remove audio processor input: ${t.message}") + Logger.w( + TAG, + "release: Can't remove audio processor input: ${t.message}" + ) } try { port.release() } catch (t: Throwable) { - Logger.w(TAG, "release: Can't release audio processor: ${t.message}") + Logger.w( + TAG, + "release: Can't release audio processor: ${t.message}" + ) } try { sourceInternalFlow.value?.release() } catch (t: Throwable) { - Logger.w(TAG, "release: Can't release audio source: ${t.message}") + Logger.w( + TAG, + "release: Can't release audio source: ${t.message}" + ) } isStreamingJob.cancel() @@ -353,7 +374,9 @@ internal class AudioInput( internal sealed class Config - internal class PushConfig(val onFrame: suspend (RawFrame) -> Unit) : Config() + internal class PushConfig(val onFrame: suspend (RawFrame) -> Unit) : + Config() + internal class CallbackConfig : Config() } @@ -370,7 +393,10 @@ private class PushAudioPort( private val audioPullPush = RawFramePullPush( audioFrameProcessor, config.onFrame, - dispatcherProvider.createAudioDispatcher(1, THREAD_NAME_AUDIO_PREPROCESSING) + dispatcherProvider.createAudioDispatcher( + 1, + THREAD_NAME_AUDIO_PREPROCESSING + ) ) override suspend fun setInput(getFrame: (frameFactory: IRawFrameFactory) -> RawFrame) { diff --git a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/VideoInput.kt b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/VideoInput.kt index 1969a2f74..a4f3dcc03 100644 --- a/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/VideoInput.kt +++ b/core/src/main/java/io/github/thibaultbee/streampack/core/pipelines/inputs/VideoInput.kt @@ -163,7 +163,7 @@ internal class VideoInput( private var isReleaseRequested = AtomicBoolean(false) - private val videoSourceMutex = Mutex() + private val sourceMutex = Mutex() override var processor: ISurfaceProcessorInternal = surfaceProcessorFactory.create(dynamicRangeProfileHint, dispatcherProvider) @@ -215,7 +215,7 @@ internal class VideoInput( } withContext(dispatcherProvider.default) { - videoSourceMutex.withLock { + sourceMutex.withLock { val previousVideoSource = sourceInternalFlow.value val isStreaming = previousVideoSource?.isStreamingFlow?.value ?: false @@ -332,9 +332,12 @@ internal class VideoInput( } withContext(dispatcherProvider.default) { - videoSourceMutex.withLock { + sourceMutex.withLock { if (sourceConfig == newVideoSourceConfig) { - Logger.i(TAG, "Video source configuration is the same, skipping configuration") + Logger.i( + TAG, + "Video source configuration is the same, skipping configuration" + ) return@withContext } require(!isStreamingFlow.value) { "Can't change video source configuration while streaming" } @@ -418,7 +421,7 @@ internal class VideoInput( if (isReleaseRequested.get()) { throw IllegalStateException("Input is released") } - return withContext(dispatcherProvider.default) { + return startStreamForBlock { suspendCoroutine { continuation -> val listener = processor.snapshot(rotationDegrees) try { @@ -444,46 +447,80 @@ internal class VideoInput( suspend fun removeOutputSurface(output: Surface) { outputMutex.withLock { - surfaceOutput.firstOrNull { it.descriptor.surface == output }?.let { + surfaceOutput.firstOrNull { it.surface == output }?.let { surfaceOutput.remove(it) } processor.removeOutputSurface(output) } } - suspend fun startStream() { + private suspend fun startStreamUnsafe() { + val source = + requireNotNull(source) { "Video source must be set before starting stream" } + if (isStreamingFlow.value) { + Logger.w(TAG, "Stream is already running") + return + } + if (!withConfig) { + Logger.w(TAG, "Video source config is not set") + } + source.startStream() + } + + /** + * Starts the stream, executes the given block, then stops the stream. + * + * If the stream was already running, it will not be stopped after the block. + */ + private suspend fun startStreamForBlock(block: suspend () -> T): T { if (isReleaseRequested.get()) { throw IllegalStateException("Input is released") } - withContext(dispatcherProvider.default) { - videoSourceMutex.withLock { - val source = - requireNotNull(source) { "Video source must be set before starting stream" } - if (isStreamingFlow.value) { - Logger.w(TAG, "Stream is already running") - return@withContext + return withContext(dispatcherProvider.default) { + sourceMutex.withLock { + val wasStreaming = isStreamingFlow.value + if (!wasStreaming) { + startStreamUnsafe() } - if (!withConfig) { - Logger.w(TAG, "Video source config is not set") + try { + block() + } finally { + if (!wasStreaming) { + stopStreamUnsafe() + } } - source.startStream() + } + } + } + + suspend fun startStream() { + if (isReleaseRequested.get()) { + throw IllegalStateException("Input is released") + } + withContext(dispatcherProvider.default) { + sourceMutex.withLock { + startStreamUnsafe() _isStreamingFlow.emit(true) } } } + private suspend fun stopStreamUnsafe() { + try { + source?.stopStream() + } catch (t: Throwable) { + Logger.w(TAG, "stopStream: Can't stop video source: ${t.message}") + } + } + suspend fun stopStream() { if (isReleaseRequested.get()) { throw IllegalStateException("Input is released") } withContext(dispatcherProvider.default) { - videoSourceMutex.withLock { + sourceMutex.withLock { _isStreamingFlow.emit(false) - try { - source?.stopStream() - } catch (t: Throwable) { - Logger.w(TAG, "stopStream: Can't stop video source: ${t.message}") - } + stopStreamUnsafe() } } } @@ -510,12 +547,15 @@ internal class VideoInput( } withContext(dispatcherProvider.default) { - videoSourceMutex.withLock { + sourceMutex.withLock { _isStreamingFlow.emit(false) try { releaseSurfaceProcessor() } catch (t: Throwable) { - Logger.w(TAG, "release: Can't release surface processor: ${t.message}") + Logger.w( + TAG, + "release: Can't release surface processor: ${t.message}" + ) } val videoSource = sourceInternalFlow.value if (videoSource is ISurfaceSourceInternal) { @@ -531,7 +571,10 @@ internal class VideoInput( try { videoSource?.release() } catch (t: Throwable) { - Logger.w(TAG, "release: Can't release video source: ${t.message}") + Logger.w( + TAG, + "release: Can't release video source: ${t.message}" + ) } isStreamingJob.cancel()