Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions supervision/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from supervision.detection.tools.polygon_zone import PolygonZone, PolygonZoneAnnotator
from supervision.detection.tools.smoother import DetectionsSmoother
from supervision.detection.utils.boxes import (
box_aspect_ratio,
clip_boxes,
denormalize_boxes,
move_boxes,
Expand Down Expand Up @@ -196,6 +197,7 @@
"VideoInfo",
"VideoSink",
"approximate_polygon",
"box_aspect_ratio",
"box_iou",
"box_iou_batch",
"box_iou_batch_with_jaccard",
Expand Down
51 changes: 51 additions & 0 deletions supervision/detection/utils/boxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,57 @@
from supervision.detection.utils.iou_and_nms import box_iou_batch


def box_aspect_ratio(xyxy: np.ndarray) -> np.ndarray:
"""
Calculate aspect ratios of bounding boxes given in xyxy format.

Computes the width divided by height for each bounding box. Returns NaN
for boxes with zero height to avoid division errors.

Args:
xyxy (`numpy.ndarray`): Array of bounding boxes in
`(x_min, y_min, x_max, y_max)` format with shape `(N, 4)`.

Returns:
`numpy.ndarray`: Array of aspect ratios with shape `(N,)`, where each element is
the width divided by height of a box. Elements are NaN if height is zero.

Examples:
```python
import numpy as np
import supervision as sv

xyxy = np.array([
[10, 20, 30, 50],
[0, 0, 40, 10],
])

sv.box_aspect_ratio(xyxy)
# array([0.66666667, 4. ])

xyxy = np.array([
[10, 10, 30, 10],
[5, 5, 25, 25],
])

sv.box_aspect_ratio(xyxy)
# array([ nan, 1. ])
```
"""
widths = xyxy[:, 2] - xyxy[:, 0]
heights = xyxy[:, 3] - xyxy[:, 1]

aspect_ratios = np.full_like(widths, np.nan, dtype=np.float64)
np.divide(
widths,
heights,
out=aspect_ratios,
where=heights != 0,
)

return aspect_ratios


def clip_boxes(xyxy: np.ndarray, resolution_wh: tuple[int, int]) -> np.ndarray:
"""
Clips bounding boxes coordinates to fit within the frame resolution.
Expand Down
143 changes: 104 additions & 39 deletions supervision/utils/video.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import annotations

import threading
import time
from collections import deque
from collections.abc import Callable, Generator
from dataclasses import dataclass
from queue import Queue

import cv2
import numpy as np
Expand Down Expand Up @@ -196,63 +198,126 @@ def process_video(
source_path: str,
target_path: str,
callback: Callable[[np.ndarray, int], np.ndarray],
*,
max_frames: int | None = None,
prefetch: int = 32,
writer_buffer: int = 32,
show_progress: bool = False,
progress_message: str = "Processing video",
) -> None:
"""
Process a video file by applying a callback function on each frame
and saving the result to a target video file.
Process video frames asynchronously using a threaded pipeline.

This function orchestrates a three-stage pipeline to optimize video processing
throughput:

1. Reader thread: Continuously reads frames from the source video file and
enqueues them into a bounded queue (`frame_read_queue`). The queue size is
limited by the `prefetch` parameter to control memory usage.
2. Main thread (Processor): Dequeues frames from `frame_read_queue`, applies the
user-defined `callback` function to process each frame, then enqueues the
processed frames into another bounded queue (`frame_write_queue`) for writing.
The processing happens in the main thread, simplifying use of stateful objects
without synchronization.
3. Writer thread: Dequeues processed frames from `frame_write_queue` and writes
them sequentially to the output video file.

Args:
source_path (str): The path to the source video file.
target_path (str): The path to the target video file.
callback (Callable[[np.ndarray, int], np.ndarray]): A function that takes in
a numpy ndarray representation of a video frame and an
int index of the frame and returns a processed numpy ndarray
representation of the frame.
max_frames (Optional[int]): The maximum number of frames to process.
show_progress (bool): Whether to show a progress bar.
progress_message (str): The message to display in the progress bar.
source_path (str): Path to the input video file.
target_path (str): Path where the processed video will be saved.
callback (Callable[[numpy.ndarray, int], numpy.ndarray]): Function called for
each frame, accepting the frame as a numpy array and its zero-based index,
returning the processed frame.
max_frames (int | None): Optional maximum number of frames to process.
If None, the entire video is processed (default).
prefetch (int): Maximum number of frames buffered by the reader thread.
Controls memory use; default is 32.
writer_buffer (int): Maximum number of frames buffered before writing.
Controls output buffer size; default is 32.
show_progress (bool): Whether to display a tqdm progress bar during processing.
Default is False.
progress_message (str): Description shown in the progress bar.

Examples:
Returns:
None

Example:
```python
import cv2
import supervision as sv
from rfdetr import RFDETRMedium

def callback(scene: np.ndarray, index: int) -> np.ndarray:
...
model = RFDETRMedium()

def callback(frame, frame_index):
return model.predict(frame)

process_video(
source_path=<SOURCE_VIDEO_PATH>,
target_path=<TARGET_VIDEO_PATH>,
callback=callback
source_path="source.mp4",
target_path="target.mp4",
callback=frame_callback,
)
```
"""
source_video_info = VideoInfo.from_video_path(video_path=source_path)
video_frames_generator = get_video_frames_generator(
source_path=source_path, end=max_frames
video_info = VideoInfo.from_video_path(video_path=source_path)
total_frames = (
min(video_info.total_frames, max_frames)
if max_frames is not None
else video_info.total_frames
)
with VideoSink(target_path=target_path, video_info=source_video_info) as sink:
total_frames = (
min(source_video_info.total_frames, max_frames)
if max_frames is not None
else source_video_info.total_frames

frame_read_queue: Queue[tuple[int, np.ndarray] | None] = Queue(maxsize=prefetch)
frame_write_queue: Queue[np.ndarray | None] = Queue(maxsize=writer_buffer)

def reader_thread() -> None:
frame_generator = get_video_frames_generator(
source_path=source_path,
end=max_frames,
)
for index, frame in enumerate(
tqdm(
video_frames_generator,
total=total_frames,
disable=not show_progress,
desc=progress_message,
)
):
result_frame = callback(frame, index)
sink.write_frame(frame=result_frame)
else:
for index, frame in enumerate(video_frames_generator):
result_frame = callback(frame, index)
sink.write_frame(frame=result_frame)
for frame_index, frame in enumerate(frame_generator):
frame_read_queue.put((frame_index, frame))
frame_read_queue.put(None)

def writer_thread(video_sink: VideoSink) -> None:
while True:
frame = frame_write_queue.get()
if frame is None:
break
video_sink.write_frame(frame=frame)

reader_worker = threading.Thread(target=reader_thread, daemon=True)
with VideoSink(target_path=target_path, video_info=video_info) as video_sink:
writer_worker = threading.Thread(
target=writer_thread,
args=(video_sink,),
daemon=True,
)

reader_worker.start()
writer_worker.start()

progress_bar = tqdm(
total=total_frames,
disable=not show_progress,
desc=progress_message,
)

try:
while True:
read_item = frame_read_queue.get()
if read_item is None:
break

frame_index, frame = read_item
processed_frame = callback(frame, frame_index)

frame_write_queue.put(processed_frame)
progress_bar.update(1)
finally:
frame_write_queue.put(None)
reader_worker.join()
writer_worker.join()
progress_bar.close()


class FPSMonitor:
Expand Down