Skip to content

Commit 196ccbe

Browse files
committed
refactor(core): pass the flv frame to the RTMP client or the file write by a dedicated channel
1 parent f16c063 commit 196ccbe

File tree

7 files changed

+264
-93
lines changed

7 files changed

+264
-93
lines changed

core/src/androidTest/java/io/github/thibaultbee/streampack/core/elements/endpoints/EndpointStateTest.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@ class EndpointStateTest(private val endpoint: IEndpointInternal) {
3030
)
3131
fun getMediaDescriptor(): Iterable<IEndpointInternal> {
3232
val context = InstrumentationRegistry.getInstrumentation().context
33+
val defaultDispatcher = Dispatchers.Default
3334
val ioDispatcher = Dispatchers.IO
3435

3536
return arrayListOf(
36-
DynamicEndpoint(context, Dispatchers.Default, ioDispatcher),
37+
DynamicEndpoint(context, defaultDispatcher, ioDispatcher),
3738
MediaMuxerEndpoint(context, ioDispatcher),
38-
FlvFileEndpoint(ioDispatcher)
39+
FlvFileEndpoint(defaultDispatcher, ioDispatcher)
3940
)
4041
}
4142
}

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,14 +213,15 @@ open class DynamicEndpoint(
213213

214214
private fun getFlvFileEndpoint(): IEndpointInternal {
215215
if (flvFileEndpoint == null) {
216-
flvFileEndpoint = Endpoints.createFlvFileEndpoint(ioDispatcher)
216+
flvFileEndpoint = Endpoints.createFlvFileEndpoint(defaultDispatcher, ioDispatcher)
217217
}
218218
return flvFileEndpoint!!
219219
}
220220

221221
private fun getFlvContentEndpoint(): IEndpointInternal {
222222
if (flvContentEndpoint == null) {
223-
flvContentEndpoint = Endpoints.createFlvContentEndpoint(context, ioDispatcher)
223+
flvContentEndpoint =
224+
Endpoints.createFlvContentEndpoint(context, defaultDispatcher, ioDispatcher)
224225
}
225226
return flvContentEndpoint!!
226227
}

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/Endpoints.kt

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,15 @@ object Endpoints {
3131
/**
3232
* Creates an endpoint for FLV File
3333
*/
34-
internal fun createFlvFileEndpoint(coroutineDispatcher: CoroutineDispatcher): IEndpointInternal {
34+
internal fun createFlvFileEndpoint(
35+
defaultDispatcher: CoroutineDispatcher,
36+
ioDispatcher: CoroutineDispatcher
37+
): IEndpointInternal {
3538
return try {
3639
val clazz =
3740
Class.forName("io.github.thibaultbee.streampack.ext.flv.elements.endpoints.FlvFileEndpoint")
38-
clazz.getConstructor(CoroutineDispatcher::class.java)
39-
.newInstance(coroutineDispatcher) as IEndpointInternal
41+
clazz.getConstructor(CoroutineDispatcher::class.java, CoroutineDispatcher::class.java)
42+
.newInstance(defaultDispatcher, ioDispatcher) as IEndpointInternal
4043
} catch (e: ClassNotFoundException) {
4144
// Expected if the app was built without the FLV extension.
4245
throw ClassNotFoundException(
@@ -55,13 +58,18 @@ object Endpoints {
5558
*/
5659
internal fun createFlvContentEndpoint(
5760
context: Context,
58-
coroutineDispatcher: CoroutineDispatcher
61+
defaultDispatcher: CoroutineDispatcher,
62+
ioDispatcher: CoroutineDispatcher
5963
): IEndpointInternal {
6064
return try {
6165
val clazz =
6266
Class.forName("io.github.thibaultbee.streampack.ext.flv.elements.endpoints.FlvContentEndpoint")
63-
clazz.getConstructor(Context::class.java, CoroutineDispatcher::class.java)
64-
.newInstance(context, coroutineDispatcher) as IEndpointInternal
67+
clazz.getConstructor(
68+
Context::class.java,
69+
CoroutineDispatcher::class.java,
70+
CoroutineDispatcher::class.java
71+
)
72+
.newInstance(context, defaultDispatcher, ioDispatcher) as IEndpointInternal
6573
} catch (e: ClassNotFoundException) {
6674
// Expected if the app was built without the FLV extension.
6775
throw ClassNotFoundException(
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright (C) 2025 Thibault B.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.github.thibaultbee.streampack.core.elements.utils
17+
18+
import kotlinx.coroutines.CancellationException
19+
import kotlinx.coroutines.DelicateCoroutinesApi
20+
import kotlinx.coroutines.ExperimentalCoroutinesApi
21+
import kotlinx.coroutines.channels.BufferOverflow
22+
import kotlinx.coroutines.channels.Channel
23+
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
24+
import kotlinx.coroutines.channels.ReceiveChannel
25+
import kotlinx.coroutines.channels.consumeEach
26+
import java.io.Closeable
27+
28+
/**
29+
* A channel that sends and receives data along with a close action to be executed when the data is no longer needed.
30+
*
31+
* @param T The type of data to be sent and received.
32+
* @param capacity The capacity of the channel.
33+
* @param onBufferOverflow The behavior when the buffer overflows.
34+
*/
35+
class ChannelWithCloseableData<T>(
36+
capacity: Int = RENDEZVOUS,
37+
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
38+
) : ReceiveChannel<ChannelWithCloseableData.CloseableData<T>> {
39+
private val channel =
40+
Channel<CloseableData<T>>(capacity, onBufferOverflow, onUndeliveredElement = { it.close() })
41+
42+
/**
43+
* Sends data along with a close action to the channel.
44+
*
45+
* @param data The data to be sent.
46+
* @param onClose The action to be executed when the data is no longer needed.
47+
*/
48+
suspend fun send(data: T, onClose: (() -> Unit) = {}) {
49+
channel.send(CloseableData(data, onClose))
50+
}
51+
52+
@DelicateCoroutinesApi
53+
override val isClosedForReceive: Boolean
54+
get() = channel.isClosedForReceive
55+
56+
@ExperimentalCoroutinesApi
57+
override val isEmpty: Boolean
58+
get() = channel.isEmpty
59+
override val onReceive = channel.onReceive
60+
override val onReceiveCatching = channel.onReceiveCatching
61+
62+
/**
63+
* Receives data along with its close action from the channel.
64+
*
65+
* @return The received data along with its close action.
66+
*/
67+
override suspend fun receive() = channel.receive()
68+
69+
override suspend fun receiveCatching() = channel.receiveCatching()
70+
71+
override fun tryReceive() = channel.tryReceive()
72+
73+
override fun iterator() = channel.iterator()
74+
75+
override fun cancel(cause: CancellationException?) =
76+
channel.cancel(cause)
77+
78+
@Deprecated(
79+
"Since 1.2.0, binary compatibility with versions <= 1.1.x",
80+
level = DeprecationLevel.HIDDEN
81+
)
82+
override fun cancel(cause: Throwable?): Boolean {
83+
TODO("Not yet implemented")
84+
}
85+
86+
class CloseableData<T>(
87+
val data: T,
88+
val onClose: () -> Unit
89+
) : Closeable {
90+
override fun close() {
91+
onClose()
92+
}
93+
}
94+
}
95+
96+
/**
97+
* Receives data from the channel and uses it in a [block], ensuring that the data is properly closed after use.
98+
*
99+
* @param block The block of code to execute with the received data.
100+
* @return The result of the block execution.
101+
*/
102+
suspend inline fun <T, R> ChannelWithCloseableData<T>.useReceive(block: (T) -> R): R {
103+
return receive().use {
104+
block(it.data)
105+
}
106+
}
107+
108+
suspend inline fun <T> ChannelWithCloseableData<T>.useConsumeEach(action: (T) -> Unit): Unit =
109+
consumeEach { closeableData ->
110+
closeableData.use {
111+
action(it.data)
112+
}
113+
}

extensions/flv/src/main/java/io/github/thibaultbee/streampack/ext/flv/elements/endpoints/FlvEndpoints.kt

Lines changed: 66 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,31 +23,42 @@ import io.github.thibaultbee.krtmp.flv.tags.FLVTag
2323
import io.github.thibaultbee.krtmp.flv.tags.script.OnMetadata
2424
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
2525
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
26-
import io.github.thibaultbee.streampack.core.elements.data.useAndUnwrap
2726
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
2827
import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpointInternal
2928
import io.github.thibaultbee.streampack.core.elements.endpoints.MediaSinkType
3029
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint.EndpointInfo
3130
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.ContentSink
31+
import io.github.thibaultbee.streampack.core.elements.utils.ChannelWithCloseableData
32+
import io.github.thibaultbee.streampack.core.elements.utils.useConsumeEach
3233
import io.github.thibaultbee.streampack.core.logger.Logger
3334
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
3435
import io.github.thibaultbee.streampack.ext.flv.elements.endpoints.composites.muxer.FlvMuxerInfo
35-
import io.github.thibaultbee.streampack.ext.flv.elements.endpoints.composites.muxer.utils.FlvDataBuilder
36+
import io.github.thibaultbee.streampack.ext.flv.elements.endpoints.composites.muxer.utils.FlvTagBuilder
3637
import kotlinx.coroutines.CoroutineDispatcher
38+
import kotlinx.coroutines.CoroutineScope
39+
import kotlinx.coroutines.channels.BufferOverflow
3740
import kotlinx.coroutines.flow.MutableStateFlow
3841
import kotlinx.coroutines.flow.StateFlow
3942
import kotlinx.coroutines.flow.asStateFlow
43+
import kotlinx.coroutines.launch
4044
import kotlinx.coroutines.sync.Mutex
4145
import kotlinx.coroutines.sync.withLock
46+
import kotlinx.coroutines.withContext
4247

4348
/**
4449
* Writes FLV Data to a file or content.
4550
*/
46-
sealed class FlvEndpoint(private val coroutineDispatcher: CoroutineDispatcher) : IEndpointInternal {
47-
private val flvDataBuilder = FlvDataBuilder()
48-
private var flvMuxer: FLVMuxer? = null
51+
sealed class FlvEndpoint(
52+
defaultDispatcher: CoroutineDispatcher,
53+
private val ioDispatcher: CoroutineDispatcher
54+
) : IEndpointInternal {
55+
private val coroutineScope = CoroutineScope(defaultDispatcher)
4956
private val mutex = Mutex()
5057

58+
private val flvTagChannel = ChannelWithCloseableData<FLVTag>(20, BufferOverflow.DROP_OLDEST)
59+
private val flvTagBuilder = FlvTagBuilder(flvTagChannel)
60+
private var flvMuxer: FLVMuxer? = null
61+
5162
private var startUpTimestamp = INVALID_TIMESTAMP
5263
private val timestampMutex = Mutex()
5364

@@ -63,9 +74,21 @@ sealed class FlvEndpoint(private val coroutineDispatcher: CoroutineDispatcher) :
6374

6475
override fun getInfo(type: MediaDescriptor.Type) = info
6576

77+
init {
78+
coroutineScope.launch {
79+
flvTagChannel.useConsumeEach { flvTag ->
80+
try {
81+
write(flvTag)
82+
} catch (t: Throwable) {
83+
Logger.e(TAG, "Error while writing FLV data: $t")
84+
}
85+
}
86+
}
87+
}
88+
6689
private suspend fun safeMuxer(block: suspend (FLVMuxer) -> Unit) {
6790
val flvMuxer = requireNotNull(flvMuxer) { "Not opened" }
68-
mutex.withLock { block(flvMuxer) }
91+
mutex.withLock { withContext(ioDispatcher) { block(flvMuxer) } }
6992
}
7093

7194
abstract suspend fun openImpl(descriptor: MediaDescriptor): FLVMuxer
@@ -89,25 +112,28 @@ sealed class FlvEndpoint(private val coroutineDispatcher: CoroutineDispatcher) :
89112
startUpTimestamp
90113
}
91114

115+
private suspend fun write(flvTag: FLVTag) {
116+
safeMuxer { flvMuxer ->
117+
flvMuxer.encode(flvTag)
118+
}
119+
// Close FLVTag data if needed
120+
(flvTag.data as AutoCloseable?)?.close()
121+
}
122+
92123
override suspend fun write(
93124
closeableFrame: FrameWithCloseable, streamPid: Int
94125
) {
95-
closeableFrame.useAndUnwrap { frame ->
96-
val startUpTimestamp = getStartUpTimestamp(frame.ptsInUs)
97-
val ts = (frame.ptsInUs - startUpTimestamp) / 1000
98-
flvDataBuilder.write(frame, streamPid).forEach {
99-
safeMuxer { flvMuxer ->
100-
flvMuxer.encode(FLVTag(ts.toInt(), it))
101-
}
102-
}
103-
}
126+
val frame = closeableFrame.frame
127+
val startUpTimestamp = getStartUpTimestamp(frame.ptsInUs)
128+
val ts = (frame.ptsInUs - startUpTimestamp) / 1000
129+
flvTagBuilder.write(closeableFrame, ts.toInt(), streamPid)
104130
}
105131

106132
override fun addStreams(streamConfigs: List<CodecConfig>): Map<CodecConfig, Int> {
107133
require(streamConfigs.isNotEmpty()) { "At least one stream must be provided" }
108134
mutex.tryLock()
109135
return try {
110-
flvDataBuilder.addStreams(streamConfigs)
136+
flvTagBuilder.addStreams(streamConfigs)
111137
} finally {
112138
mutex.unlock()
113139
}
@@ -116,27 +142,29 @@ sealed class FlvEndpoint(private val coroutineDispatcher: CoroutineDispatcher) :
116142
override fun addStream(streamConfig: CodecConfig): Int {
117143
mutex.tryLock()
118144
return try {
119-
flvDataBuilder.addStream(streamConfig)
145+
flvTagBuilder.addStream(streamConfig)
120146
} finally {
121147
mutex.unlock()
122148
}
123149
}
124150

125151
override suspend fun startStream() {
126152
safeMuxer { flvMuxer ->
127-
flvMuxer.encodeFLVHeader(flvDataBuilder.hasAudio, flvDataBuilder.hasVideo)
128-
flvMuxer.encode(0, OnMetadata(flvDataBuilder.metadata))
153+
flvMuxer.encodeFLVHeader(flvTagBuilder.hasAudio, flvTagBuilder.hasVideo)
154+
flvMuxer.encode(0, OnMetadata(flvTagBuilder.metadata))
129155
}
130156
}
131157

132158
override suspend fun stopStream() {
133159
mutex.withLock {
134160
try {
135-
flvMuxer?.flush()
161+
withContext(ioDispatcher) {
162+
flvMuxer?.flush()
163+
}
136164
} catch (t: Throwable) {
137165
Logger.w(TAG, "Error while flushing FLV muxer: $t")
138166
} finally {
139-
flvDataBuilder.clearStreams()
167+
flvTagBuilder.clearStreams()
140168
}
141169
}
142170
timestampMutex.withLock {
@@ -155,6 +183,10 @@ sealed class FlvEndpoint(private val coroutineDispatcher: CoroutineDispatcher) :
155183
}
156184
}
157185

186+
override fun release() {
187+
flvTagChannel.cancel()
188+
}
189+
158190
companion object {
159191
private const val TAG = "FlvEndpoint"
160192

@@ -166,8 +198,12 @@ sealed class FlvEndpoint(private val coroutineDispatcher: CoroutineDispatcher) :
166198
/**
167199
* Writes FLV Data to a content.
168200
*/
169-
class FlvContentEndpoint(private val context: Context, coroutineDispatcher: CoroutineDispatcher) :
170-
FlvEndpoint(coroutineDispatcher) {
201+
class FlvContentEndpoint(
202+
private val context: Context,
203+
defaultDispatcher: CoroutineDispatcher,
204+
ioDispatcher: CoroutineDispatcher
205+
) :
206+
FlvEndpoint(defaultDispatcher, ioDispatcher) {
171207
override suspend fun openImpl(descriptor: MediaDescriptor): FLVMuxer {
172208
require(descriptor.type.sinkType == MediaSinkType.CONTENT) { "Descriptor type must be ${MediaSinkType.CONTENT}" }
173209
return FLVMuxer(ContentSink.openContent(context, descriptor.uri), AmfVersion.AMF0)
@@ -181,14 +217,18 @@ class FlvContentEndpointFactory : IEndpointInternal.Factory {
181217
override fun create(
182218
context: Context,
183219
dispatcherProvider: IDispatcherProvider
184-
): IEndpointInternal = FlvContentEndpoint(context, dispatcherProvider.io)
220+
): IEndpointInternal =
221+
FlvContentEndpoint(context, dispatcherProvider.default, dispatcherProvider.io)
185222
}
186223

187224

188225
/**
189226
* Writes FLV Data to a file.
190227
*/
191-
class FlvFileEndpoint(coroutineDispatcher: CoroutineDispatcher) : FlvEndpoint(coroutineDispatcher) {
228+
class FlvFileEndpoint(
229+
defaultDispatcher: CoroutineDispatcher,
230+
ioDispatcher: CoroutineDispatcher
231+
) : FlvEndpoint(defaultDispatcher, ioDispatcher) {
192232
override suspend fun openImpl(descriptor: MediaDescriptor): FLVMuxer {
193233
require(descriptor.type.sinkType == MediaSinkType.FILE) { "Descriptor type must be ${MediaSinkType.FILE}" }
194234
return FLVMuxer(descriptor.uri.path!!, AmfVersion.AMF0)
@@ -202,7 +242,7 @@ class FlvFileEndpointFactory : IEndpointInternal.Factory {
202242
override fun create(
203243
context: Context,
204244
dispatcherProvider: IDispatcherProvider
205-
): IEndpointInternal = FlvFileEndpoint(dispatcherProvider.io)
245+
): IEndpointInternal = FlvFileEndpoint(dispatcherProvider.default, dispatcherProvider.io)
206246
}
207247

208248

0 commit comments

Comments
 (0)