Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ dependencies {
testImplementation(libs.robolectric)

androidTestImplementation(project(":streampack-srt"))
androidTestImplementation(project(":streampack-flv"))
androidTestImplementation(project(":streampack-rtmp"))

androidTestImplementation(libs.androidx.test.core.ktx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow

class DummyEndpoint : IEndpointInternal {
Expand Down Expand Up @@ -54,6 +55,7 @@ class DummyEndpoint : IEndpointInternal {

override val metrics: Any
get() = TODO("Not yet implemented")
override val throwableFlow: StateFlow<Throwable?> = MutableStateFlow(null).asStateFlow()

override suspend fun open(descriptor: MediaDescriptor) {
_isOpenFlow.emit(true)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package io.github.thibaultbee.streampack.core.elements.endpoints

import androidx.test.platform.app.InstrumentationRegistry
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.flv.FlvMuxer
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.FileSink
import io.github.thibaultbee.streampack.ext.flv.elements.endpoints.FlvFileEndpoint
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.test.runTest
import org.junit.Test
Expand Down Expand Up @@ -32,12 +30,13 @@ class EndpointStateTest(private val endpoint: IEndpointInternal) {
)
fun getMediaDescriptor(): Iterable<IEndpointInternal> {
val context = InstrumentationRegistry.getInstrumentation().context
val defaultDispatcher = Dispatchers.Default
val ioDispatcher = Dispatchers.IO

return arrayListOf(
DynamicEndpoint(context, Dispatchers.Default, ioDispatcher),
DynamicEndpoint(context, defaultDispatcher, ioDispatcher),
MediaMuxerEndpoint(context, ioDispatcher),
CompositeEndpoint(FlvMuxer(isForFile = false), FileSink(ioDispatcher))
FlvFileEndpoint(defaultDispatcher, ioDispatcher)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks

import androidx.test.platform.app.InstrumentationRegistry
import io.github.thibaultbee.streampack.ext.rtmp.elements.endpoints.composites.sinks.RtmpSink
import io.github.thibaultbee.streampack.ext.srt.elements.endpoints.composites.sinks.SrtSink
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.test.runTest
Expand Down Expand Up @@ -33,8 +32,7 @@ class SinkStateTest(private val endpoint: ISinkInternal) {
ContentSink(context, ioDispatcher),
ChunkedFileOutputStreamSink(1000, ioDispatcher),
FakeSink(),
SrtSink(ioDispatcher),
RtmpSink(ioDispatcher)
SrtSink(ioDispatcher)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ data class Frame(
/**
* Presentation timestamp in µs
*/
var ptsInUs: Long,
val ptsInUs: Long,

/**
* Decoded timestamp in µs (not used).
*/
var dtsInUs: Long?,
val dtsInUs: Long? = null,

/**
* [Boolean.true] if frame is a key frame (I-frame for AVC/HEVC and audio frames)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class AudioCodecConfig(
48000
)

internal fun isAacMimeType(mimeType: String) = mimeType.startsWith(MIMETYPE_AAC_PREFIX)
fun isAacMimeType(mimeType: String) = mimeType.startsWith(MIMETYPE_AAC_PREFIX)

private fun getAacProfileFromMimeType(mimeType: String) = when (mimeType) {
MediaFormat.MIMETYPE_AUDIO_AAC -> MediaCodecInfo.CodecProfileLevel.AACObjectLC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.combineTransform
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch

Expand Down Expand Up @@ -91,6 +92,14 @@ open class CombineEndpoint(
false
)

override val throwableFlow: StateFlow<Throwable?> = merge(
*endpointInternals.map { it.throwableFlow }.toTypedArray()
).stateIn(
coroutineScope,
started = SharingStarted.Eagerly,
initialValue = null
)

/**
* The union of all endpoints' [IEndpoint.IEndpointInfo].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.creat
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoints
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.flv.FlvMuxer
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.TsMuxer
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.TSServiceInfo
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ContentSink
Expand All @@ -33,9 +31,11 @@ import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
Expand Down Expand Up @@ -72,6 +72,13 @@ open class DynamicEndpoint(
private val _isOpenFlow = MutableStateFlow(false)
override val isOpenFlow: StateFlow<Boolean> = _isOpenFlow.asStateFlow()

private val throwableFlows = endpointFlow.map { it?.throwableFlow }
override val throwableFlow: StateFlow<Throwable?> = throwableFlows.map { it?.value }.stateIn(
coroutineScope,
started = SharingStarted.Eagerly,
initialValue = null
)

/**
* Only available when the endpoint is opened.
*/
Expand Down Expand Up @@ -197,22 +204,15 @@ open class DynamicEndpoint(

private fun getFlvFileEndpoint(): IEndpointInternal {
if (flvFileEndpoint == null) {
flvFileEndpoint = CompositeEndpoint(
FlvMuxer(
isForFile = true
), FileSink(ioDispatcher)
)
flvFileEndpoint = Endpoints.createFlvFileEndpoint(defaultDispatcher, ioDispatcher)
}
return flvFileEndpoint!!
}

private fun getFlvContentEndpoint(): IEndpointInternal {
if (flvContentEndpoint == null) {
flvContentEndpoint = CompositeEndpoint(
FlvMuxer(
isForFile = true
), ContentSink(context, ioDispatcher)
)
flvContentEndpoint =
Endpoints.createFlvContentEndpoint(context, defaultDispatcher, ioDispatcher)
}
return flvContentEndpoint!!
}
Expand All @@ -238,14 +238,14 @@ open class DynamicEndpoint(

private fun getSrtEndpoint(): IEndpointInternal {
if (srtEndpoint == null) {
srtEndpoint = CompositeEndpoints.createSrtEndpoint(null, ioDispatcher)
srtEndpoint = Endpoints.createSrtEndpoint(null, ioDispatcher)
}
return srtEndpoint!!
}

private fun getRtmpEndpoint(): IEndpointInternal {
if (rtmpEndpoint == null) {
rtmpEndpoint = CompositeEndpoints.createRtmpEndpoint(ioDispatcher)
rtmpEndpoint = Endpoints.createRtmpEndpoint(defaultDispatcher, ioDispatcher)
}
return rtmpEndpoint!!
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package io.github.thibaultbee.streampack.core.elements.endpoints

import android.content.Context
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoints
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.TsMuxer
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers.ts.data.TSServiceInfo
import kotlinx.coroutines.CoroutineDispatcher

object Endpoints {
/**
* Creates an endpoint for RTMP (with a FLV muxer)
*/
internal fun createRtmpEndpoint(
defaultDispatcher: CoroutineDispatcher,
ioDispatcher: CoroutineDispatcher
): IEndpointInternal {
return try {
val clazz =
Class.forName("io.github.thibaultbee.streampack.ext.rtmp.elements.endpoints.RtmpEndpoint")
clazz.getConstructor(CoroutineDispatcher::class.java, CoroutineDispatcher::class.java)
.newInstance(defaultDispatcher, ioDispatcher) as IEndpointInternal
} catch (e: ClassNotFoundException) {
// Expected if the app was built without the RTMP extension.
throw ClassNotFoundException(
"Attempting to stream RTMP stream without depending on the RTMP extension",
e
)
} catch (t: Throwable) {
// The RTMP extension is present, but instantiation failed.
throw RuntimeException("Error instantiating RTMP extension", t)
}
}

/**
* Creates an endpoint for FLV File
*/
internal fun createFlvFileEndpoint(
defaultDispatcher: CoroutineDispatcher,
ioDispatcher: CoroutineDispatcher
): IEndpointInternal {
return try {
val clazz =
Class.forName("io.github.thibaultbee.streampack.ext.flv.elements.endpoints.FlvFileEndpoint")
clazz.getConstructor(CoroutineDispatcher::class.java, CoroutineDispatcher::class.java)
.newInstance(defaultDispatcher, ioDispatcher) as IEndpointInternal
} catch (e: ClassNotFoundException) {
// Expected if the app was built without the FLV extension.
throw ClassNotFoundException(
"Attempting to stream FLV stream without depending on the FLV extension",
e
)
} catch (t: Throwable) {
// The FLV extension is present, but instantiation failed.
throw RuntimeException("Error instantiating FLV file extension", t)
}
}


/**
* Creates an endpoint for FLV File
*/
internal fun createFlvContentEndpoint(
context: Context,
defaultDispatcher: CoroutineDispatcher,
ioDispatcher: CoroutineDispatcher
): IEndpointInternal {
return try {
val clazz =
Class.forName("io.github.thibaultbee.streampack.ext.flv.elements.endpoints.FlvContentEndpoint")
clazz.getConstructor(
Context::class.java,
CoroutineDispatcher::class.java,
CoroutineDispatcher::class.java
)
.newInstance(context, defaultDispatcher, ioDispatcher) as IEndpointInternal
} catch (e: ClassNotFoundException) {
// Expected if the app was built without the FLV extension.
throw ClassNotFoundException(
"Attempting to stream FLV stream without depending on the FLV extension",
e
)
} catch (t: Throwable) {
// The FLV extension is present, but instantiation failed.
throw RuntimeException("Error instantiating FLV content extension", t)
}
}

/**
* Creates an endpoint for SRT (with a TS muxer)
*/
internal fun createSrtEndpoint(
serviceInfo: TSServiceInfo?,
coroutineDispatcher: CoroutineDispatcher
): IEndpointInternal {
val sink = CompositeEndpoints.createSrtSink(coroutineDispatcher)
val muxer = TsMuxer()
if (serviceInfo != null) {
muxer.addService(serviceInfo)
}
return CompositeEndpoint(muxer, sink)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package io.github.thibaultbee.streampack.core.elements.endpoints

import android.content.Context
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
import io.github.thibaultbee.streampack.core.elements.data.Frame
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.FileSink
import io.github.thibaultbee.streampack.core.elements.interfaces.Releasable
Expand All @@ -30,6 +30,13 @@ import kotlinx.coroutines.runBlocking

interface IEndpointInternal : IEndpoint, SuspendStreamable,
SuspendCloseable, Releasable {

/**
* An asynchronous error that occurred during streaming.
* Synchronous errors must be thrown directly by the functions.
*/
val throwableFlow: StateFlow<Throwable?>

/**
* Opens the endpoint.
* The endpoint must check if the [MediaDescriptor] is supported and if it is not already opened.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import io.github.thibaultbee.streampack.core.logger.Logger
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
Expand Down Expand Up @@ -70,6 +71,8 @@ class MediaMuxerEndpoint(
private val _isOpenFlow = MutableStateFlow(false)
override val isOpenFlow = _isOpenFlow.asStateFlow()

override val throwableFlow: StateFlow<Throwable?> = MutableStateFlow(null).asStateFlow()

override suspend fun open(descriptor: MediaDescriptor) = mutex.withLock {
when (state) {
State.PENDING_START, State.STARTED -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxer
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ISinkInternal
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.SinkConfiguration
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
Expand Down Expand Up @@ -66,6 +68,8 @@ class CompositeEndpoint(
override val isOpenFlow: StateFlow<Boolean>
get() = sink.isOpenFlow

override val throwableFlow: StateFlow<Throwable?> = MutableStateFlow(null).asStateFlow()

override suspend fun open(descriptor: MediaDescriptor) {
sink.open(descriptor)
}
Expand Down
Loading
Loading