xpark.dataset.VideoCompute#

class xpark.dataset.VideoCompute(*args, **kwargs)[source]#

Note

Do not construct this class, use the staticmethod instead.

Methods

average_rate(videos)

The average framerate of the video, as a float

base_rate(videos)

The fundamental framerate of the stream, as a float

bit_rate(videos)

The average bitrate of the video stream, in bits per second

codec(videos)

The name of the codec used for the video stream, e.g., 'h264', 'vp9'

decode_frame_at(videos, timestamp[, tolerance_s])

Decode a video frame at specified timestamp.

decode_frames(videos[, timestamps, fps, ...])

Extract frames from video with multiple modes.

display_aspect_ratio(videos)

The display aspect ratio (DAR) of the video, e.g., 16:9 is 1.777...

extract_audio(videos[, codec, sample_rate, ...])

Extract audio from video.

height(videos)

Video height, in pixels

pix_fmt(videos)

The pixel format of the video, e.g., 'yuv420p'

split_by_duration(videos[, ...])

Split video by duration.

split_by_key_frame(videos)

Split video by keyframe

time_base(videos)

The time base of the stream, representing the unit of time for timestamps

width(videos)

Video width, in pixels

static average_rate(videos: pa.ChunkedArray) pa.Array#

The average framerate of the video, as a float

static base_rate(videos: pa.ChunkedArray) pa.Array#

The fundamental framerate of the stream, as a float

static bit_rate(videos: pa.ChunkedArray) pa.Array#

The average bitrate of the video stream, in bits per second

static codec(videos: pa.ChunkedArray) pa.Array#

The name of the codec used for the video stream, e.g., ‘h264’, ‘vp9’

static decode_frame_at(videos: pa.ChunkedArray, timestamp: pa.ChunkedArray, tolerance_s: float = 0.1) pa.Array#

Decode a video frame at specified timestamp.

This processor decodes a video frame from video file at the given timestamp and returns it as a tensor array. It uses PyAV (ffmpeg wrapper) for decoding and supports various file systems including local, COS, S3, HuggingFace Hub (hf://), and other fsspec-compatible storage systems.

Parameters:
  • videos – Column of video file paths (strings) or binary data. Supports local paths and remote paths (cos://, s3://, hf://, etc.).

  • timestamp – Column of timestamps in seconds (floats) at which to extract a frame from the corresponding video.

  • tolerance_s – Allowed deviation in seconds for frame retrieval. If the exact timestamp is not available, the closest frame within this tolerance will be returned. Default is 0.1 seconds.

Returns:

ArrowTensorArray of decoded frame with shape (C, H, W), dtype float32, values in range [0, 1].

Examples

Basic usage:

from xpark.dataset import from_items
from xpark.dataset.expressions import col
from xpark.dataset.processors.video_compute import VideoCompute

ds = from_items([
    {"video": "video.mp4", "timestamp": 0.5},
    {"video": "video.mp4", "timestamp": 1.0},
])

# Decode video frame at specified timestamp
ds = ds.with_column(
    "frame",
    VideoCompute.decode_frame_at(col("video"), col("timestamp")),
)

# Get decoded frames
frames = ds.take(2)

With custom options:

ds = ds.with_column(
    "frame",
    VideoCompute.decode_frame_at
    .options(num_workers={"CPU": 2}, batch_size=16)
    .with_column(
        col("video"),
        col("timestamp"),
        tolerance_s=0.05,
    ),
)
static decode_frames(videos: pa.ChunkedArray, timestamps: pa.ChunkedArray | None = None, *, fps: float | None = None, keyframes_only: bool = False, num_frames: int | None = None, max_frames: int | None = None, start_time: float | None = None, end_time: float | None = None, tolerance_s: float = 0.1) pa.Array#

Extract frames from video with multiple modes.

This processor extracts video frames with flexible extraction modes and supports both path (string) and binary (bytes) input. It returns frames as tensor arrays.

Extraction modes (mutually exclusive, in priority order): 1. timestamps: Extract frames at specific timestamps (requires timestamps column) 2. fps: Extract frames at a fixed frame rate 3. keyframes_only: Extract only keyframes (I-frames) 4. num_frames: Extract N frames uniformly distributed

Parameters:
  • videos – Column of video file paths (strings) or binary data (bytes). Supports local paths and remote paths (cos://, s3://, hf://, etc.).

  • timestamps – Optional column of timestamps in seconds (floats) at which to extract frames. Can be a single float or a list of floats per video.

  • fps – Target frames per second to extract. For example, fps=2.0 extracts 2 frames per second of video.

  • keyframes_only – If True, extract only keyframes (I-frames). Keyframes are independently decodable frames, useful for scene detection.

  • num_frames – Number of frames to extract uniformly distributed across the video duration.

  • max_frames – Maximum number of frames to extract. Applies to fps and keyframes_only modes.

  • start_time – Start time in seconds for extraction range.

  • end_time – End time in seconds for extraction range.

  • tolerance_s – Allowed deviation in seconds for timestamp mode. Default is 0.1 seconds.

Returns:

ArrowTensorArray of extracted frames with shape (N, C, H, W) per video, dtype float32, values in range [0, 1].

Examples

Extract frames at specific timestamps:

from xpark.dataset import from_items
from xpark.dataset.expressions import col
from xpark.dataset.processors.video_compute import VideoCompute

ds = from_items([{"video": "video.mp4", "ts": [0.5, 1.0, 1.5]}])
ds = ds.with_column(
    "frames",
    VideoCompute.decode_frames(col("video"), col("ts")),
)

Extract frames at 2 FPS:

ds = ds.with_column(
    "frames",
    VideoCompute.decode_frames(col("video"), fps=2.0, max_frames=10),
)

Extract keyframes only:

ds = ds.with_column(
    "keyframes",
    VideoCompute.decode_frames(col("video"), keyframes_only=True),
)

Extract 10 frames uniformly:

ds = ds.with_column(
    "frames",
    VideoCompute.decode_frames(col("video"), num_frames=10),
)

Extract from binary video data:

ds = xd.from_items([{"video_bytes": video_binary}])
ds = ds.with_column(
    "frames",
    VideoCompute.decode_frames(col("video_bytes"), num_frames=5),
)

See also

  • decode_frame_at(): Simpler API for single frame extraction.

  • data-juicer VideoExtractFramesMapper for similar functionality.

static display_aspect_ratio(videos: pa.ChunkedArray) pa.Array#

The display aspect ratio (DAR) of the video, e.g., 16:9 is 1.777…

static extract_audio(videos: pa.ChunkedArray, codec: str | None = None, sample_rate: int | None = None, stream_index: int | None = None, start_second: float = 0, end_second: float | None = None) pa.Array#

Extract audio from video.

This processor extracts audio data from video files and returns the audio binary data along with its file extension. It supports various file systems including COS, S3, HTTP, binary sources, and other fsspec-compatible storage systems.

Parameters:
  • videos – The videos to be processed.

  • codec – Output audio format.

  • sample_rate – Output audio sample rate.

  • stream_index – Index of the audio stream to extract.

  • start_second – Start time of the audio to extract.

  • end_second – End time of the audio to extract.

Examples

from xpark.dataset.expressions import col
from xpark.dataset.processors.video_compute import VideoCompute
from xpark.dataset.context import DatasetContext

ctx = DatasetContext.get_current()
# set cos storage_options
# ctx.storage_options = {"cos": {"endpoint_url": "https://your-cos-endpoint"}}
# VIDEO_COS_PATH is cos path like cos://bucket/path/to/video.mp4

ds = from_items([{"video": VIDEO_COS_PATH}])
ds = ds.with_column(
    "audio",
    VideoCompute.extract_audio
    .options(num_workers={"CPU": 1}, batch_size=1)
    .with_column(col("video")),
)

audio_bytes = ds.take(1)[0]['audio']
static height(videos: pa.ChunkedArray) pa.Array#

Video height, in pixels

static pix_fmt(videos: pa.ChunkedArray) pa.Array#

The pixel format of the video, e.g., ‘yuv420p’

static split_by_duration(videos: pa.ChunkedArray, segment_duration: float = 10, min_segment_duration: float = 0) pa.Array#

Split video by duration.

This processor splits video files into multiple segments based on a fixed time length (segment_duration). For each video in the input, it outputs a list of binary data for the video segments. The default split points are keyframes. It supports various file systems including COS, S3, HTTP, binary sources, and other fsspec-compatible storage systems.

Parameters:
  • videos – The videos to be processed.

  • segment_duration – Target duration for each segment in seconds, default is 10s

  • min_segment_duration – Minimum duration for each segment, segments shorter than this value will be discarded, used for handling overly short segments at the end of videos, default value is 0

Examples

from xpark.dataset.expressions import col
from xpark.dataset.processors.video_compute import VideoCompute
from xpark.dataset.context import DatasetContext

ctx = DatasetContext.get_current()
# set cos storage_options
# ctx.storage_options = {"cos": {"endpoint_url": "https://your-cos-endpoint"}}
# VIDEO_COS_PATH is cos path like cos://bucket/path/to/video.mp4

ds = from_items([{"video": VIDEO_COS_PATH}])
ds = ds.with_column(
    "split_videos",
    VideoCompute.split_by_duration
    .options(num_workers={"CPU": 1}, batch_size=1)
    .with_column(col("video")),
)

split_videos = ds.take(1)[0]['split_videos']
static split_by_key_frame(videos: pa.ChunkedArray) pa.Array#

Split video by keyframe

This function splits a video into segments based on keyframes, which are frames in a video stream that contain a complete image. Unlike other frames, keyframes (also known as I-frames) do not rely on previous frames for decoding and can be used as reference points to extract or seek specific video segments. Keyframes are crucial for tasks like video editing, seeking, or streaming, as they represent points where the video can be independently decoded.

It supports various file systems including COS, S3, HTTP, binary sources, and other fsspec-compatible storage systems.

Parameters:

videos – The videos to be processed.

Examples

from xpark.dataset.expressions import col
from xpark.dataset.processors.video_compute import VideoCompute
from xpark.dataset.context import DatasetContext

ctx = DatasetContext.get_current()
# set cos storage_options
# ctx.storage_options = {"cos": {"endpoint_url": "https://your-cos-endpoint"}}
# VIDEO_COS_PATH is cos path like cos://bucket/path/to/video.mp4

ds = from_items([{"video": VIDEO_COS_PATH}])
ds = ds.with_column(
    "split_videos",
    VideoCompute.split_by_key_frame
    .options(num_workers={"CPU": 1}, batch_size=1)
    .with_column(col("video")),
)

split_videos = ds.take(1)[0]['split_videos']
static time_base(videos: pa.ChunkedArray) pa.Array#

The time base of the stream, representing the unit of time for timestamps

static width(videos: pa.ChunkedArray) pa.Array#

Video width, in pixels