xpark.dataset.VideoCompute
- class xpark.dataset.VideoCompute(*args, **kwargs)
Note
Do not construct this class; call its static methods instead.
Methods
average_rate(videos) – The average framerate of the video, as a float
base_rate(videos) – The fundamental framerate of the stream, as a float
bit_rate(videos) – The average bitrate of the video stream, in bits per second
codec(videos) – The name of the codec used for the video stream, e.g., 'h264', 'vp9'
decode_frame_at(videos, timestamp[, tolerance_s]) – Decode a video frame at specified timestamp.
decode_frames(videos[, timestamps, fps, ...]) – Extract frames from video with multiple modes.
display_aspect_ratio(videos) – The display aspect ratio (DAR) of the video, e.g., 16:9 is 1.777...
extract_audio(videos[, codec, sample_rate, ...]) – Extract audio from video.
height(videos) – Video height, in pixels
pix_fmt(videos) – The pixel format of the video, e.g., 'yuv420p'
split_by_duration(videos[, ...]) – Split video by duration.
split_by_key_frame(videos) – Split video by keyframe
time_base(videos) – The time base of the stream, representing the unit of time for timestamps
width(videos) – Video width, in pixels
- static average_rate(videos: pa.ChunkedArray) -> pa.Array
The average framerate of the video, as a float
- static base_rate(videos: pa.ChunkedArray) -> pa.Array
The fundamental framerate of the stream, as a float
- static bit_rate(videos: pa.ChunkedArray) -> pa.Array
The average bitrate of the video stream, in bits per second
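Because the value is in bits per second, a rough stream size follows from multiplying by the duration and dividing by 8. The helper below is a hypothetical illustration in plain Python, not part of xpark, and it ignores container overhead and any audio streams.

```python
def estimated_stream_bytes(bit_rate: int, duration_s: float) -> float:
    """Approximate size in bytes of a stream with the given average bitrate.

    Hypothetical helper for illustration only; not an xpark function.
    """
    return bit_rate * duration_s / 8  # 8 bits per byte


# A 4 Mbit/s stream lasting one minute
print(estimated_stream_bytes(4_000_000, 60.0))
```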
- static codec(videos: pa.ChunkedArray) -> pa.Array
The name of the codec used for the video stream, e.g., ‘h264’, ‘vp9’
- static decode_frame_at(videos: pa.ChunkedArray, timestamp: pa.ChunkedArray, tolerance_s: float = 0.1) -> pa.Array
Decode a video frame at specified timestamp.
This processor decodes a frame from a video file at the given timestamp and returns it as a tensor array. It uses PyAV (an FFmpeg wrapper) for decoding and supports various file systems, including local storage, COS, S3, HuggingFace Hub (hf://), and other fsspec-compatible storage systems.
- Parameters:
videos – Column of video file paths (strings) or binary data. Supports local paths and remote paths (cos://, s3://, hf://, etc.).
timestamp – Column of timestamps in seconds (floats) at which to extract a frame from the corresponding video.
tolerance_s – Allowed deviation in seconds for frame retrieval. If the exact timestamp is not available, the closest frame within this tolerance will be returned. Default is 0.1 seconds.
- Returns:
ArrowTensorArray of decoded frames, each with shape (C, H, W), dtype float32, values in range [0, 1].
Examples
Basic usage:
    from xpark.dataset import from_items
    from xpark.dataset.expressions import col
    from xpark.dataset.processors.video_compute import VideoCompute

    ds = from_items([
        {"video": "video.mp4", "timestamp": 0.5},
        {"video": "video.mp4", "timestamp": 1.0},
    ])

    # Decode video frame at specified timestamp
    ds = ds.with_column(
        "frame",
        VideoCompute.decode_frame_at(col("video"), col("timestamp")),
    )

    # Get decoded frames
    frames = ds.take(2)
With custom options:
    ds = ds.with_column(
        "frame",
        VideoCompute.decode_frame_at
        .options(num_workers={"CPU": 2}, batch_size=16)
        .with_column(
            col("video"),
            col("timestamp"),
            tolerance_s=0.05,
        ),
    )
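To make the role of tolerance_s concrete, the following plain-Python sketch picks the frame whose presentation time is closest to the requested timestamp and rejects it if it lies outside the tolerance. The function name and the choice to return None on a miss are assumptions for illustration; the documentation does not say how xpark reports a miss.

```python
from typing import Optional, Sequence


def pick_frame_time(
    frame_times: Sequence[float],
    target: float,
    tolerance_s: float = 0.1,
) -> Optional[float]:
    """Return the frame time closest to target, or None if none is within tolerance_s.

    Hypothetical helper; the None-on-miss behaviour is an assumption.
    """
    if not frame_times:
        return None
    closest = min(frame_times, key=lambda t: abs(t - target))
    return closest if abs(closest - target) <= tolerance_s else None


# A 25 fps stream lasting two seconds: frames at 0.00, 0.04, 0.08, ...
frame_times = [i / 25 for i in range(50)]
print(pick_frame_time(frame_times, 0.51))  # nearest frame lies within 0.1 s
print(pick_frame_time(frame_times, 5.0))   # far beyond the last frame
```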
- static decode_frames(videos: pa.ChunkedArray, timestamps: pa.ChunkedArray | None = None, *, fps: float | None = None, keyframes_only: bool = False, num_frames: int | None = None, max_frames: int | None = None, start_time: float | None = None, end_time: float | None = None, tolerance_s: float = 0.1) -> pa.Array
Extract frames from video with multiple modes.
This processor extracts video frames with flexible extraction modes and supports both path (string) and binary (bytes) input. It returns frames as tensor arrays.
Extraction modes (mutually exclusive, in priority order):
1. timestamps: Extract frames at specific timestamps (requires timestamps column)
2. fps: Extract frames at a fixed frame rate
3. keyframes_only: Extract only keyframes (I-frames)
4. num_frames: Extract N frames uniformly distributed
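The priority order above can be read as a simple cascade. The sketch below is a hypothetical plain-Python rendering of that rule, not xpark's code; in particular, raising an error when no mode is selected is an assumption.

```python
from typing import Optional


def resolve_mode(
    has_timestamps: bool = False,
    fps: Optional[float] = None,
    keyframes_only: bool = False,
    num_frames: Optional[int] = None,
) -> str:
    """Return the name of the extraction mode that wins under the documented priority."""
    if has_timestamps:
        return "timestamps"
    if fps is not None:
        return "fps"
    if keyframes_only:
        return "keyframes_only"
    if num_frames is not None:
        return "num_frames"
    # Assumption: the documentation does not state what happens in this case.
    raise ValueError("no extraction mode selected")


print(resolve_mode(fps=2.0, num_frames=10))  # fps outranks num_frames
```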
- Parameters:
videos – Column of video file paths (strings) or binary data (bytes). Supports local paths and remote paths (cos://, s3://, hf://, etc.).
timestamps – Optional column of timestamps in seconds (floats) at which to extract frames. Can be a single float or a list of floats per video.
fps – Target frames per second to extract. For example, fps=2.0 extracts 2 frames per second of video.
keyframes_only – If True, extract only keyframes (I-frames). Keyframes are independently decodable frames, useful for scene detection.
num_frames – Number of frames to extract uniformly distributed across the video duration.
max_frames – Maximum number of frames to extract. Applies to fps and keyframes_only modes.
start_time – Start time in seconds for extraction range.
end_time – End time in seconds for extraction range.
tolerance_s – Allowed deviation in seconds for timestamp mode. Default is 0.1 seconds.
- Returns:
ArrowTensorArray of extracted frames with shape (N, C, H, W) per video, dtype float32, values in range [0, 1].
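As a rough picture of how fps, num_frames, start_time, end_time, and max_frames interact, the sketch below generates candidate timestamps in plain Python. Both helpers are hypothetical. The exact sampling positions xpark uses are not documented here, so the bin-midpoint choice in the uniform mode is an assumption.

```python
from typing import List, Optional


def fps_timestamps(
    duration: float,
    fps: float,
    start_time: float = 0.0,
    end_time: Optional[float] = None,
    max_frames: Optional[int] = None,
) -> List[float]:
    """Timestamps spaced 1/fps apart in [start_time, end), capped at max_frames."""
    if fps <= 0:
        raise ValueError("fps must be positive")
    end = duration if end_time is None else min(end_time, duration)
    times: List[float] = []
    i = 0
    while start_time + i / fps < end:
        if max_frames is not None and len(times) >= max_frames:
            break
        times.append(start_time + i / fps)
        i += 1
    return times


def uniform_timestamps(
    duration: float,
    num_frames: int,
    start_time: float = 0.0,
    end_time: Optional[float] = None,
) -> List[float]:
    """num_frames timestamps at the midpoints of equal-width bins (an assumed scheme)."""
    end = duration if end_time is None else min(end_time, duration)
    span = end - start_time
    return [start_time + span * (i + 0.5) / num_frames for i in range(num_frames)]


print(fps_timestamps(10.0, 2.0, max_frames=10))
print(uniform_timestamps(10.0, 5))
```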
Examples
Extract frames at specific timestamps:
    from xpark.dataset import from_items
    from xpark.dataset.expressions import col
    from xpark.dataset.processors.video_compute import VideoCompute

    ds = from_items([{"video": "video.mp4", "ts": [0.5, 1.0, 1.5]}])
    ds = ds.with_column(
        "frames",
        VideoCompute.decode_frames(col("video"), col("ts")),
    )
Extract frames at 2 FPS:
    ds = ds.with_column(
        "frames",
        VideoCompute.decode_frames(col("video"), fps=2.0, max_frames=10),
    )
Extract keyframes only:
    ds = ds.with_column(
        "keyframes",
        VideoCompute.decode_frames(col("video"), keyframes_only=True),
    )
Extract 10 frames uniformly:
    ds = ds.with_column(
        "frames",
        VideoCompute.decode_frames(col("video"), num_frames=10),
    )
Extract from binary video data:
    # video_binary holds the raw bytes of a video file
    ds = from_items([{"video_bytes": video_binary}])
    ds = ds.with_column(
        "frames",
        VideoCompute.decode_frames(col("video_bytes"), num_frames=5),
    )
See also
decode_frame_at(): Simpler API for single frame extraction.
data-juicer VideoExtractFramesMapper for similar functionality.
- static display_aspect_ratio(videos: pa.ChunkedArray) -> pa.Array
The display aspect ratio (DAR) of the video, e.g., 16:9 is 1.777…
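For reference, the display aspect ratio is conventionally the stored width-to-height ratio multiplied by the sample (pixel) aspect ratio. The plain-Python sketch below illustrates that arithmetic with a hypothetical helper; it makes no claim about how xpark derives the value internally.

```python
from fractions import Fraction


def display_aspect_ratio(
    width: int,
    height: int,
    sample_aspect_ratio: Fraction = Fraction(1, 1),
) -> Fraction:
    """DAR = (width / height) * SAR, kept exact with Fraction."""
    return Fraction(width, height) * sample_aspect_ratio


# Square pixels: 1920x1080 is 16:9
dar = display_aspect_ratio(1920, 1080)
print(dar, float(dar))

# Anamorphic example: 720x576 stored with 16:15 pixels displays as 4:3
print(display_aspect_ratio(720, 576, Fraction(16, 15)))
```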
- static extract_audio(videos: pa.ChunkedArray, codec: str | None = None, sample_rate: int | None = None, stream_index: int | None = None, start_second: float = 0, end_second: float | None = None) -> pa.Array
Extract audio from video.
This processor extracts audio data from video files and returns the audio binary data along with its file extension. Input can be in-memory binary data or a path on COS, S3, HTTP, or another fsspec-compatible storage system.
- Parameters:
videos – The videos to be processed.
codec – Output audio format.
sample_rate – Output audio sample rate.
stream_index – Index of the audio stream to extract.
start_second – Start time of the audio to extract, in seconds. Default is 0.
end_second – End time of the audio to extract, in seconds.
Examples
    from xpark.dataset import from_items
    from xpark.dataset.expressions import col
    from xpark.dataset.processors.video_compute import VideoCompute
    from xpark.dataset.context import DatasetContext

    ctx = DatasetContext.get_current()
    # set cos storage_options
    # ctx.storage_options = {"cos": {"endpoint_url": "https://your-cos-endpoint"}}

    # VIDEO_COS_PATH is cos path like cos://bucket/path/to/video.mp4
    ds = from_items([{"video": VIDEO_COS_PATH}])
    ds = ds.with_column(
        "audio",
        VideoCompute.extract_audio
        .options(num_workers={"CPU": 1}, batch_size=1)
        .with_column(col("video")),
    )
    audio_bytes = ds.take(1)[0]['audio']
- static height(videos: pa.ChunkedArray) -> pa.Array
Video height, in pixels
- static pix_fmt(videos: pa.ChunkedArray) -> pa.Array
The pixel format of the video, e.g., ‘yuv420p’
- static split_by_duration(videos: pa.ChunkedArray, segment_duration: float = 10, min_segment_duration: float = 0) -> pa.Array
Split video by duration.
This processor splits video files into multiple segments based on a fixed time length (segment_duration). For each input video, it outputs a list of binary data, one item per segment. By default, segments are cut at keyframes. Input can be in-memory binary data or a path on COS, S3, HTTP, or another fsspec-compatible storage system.
- Parameters:
videos – The videos to be processed.
segment_duration – Target duration of each segment, in seconds. Default is 10.
min_segment_duration – Minimum duration of a segment. Segments shorter than this value are discarded, which is useful for dropping overly short segments at the end of a video. Default is 0.
Examples
    from xpark.dataset import from_items
    from xpark.dataset.expressions import col
    from xpark.dataset.processors.video_compute import VideoCompute
    from xpark.dataset.context import DatasetContext

    ctx = DatasetContext.get_current()
    # set cos storage_options
    # ctx.storage_options = {"cos": {"endpoint_url": "https://your-cos-endpoint"}}

    # VIDEO_COS_PATH is cos path like cos://bucket/path/to/video.mp4
    ds = from_items([{"video": VIDEO_COS_PATH}])
    ds = ds.with_column(
        "split_videos",
        VideoCompute.split_by_duration
        .options(num_workers={"CPU": 1}, batch_size=1)
        .with_column(col("video")),
    )
    split_videos = ds.take(1)[0]['split_videos']
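The interaction between segment_duration and min_segment_duration can be sketched in plain Python as follows. This hypothetical helper computes only nominal (start, end) boundaries and deliberately ignores keyframe alignment, so real segment lengths may differ from these values.

```python
from typing import List, Tuple


def segment_bounds(
    duration: float,
    segment_duration: float = 10.0,
    min_segment_duration: float = 0.0,
) -> List[Tuple[float, float]]:
    """Nominal (start, end) pairs; segments shorter than the minimum are dropped."""
    if segment_duration <= 0:
        raise ValueError("segment_duration must be positive")
    bounds: List[Tuple[float, float]] = []
    start = 0.0
    while start < duration:
        end = min(start + segment_duration, duration)
        if end - start >= min_segment_duration:
            bounds.append((start, end))
        start = end
    return bounds


# A 25 s video: the trailing 5 s segment is kept by default...
print(segment_bounds(25.0))
# ...and discarded once the minimum exceeds its length.
print(segment_bounds(25.0, min_segment_duration=6.0))
```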
- static split_by_key_frame(videos: pa.ChunkedArray) -> pa.Array
Split video by keyframe
This function splits a video into segments based on keyframes, which are frames in a video stream that contain a complete image. Unlike other frames, keyframes (also known as I-frames) do not rely on previous frames for decoding and can be used as reference points to extract or seek specific video segments. Keyframes are crucial for tasks like video editing, seeking, or streaming, as they represent points where the video can be independently decoded.
Input can be in-memory binary data or a path on COS, S3, HTTP, or another fsspec-compatible storage system.
- Parameters:
videos – The videos to be processed.
Examples
    from xpark.dataset import from_items
    from xpark.dataset.expressions import col
    from xpark.dataset.processors.video_compute import VideoCompute
    from xpark.dataset.context import DatasetContext

    ctx = DatasetContext.get_current()
    # set cos storage_options
    # ctx.storage_options = {"cos": {"endpoint_url": "https://your-cos-endpoint"}}

    # VIDEO_COS_PATH is cos path like cos://bucket/path/to/video.mp4
    ds = from_items([{"video": VIDEO_COS_PATH}])
    ds = ds.with_column(
        "split_videos",
        VideoCompute.split_by_key_frame
        .options(num_workers={"CPU": 1}, batch_size=1)
        .with_column(col("video")),
    )
    split_videos = ds.take(1)[0]['split_videos']
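Conceptually, splitting at keyframes means each segment runs from one keyframe up to the next, with the last segment ending at the end of the video. The plain-Python sketch below illustrates that idea with a hypothetical helper operating on keyframe timestamps; it is not how xpark is necessarily implemented.

```python
from typing import List, Sequence, Tuple


def keyframe_segments(
    keyframe_times: Sequence[float],
    duration: float,
) -> List[Tuple[float, float]]:
    """Pair each keyframe time with the next one; the last segment ends at duration."""
    starts = sorted(keyframe_times)
    ends = starts[1:] + [duration]
    return list(zip(starts, ends))


# Keyframes every 2 s in a 5 s video
print(keyframe_segments([0.0, 2.0, 4.0], 5.0))
```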
- static time_base(videos: pa.ChunkedArray) -> pa.Array
The time base of the stream, representing the unit of time for timestamps
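A time base is typically a rational number of seconds per timestamp tick, so a presentation timestamp converts to seconds by multiplication. The sketch below shows that arithmetic in plain Python; the helper name is hypothetical, and the form in which xpark returns the time base is not specified here.

```python
from fractions import Fraction


def pts_to_seconds(pts: int, time_base: Fraction) -> float:
    """Convert a timestamp expressed in time_base ticks to seconds."""
    return float(pts * time_base)


# 1/90000 is a common time base for MPEG transport streams
time_base = Fraction(1, 90000)
print(pts_to_seconds(180000, time_base))
```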
- static width(videos: pa.ChunkedArray) -> pa.Array
Video width, in pixels