Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions smelter-core/src/pipeline/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::prelude::*;

pub(super) mod decoder_thread_audio;
pub(super) mod decoder_thread_video;
pub(super) mod h264_utils;

mod dynamic_stream;
mod static_stream;
Expand Down Expand Up @@ -43,9 +42,11 @@ pub(crate) struct DecodedSamples {
pub sample_rate: u32,
}

#[derive(Debug)]
pub(crate) enum EncodedInputEvent {
Chunk(EncodedInputChunk),
LostData,
AuDelimiter,
}

#[derive(Debug)]
Expand All @@ -63,9 +64,8 @@ pub(crate) trait VideoDecoder: Sized + VideoDecoderInstance {
}

pub(crate) trait VideoDecoderInstance {
fn decode(&mut self, chunk: EncodedInputChunk) -> Vec<Frame>;
fn decode(&mut self, chunk: EncodedInputEvent) -> Vec<Frame>;
fn flush(&mut self) -> Vec<Frame>;
fn skip_until_keyframe(&mut self);
}

pub(crate) trait BytestreamTransformer: Send + 'static {
Expand Down
14 changes: 5 additions & 9 deletions smelter-core/src/pipeline/decoder/dynamic_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,15 @@ where

fn next(&mut self) -> Option<Self::Item> {
match self.source.next() {
Some(PipelineEvent::Data(EncodedInputEvent::Chunk(samples))) => {
Some(PipelineEvent::Data(event)) => {
// TODO: flush on decoder change
self.ensure_decoder(samples.kind);
if let EncodedInputEvent::Chunk(chunk) = &event {
self.ensure_decoder(chunk.kind);
}
let decoder = self.decoder.as_mut()?;
let chunks = decoder.decode(samples);
let chunks = decoder.decode(event);
Some(chunks.into_iter().map(PipelineEvent::Data).collect())
}
Some(PipelineEvent::Data(EncodedInputEvent::LostData)) => {
if let Some(decoder) = self.decoder.as_mut() {
decoder.skip_until_keyframe()
}
Some(vec![])
}
Some(PipelineEvent::EOS) | None => match self.eos_sent {
true => None,
false => {
Expand Down
72 changes: 51 additions & 21 deletions smelter-core/src/pipeline/decoder/ffmpeg_h264.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::{iter, sync::Arc};

use crate::pipeline::decoder::{
KeyframeRequestSender, VideoDecoder, VideoDecoderInstance,
ffmpeg_utils::{create_av_packet, from_av_frame},
use crate::pipeline::{
decoder::{
EncodedInputEvent, KeyframeRequestSender, VideoDecoder, VideoDecoderInstance,
ffmpeg_utils::{create_av_packet, from_av_frame},
},
utils::H264AuSplitter,
};
use crate::prelude::*;

Expand All @@ -12,14 +15,15 @@ use ffmpeg_next::{
media::Type,
};
use smelter_render::Frame;
use tracing::{error, info, trace, warn};
use tracing::{debug, error, info, trace, warn};

const TIME_BASE: i32 = 1_000_000;

pub struct FfmpegH264Decoder {
decoder: ffmpeg_next::decoder::Opened,
keyframe_request_sender: Option<KeyframeRequestSender>,
av_frame: ffmpeg_next::frame::Video,
au_splitter: H264AuSplitter,
}

impl VideoDecoder for FfmpegH264Decoder {
Expand Down Expand Up @@ -49,41 +53,67 @@ impl VideoDecoder for FfmpegH264Decoder {
decoder,
keyframe_request_sender,
av_frame: ffmpeg_next::frame::Video::empty(),
au_splitter: H264AuSplitter::default(),
})
}
}

impl VideoDecoderInstance for FfmpegH264Decoder {
fn decode(&mut self, chunk: EncodedInputChunk) -> Vec<Frame> {
trace!(?chunk, "H264 decoder received a chunk.");
let av_packet = match create_av_packet(chunk, VideoCodec::H264, TIME_BASE) {
Ok(packet) => packet,
Err(err) => {
warn!("Dropping frame: {}", err);
return Vec::new();
fn decode(&mut self, event: EncodedInputEvent) -> Vec<Frame> {
trace!(?event, "FFmpeg H264 decoder received an event.");
let au_chunks = match event {
EncodedInputEvent::Chunk(chunk) => match self.au_splitter.put_chunk(chunk) {
Ok(chunks) => chunks,
Err(err) => {
if let Some(s) = self.keyframe_request_sender.as_ref() {
s.send()
}
debug!("H264 AU splitter could not process the chunks: {err}");
return Vec::new();
}
},
EncodedInputEvent::LostData => {
self.au_splitter.mark_missing_data();
return vec![];
}
EncodedInputEvent::AuDelimiter => match self.au_splitter.flush() {
Ok(chunks) => chunks,
Err(err) => {
if let Some(s) = self.keyframe_request_sender.as_ref() {
s.send()
}
debug!("H264 AU splitter could not process the chunks: {err}");
return Vec::new();
}
},
};

match self.decoder.send_packet(&av_packet) {
Ok(()) => {}
Err(e) => {
// TODO: move to parser
if let Some(s) = self.keyframe_request_sender.as_ref() {
s.send()
for chunk in au_chunks {
trace!(?chunk, "FFmpeg H264 processing AU chunk");
let av_packet = match create_av_packet(chunk, VideoCodec::H264, TIME_BASE) {
Ok(packet) => packet,
Err(err) => {
warn!("Dropping frame: {}", err);
continue;
}
};

match self.decoder.send_packet(&av_packet) {
Ok(()) => {}
Err(e) => {
warn!("Failed to send a packet to decoder: {:?}", e);
continue;
}
warn!("Failed to send a packet to decoder: {:?}", e);
return Vec::new();
}
}

self.read_all_frames()
}

fn flush(&mut self) -> Vec<Frame> {
self.decoder.flush();
self.read_all_frames()
}

fn skip_until_keyframe(&mut self) {}
}

impl FfmpegH264Decoder {
Expand Down
13 changes: 8 additions & 5 deletions smelter-core/src/pipeline/decoder/ffmpeg_vp8.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{iter, sync::Arc};

use crate::pipeline::decoder::{
KeyframeRequestSender, VideoDecoder, VideoDecoderInstance,
EncodedInputEvent, KeyframeRequestSender, VideoDecoder, VideoDecoderInstance,
ffmpeg_utils::{create_av_packet, from_av_frame},
};
use crate::prelude::*;
Expand Down Expand Up @@ -52,8 +52,13 @@ impl VideoDecoder for FfmpegVp8Decoder {
}

impl VideoDecoderInstance for FfmpegVp8Decoder {
fn decode(&mut self, chunk: EncodedInputChunk) -> Vec<Frame> {
trace!(?chunk, "VP8 decoder received a chunk.");
fn decode(&mut self, event: EncodedInputEvent) -> Vec<Frame> {
trace!(?event, "FFmpeg VP8 decoder received an event.");

let EncodedInputEvent::Chunk(chunk) = event else {
return vec![];
};

let av_packet = match create_av_packet(chunk, VideoCodec::Vp8, TIME_BASE) {
Ok(packet) => packet,
Err(err) => {
Expand All @@ -76,8 +81,6 @@ impl VideoDecoderInstance for FfmpegVp8Decoder {
self.decoder.flush();
self.read_all_frames()
}

fn skip_until_keyframe(&mut self) {}
}

impl FfmpegVp8Decoder {
Expand Down
12 changes: 7 additions & 5 deletions smelter-core/src/pipeline/decoder/ffmpeg_vp9.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{iter, sync::Arc};

use crate::pipeline::decoder::{
KeyframeRequestSender, VideoDecoder, VideoDecoderInstance,
EncodedInputEvent, KeyframeRequestSender, VideoDecoder, VideoDecoderInstance,
ffmpeg_utils::{create_av_packet, from_av_frame},
};
use crate::prelude::*;
Expand Down Expand Up @@ -52,8 +52,12 @@ impl VideoDecoder for FfmpegVp9Decoder {
}

impl VideoDecoderInstance for FfmpegVp9Decoder {
fn decode(&mut self, chunk: EncodedInputChunk) -> Vec<Frame> {
trace!(?chunk, "VP9 decoder received a chunk.");
fn decode(&mut self, event: EncodedInputEvent) -> Vec<Frame> {
trace!(?event, "FFmpeg VP9 decoder received a chunk.");
let EncodedInputEvent::Chunk(chunk) = event else {
return vec![];
};

let av_packet = match create_av_packet(chunk, VideoCodec::Vp9, TIME_BASE) {
Ok(packet) => packet,
Err(err) => {
Expand All @@ -76,8 +80,6 @@ impl VideoDecoderInstance for FfmpegVp9Decoder {
self.decoder.flush();
self.read_all_frames()
}

fn skip_until_keyframe(&mut self) {}
}

impl FfmpegVp9Decoder {
Expand Down
10 changes: 3 additions & 7 deletions smelter-core/src/pipeline/decoder/static_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,10 @@ where

fn next(&mut self) -> Option<Self::Item> {
match self.source.next() {
Some(PipelineEvent::Data(EncodedInputEvent::Chunk(chunk))) => {
let chunks = self.decoder.decode(chunk);
Some(PipelineEvent::Data(event)) => {
let chunks = self.decoder.decode(event);
Some(chunks.into_iter().map(PipelineEvent::Data).collect())
}
Some(PipelineEvent::Data(EncodedInputEvent::LostData)) => {
self.decoder.skip_until_keyframe();
Some(vec![])
}
Some(PipelineEvent::EOS) | None => match self.eos_sent {
true => None,
false => {
Expand Down Expand Up @@ -115,7 +111,7 @@ where
};
Some(chunks.into_iter().map(PipelineEvent::Data).collect())
}
Some(PipelineEvent::Data(EncodedInputEvent::LostData)) => Some(vec![]),
Some(PipelineEvent::Data(_)) => Some(vec![]),
Some(PipelineEvent::EOS) | None => match self.eos_sent {
true => None,
false => {
Expand Down
29 changes: 19 additions & 10 deletions smelter-core/src/pipeline/decoder/vulkan_h264.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{sync::Arc, time::Duration};

use smelter_render::{Frame, FrameData, Resolution};
use tracing::{debug, info, warn};
use tracing::{debug, info, trace, warn};
use vk_video::{
DecoderError, ReferenceManagementError, WgpuTexturesDecoder,
parameters::{DecoderParameters, DecoderUsageFlags, MissedFrameHandling},
};

use crate::pipeline::decoder::{KeyframeRequestSender, VideoDecoder, VideoDecoderInstance};
use crate::pipeline::decoder::{
EncodedInputEvent, KeyframeRequestSender, VideoDecoder, VideoDecoderInstance,
};
use crate::prelude::*;

pub struct VulkanH264Decoder {
Expand Down Expand Up @@ -41,10 +43,21 @@ impl VideoDecoder for VulkanH264Decoder {
}

impl VideoDecoderInstance for VulkanH264Decoder {
fn decode(&mut self, chunk: EncodedInputChunk) -> Vec<Frame> {
let chunk = vk_video::EncodedInputChunk {
data: chunk.data.as_ref(),
pts: Some(chunk.pts.as_micros() as u64),
fn decode(&mut self, event: EncodedInputEvent) -> Vec<Frame> {
trace!(?event, "Vulkan H264 decoder received an event.");

let chunk = match &event {
EncodedInputEvent::Chunk(chunk) => vk_video::EncodedInputChunk {
data: chunk.data.as_ref(),
pts: Some(chunk.pts.as_micros() as u64),
},
EncodedInputEvent::LostData => {
self.decoder.mark_missing_data();
return vec![];
}
EncodedInputEvent::AuDelimiter => {
return vec![];
}
};

let frames = match self.decoder.decode(chunk) {
Expand Down Expand Up @@ -72,10 +85,6 @@ impl VideoDecoderInstance for VulkanH264Decoder {
.map(from_vk_frame)
.collect()
}

fn skip_until_keyframe(&mut self) {
self.decoder.mark_missing_data();
}
}

fn from_vk_frame(frame: vk_video::Frame<wgpu::Texture>) -> Frame {
Expand Down
8 changes: 4 additions & 4 deletions smelter-core/src/pipeline/decoder/vulkan_h264_fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::sync::Arc;
use smelter_render::Frame;
use tracing::error;

use crate::pipeline::decoder::{KeyframeRequestSender, VideoDecoder, VideoDecoderInstance};
use crate::pipeline::decoder::{
EncodedInputEvent, KeyframeRequestSender, VideoDecoder, VideoDecoderInstance,
};
use crate::prelude::*;

pub struct VulkanH264Decoder;
Expand All @@ -20,7 +22,7 @@ impl VideoDecoder for VulkanH264Decoder {
}

impl VideoDecoderInstance for VulkanH264Decoder {
fn decode(&mut self, _chunk: EncodedInputChunk) -> Vec<Frame> {
fn decode(&mut self, _chunk: EncodedInputEvent) -> Vec<Frame> {
error!("Vulkan decoder unavailable, this code should never be called");
vec![]
}
Expand All @@ -29,6 +31,4 @@ impl VideoDecoderInstance for VulkanH264Decoder {
error!("Vulkan decoder unavailable, this code should never be called");
vec![]
}

fn skip_until_keyframe(&mut self) {}
}
8 changes: 3 additions & 5 deletions smelter-core/src/pipeline/hls/hls_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ use crate::{
DecoderThreadHandle,
decoder_thread_audio::{AudioDecoderThread, AudioDecoderThreadOptions},
decoder_thread_video::{VideoDecoderThread, VideoDecoderThreadOptions},
fdk_aac, ffmpeg_h264,
h264_utils::{AvccToAnnexBRepacker, H264AvcDecoderConfig},
vulkan_h264,
fdk_aac, ffmpeg_h264, vulkan_h264,
},
input::Input,
utils::input_buffer::InputBuffer,
utils::{H264AvcDecoderConfig, H264AvccToAnnexB, input_buffer::InputBuffer},
},
queue::QueueDataReceiver,
thread_utils::InitializableThread,
Expand Down Expand Up @@ -152,7 +150,7 @@ impl HlsInput {

let decoder_thread_options = VideoDecoderThreadOptions {
ctx: ctx.clone(),
transformer: h264_config.map(AvccToAnnexBRepacker::new),
transformer: h264_config.map(H264AvccToAnnexB::new),
frame_sender,
input_buffer_size: 2000,
};
Expand Down
Loading