Skip to content

Commit de4f2ef

Browse files
committed
refactor(core): endpoint: pass the frame closeable in the frame parameter passed for IEndpointInternal.write
1 parent 4813ef8 commit de4f2ef

File tree

18 files changed

+102
-67
lines changed

18 files changed

+102
-67
lines changed

core/src/androidTest/java/io/github/thibaultbee/streampack/core/elements/endpoints/DummyEndpoint.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import android.content.Context
1919
import android.util.Log
2020
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
2121
import io.github.thibaultbee.streampack.core.elements.data.Frame
22+
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
2223
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
2324
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
2425
import kotlinx.coroutines.flow.MutableStateFlow
@@ -63,14 +64,15 @@ class DummyEndpoint : IEndpointInternal {
6364
_isOpenFlow.emit(false)
6465
}
6566

66-
override suspend fun write(frame: Frame, streamPid: Int, onFrameProcessed: (() -> Unit)) {
67+
override suspend fun write(closeableFrame: FrameWithCloseable, streamPid: Int) {
68+
val frame = closeableFrame.frame
6769
Log.i(TAG, "write: $frame")
6870
_frameFlow.emit(frame)
6971
when {
7072
frame.isAudio -> numOfAudioFramesWritten++
7173
frame.isVideo -> numOfVideoFramesWritten++
7274
}
73-
onFrameProcessed()
75+
closeableFrame.close()
7476
}
7577

7678
override fun addStreams(streamConfigs: List<CodecConfig>): Map<CodecConfig, Int> {

core/src/main/java/io/github/thibaultbee/streampack/core/elements/data/Frame.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ fun CloseableFrame(
148148
/**
149149
* A callback to call when frame is closed.
150150
*/
151-
onClosed: (CloseableFrame) -> Unit,
152-
) = CloseableFrame(
151+
onClosed: (FrameWithCloseable) -> Unit,
152+
) = FrameWithCloseable(
153153
Frame(
154154
rawBuffer,
155155
ptsInUs,
@@ -163,9 +163,9 @@ fun CloseableFrame(
163163
/**
164164
* Frame internal representation.
165165
*/
166-
data class CloseableFrame(
166+
data class FrameWithCloseable(
167167
val frame: Frame,
168-
val onClosed: (CloseableFrame) -> Unit
168+
val onClosed: (FrameWithCloseable) -> Unit
169169
) : Closeable {
170170
override fun close() {
171171
try {

core/src/main/java/io/github/thibaultbee/streampack/core/elements/encoders/IEncoder.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package io.github.thibaultbee.streampack.core.elements.encoders
1717

1818
import android.view.Surface
19-
import io.github.thibaultbee.streampack.core.elements.data.CloseableFrame
19+
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
2020
import io.github.thibaultbee.streampack.core.elements.data.RawFrame
2121
import io.github.thibaultbee.streampack.core.elements.interfaces.Releasable
2222
import io.github.thibaultbee.streampack.core.elements.interfaces.SuspendStreamable
@@ -77,7 +77,7 @@ interface IEncoderInternal : SuspendStreamable, Releasable,
7777
/**
7878
* A channel where the encoder will send encoded frames.
7979
*/
80-
val outputChannel: SendChannel<CloseableFrame>
80+
val outputChannel: SendChannel<FrameWithCloseable>
8181
}
8282

8383
/**

core/src/main/java/io/github/thibaultbee/streampack/core/elements/encoders/mediacodec/MediaCodecEncoder.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import android.os.Bundle
2323
import android.util.Log
2424
import android.view.Surface
2525
import io.github.thibaultbee.streampack.core.elements.data.CloseableFrame
26+
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
2627
import io.github.thibaultbee.streampack.core.elements.data.RawFrame
2728
import io.github.thibaultbee.streampack.core.elements.encoders.EncoderMode
2829
import io.github.thibaultbee.streampack.core.elements.encoders.IEncoderInternal
@@ -593,7 +594,7 @@ internal constructor(
593594
*/
594595
fun frame(
595596
index: Int, outputFormat: MediaFormat, info: BufferInfo, tag: String
596-
): CloseableFrame {
597+
): FrameWithCloseable {
597598
var pts = info.presentationTimeUs
598599
if (pts <= previousPresentationTimestamp) {
599600
pts = previousPresentationTimestamp + 1
@@ -618,7 +619,7 @@ internal constructor(
618619
*/
619620
private fun Frame(
620621
codec: MediaCodec, index: Int, outputFormat: MediaFormat, info: BufferInfo, tag: String
621-
): CloseableFrame {
622+
): FrameWithCloseable {
622623
val buffer = requireNotNull(codec.getOutputBuffer(index))
623624
return CloseableFrame(
624625
buffer, info.presentationTimeUs, // pts

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/CombineEndpoint.kt

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ package io.github.thibaultbee.streampack.core.elements.endpoints
1717

1818
import android.content.Context
1919
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
20-
import io.github.thibaultbee.streampack.core.elements.data.Frame
20+
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
2121
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
2222
import io.github.thibaultbee.streampack.core.elements.utils.extensions.intersect
2323
import io.github.thibaultbee.streampack.core.logger.Logger
@@ -209,7 +209,8 @@ open class CombineEndpoint(
209209
*
210210
* If all endpoints write fails, it throws the exception of the first endpoint that failed.
211211
*/
212-
override suspend fun write(frame: Frame, streamPid: Int, onFrameProcessed: (() -> Unit)) {
212+
override suspend fun write(closeableFrame: FrameWithCloseable, streamPid: Int) {
213+
val frame = closeableFrame.frame
213214
val throwables = mutableListOf<Throwable>()
214215

215216
/**
@@ -221,11 +222,13 @@ open class CombineEndpoint(
221222
endpointInternals.filter { it.isOpenFlow.value }.forEach { endpoint ->
222223
try {
223224
val deferred = CompletableDeferred<Unit>()
224-
val duplicatedFrame = frame.copy(rawBuffer = frame.rawBuffer.duplicate())
225-
val endpointStreamId = endpointsToStreamIdsMap[Pair(endpoint, streamPid)]!!
225+
val duplicatedFrame = FrameWithCloseable(
226+
frame.copy(rawBuffer = frame.rawBuffer.duplicate()),
227+
{ deferred.complete(Unit) })
226228

229+
val endpointStreamId = endpointsToStreamIdsMap[Pair(endpoint, streamPid)]!!
227230
deferreds += deferred
228-
endpoint.write(duplicatedFrame, endpointStreamId, { deferred.complete(Unit) })
231+
endpoint.write(duplicatedFrame, endpointStreamId)
229232
} catch (t: Throwable) {
230233
Logger.e(TAG, "Failed to get stream id for endpoint $endpoint", t)
231234
throwables += t
@@ -234,7 +237,7 @@ open class CombineEndpoint(
234237

235238
coroutineScope.launch {
236239
deferreds.forEach { it.await() }
237-
onFrameProcessed()
240+
closeableFrame.close()
238241
}
239242

240243
if (throwables.isNotEmpty()) {

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/DynamicEndpoint.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package io.github.thibaultbee.streampack.core.elements.endpoints
1818
import android.content.Context
1919
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
2020
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.createDefaultTsServiceInfo
21-
import io.github.thibaultbee.streampack.core.elements.data.Frame
21+
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
2222
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
2323
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoint
2424
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.CompositeEndpoints
@@ -131,8 +131,8 @@ open class DynamicEndpoint(
131131
}
132132
}
133133

134-
override suspend fun write(frame: Frame, streamPid: Int, onFrameProcessed: () -> Unit) =
135-
safeEndpoint { endpoint -> endpoint.write(frame, streamPid, onFrameProcessed) }
134+
override suspend fun write(closeableFrame: FrameWithCloseable, streamPid: Int) =
135+
safeEndpoint { endpoint -> endpoint.write(closeableFrame, streamPid) }
136136

137137
override suspend fun startStream() = safeEndpoint { endpoint -> endpoint.startStream() }
138138

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/IEndpoint.kt

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package io.github.thibaultbee.streampack.core.elements.endpoints
1717

1818
import android.content.Context
1919
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
20+
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
2021
import io.github.thibaultbee.streampack.core.elements.data.Frame
2122
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
2223
import io.github.thibaultbee.streampack.core.elements.endpoints.composites.sinks.FileSink
@@ -39,18 +40,15 @@ interface IEndpointInternal : IEndpoint, SuspendStreamable,
3940
/**
4041
* Writes a [Frame] to the [IEndpointInternal].
4142
*
42-
* The [onFrameProcessed] callback must be called when the frame has been processed and the [Frame.rawBuffer] is not used anymore.
43-
* The [IEndpointInternal] must called [onFrameProcessed] even if the frame is dropped or it somehow crashes.
44-
* Also, once [onFrameProcessed] is called, the [Frame.rawBuffer] must not be used anymore by the [IEndpointInternal].
43+
* The [FrameWithCloseable.close] must be called when the frame has been processed and the [Frame.rawBuffer] is not used anymore.
44+
* The [IEndpointInternal] must called [FrameWithCloseable.close] even if the frame is dropped or it somehow crashes.
4545
*
46-
* @param frame the [Frame] to write
46+
* @param closeableFrame the [Frame] to write
4747
* @param streamPid the stream id the [Frame] belongs to
48-
* @param onFrameProcessed a callback called when the [Frame.rawBuffer] is not used anymore
4948
*/
5049
suspend fun write(
51-
frame: Frame,
50+
closeableFrame: FrameWithCloseable,
5251
streamPid: Int,
53-
onFrameProcessed: (() -> Unit)
5452
)
5553

5654
/**

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/MediaMuxerEndpoint.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import android.media.MediaMuxer.OutputFormat
2424
import android.os.Build
2525
import android.os.ParcelFileDescriptor
2626
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
27-
import io.github.thibaultbee.streampack.core.elements.data.Frame
27+
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
2828
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
2929
import io.github.thibaultbee.streampack.core.logger.Logger
3030
import io.github.thibaultbee.streampack.core.pipelines.IDispatcherProvider
@@ -140,8 +140,9 @@ class MediaMuxerEndpoint(
140140
}
141141

142142
override suspend fun write(
143-
frame: Frame, streamPid: Int, onFrameProcessed: () -> Unit
143+
closeableFrame: FrameWithCloseable, streamPid: Int
144144
) = withContext(ioDispatcher) {
145+
val frame = closeableFrame.frame
145146
mutex.withLock {
146147
try {
147148
if (state != State.STARTED && state != State.PENDING_START) {
@@ -180,7 +181,7 @@ class MediaMuxerEndpoint(
180181
Logger.e(TAG, "Error while writing frame: ${t.message}")
181182
throw t
182183
} finally {
183-
onFrameProcessed()
184+
closeableFrame.close()
184185
}
185186
}
186187
}

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/CompositeEndpoint.kt

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ package io.github.thibaultbee.streampack.core.elements.endpoints.composites
1717

1818
import android.content.Context
1919
import io.github.thibaultbee.streampack.core.configuration.mediadescriptor.MediaDescriptor
20-
import io.github.thibaultbee.streampack.core.elements.data.Frame
20+
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
2121
import io.github.thibaultbee.streampack.core.elements.data.Packet
2222
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
2323
import io.github.thibaultbee.streampack.core.elements.endpoints.IEndpoint
@@ -75,10 +75,9 @@ class CompositeEndpoint(
7575
}
7676

7777
override suspend fun write(
78-
frame: Frame,
79-
streamPid: Int,
80-
onFrameProcessed: (() -> Unit)
81-
) = muxer.write(frame, streamPid, onFrameProcessed)
78+
closeableFrame: FrameWithCloseable,
79+
streamPid: Int
80+
) = muxer.write(closeableFrame, streamPid)
8281

8382
override fun addStreams(streamConfigs: List<CodecConfig>): Map<CodecConfig, Int> {
8483
mutex.tryLock()

core/src/main/java/io/github/thibaultbee/streampack/core/elements/endpoints/composites/muxers/IMuxerInternal.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package io.github.thibaultbee.streampack.core.elements.endpoints.composites.muxers
1717

18-
import io.github.thibaultbee.streampack.core.elements.data.Frame
18+
import io.github.thibaultbee.streampack.core.elements.data.FrameWithCloseable
1919
import io.github.thibaultbee.streampack.core.elements.data.Packet
2020
import io.github.thibaultbee.streampack.core.elements.encoders.CodecConfig
2121
import io.github.thibaultbee.streampack.core.elements.interfaces.Releasable
@@ -32,7 +32,7 @@ interface IMuxerInternal :
3232
fun onOutputFrame(packet: Packet)
3333
}
3434

35-
fun write(frame: Frame, streamPid: Int, onFrameProcessed: () -> Unit)
35+
fun write(closeableFrame: FrameWithCloseable, streamPid: Int)
3636

3737
fun addStreams(streamsConfig: List<CodecConfig>): Map<CodecConfig, Int>
3838

0 commit comments

Comments
 (0)