mediapy

mediapy: Read/write/show images and videos in an IPython/Jupyter notebook.

[GitHub source]   [API docs]   [PyPI package]   [Colab example]

See the example notebook, or better yet, open it in Colab.

Image examples

Display an image (2D or 3D numpy array):

checkerboard = np.kron([[0, 1] * 16, [1, 0] * 16] * 16, np.ones((4, 4)))
show_image(checkerboard)

Read and display an image (either local or from the Web):

IMAGE = 'https://github.com/hhoppe/data/raw/main/image.png'
show_image(read_image(IMAGE))

Read and display an image from a local file:

!wget -q -O /tmp/burano.png {IMAGE}
show_image(read_image('/tmp/burano.png'))

Show titled images side-by-side:

images = {
    'original': checkerboard,
    'darkened': checkerboard * 0.7,
    'random': np.random.rand(32, 32, 3),
}
show_images(images, vmin=0.0, vmax=1.0, border=True, height=64)

Compare two images using an interactive slider:

compare_images([checkerboard, np.random.rand(128, 128, 3)])

Video examples

Display a video (an iterable of images, e.g., a 3D or 4D array):

video = moving_circle((100, 100), num_images=10)
show_video(video, fps=10)

Show the video frames side-by-side:

show_images(video, columns=6, border=True, height=64)

Show the frames with their indices:

show_images({f'{i}': image for i, image in enumerate(video)}, width=32)

Read and display a video (either local or from the Web):

VIDEO = 'https://github.com/hhoppe/data/raw/main/video.mp4'
show_video(read_video(VIDEO))

Create and display a looping two-frame GIF video:

image1 = resize_image(np.random.rand(10, 10, 3), (50, 50))
show_video([image1, image1 * 0.8], fps=2, codec='gif')

Darken a video frame-by-frame:

output_path = '/tmp/out.mp4'
with VideoReader(VIDEO) as r:
  darken_image = lambda image: to_float01(image) * 0.5
  with VideoWriter(output_path, shape=r.shape, fps=r.fps, bps=r.bps) as w:
    for image in r:
      w.add_image(darken_image(image))
# Copyright 2025 The mediapy Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""`mediapy`: Read/write/show images and videos in an IPython/Jupyter notebook.

[**[GitHub source]**](https://github.com/google/mediapy)  
[**[API docs]**](https://google.github.io/mediapy/)  
[**[PyPI package]**](https://pypi.org/project/mediapy/)  
[**[Colab
example]**](https://colab.research.google.com/github/google/mediapy/blob/main/mediapy_examples.ipynb)

See the [example
notebook](https://github.com/google/mediapy/blob/main/mediapy_examples.ipynb),
or better yet, [**open it in
Colab**](https://colab.research.google.com/github/google/mediapy/blob/main/mediapy_examples.ipynb).

## Image examples

Display an image (2D or 3D `numpy` array):
```python
checkerboard = np.kron([[0, 1] * 16, [1, 0] * 16] * 16, np.ones((4, 4)))
show_image(checkerboard)
```
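
The `np.kron` call above tiles a 32x32 grid of alternating 0/1 cells and expands
each cell into a 4x4 block, which gives a 128x128 image. A minimal pure-Python
sketch of the same pattern (the helper `checkerboard_rows` is hypothetical and
not part of `mediapy`):
```python
def checkerboard_rows(num_cells=32, cell_size=4):
  # Returns the checkerboard as a list of rows holding 0/1 values.
  size = num_cells * cell_size
  return [
      [((y // cell_size) + (x // cell_size)) % 2 for x in range(size)]
      for y in range(size)
  ]


rows = checkerboard_rows()
print(len(rows), len(rows[0]))  # Image height and width in pixels.
```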

Read and display an image (either local or from the Web):
```python
IMAGE = 'https://github.com/hhoppe/data/raw/main/image.png'
show_image(read_image(IMAGE))
```
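
`read_image` accepts either a path or a URL. A rough sketch of how such a
dispatch can work, modeled on the module's `_is_url` and `read_contents`
helpers; the names `is_url` and `read_bytes` are illustrative only and are not
the public API:
```python
import pathlib
import urllib.request


def is_url(path_or_url):
  # Only strings with a known scheme prefix are treated as URLs.
  return isinstance(path_or_url, str) and path_or_url.startswith(
      ('http://', 'https://', 'file://')
  )


def read_bytes(path_or_url):
  # Fetch over the network for URLs; otherwise read the local file.
  if is_url(path_or_url):
    with urllib.request.urlopen(path_or_url) as response:
      return response.read()
  return pathlib.Path(path_or_url).read_bytes()
```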

Read and display an image from a local file:
```python
!wget -q -O /tmp/burano.png {IMAGE}
show_image(read_image('/tmp/burano.png'))
```

Show titled images side-by-side:
```python
images = {
    'original': checkerboard,
    'darkened': checkerboard * 0.7,
    'random': np.random.rand(32, 32, 3),
}
show_images(images, vmin=0.0, vmax=1.0, border=True, height=64)
```
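
The `vmin` and `vmax` arguments bound the value range that is displayed. As
documented for `to_rgb`, values are mapped affinely from [vmin, vmax] to
[0.0, 1.0], and missing bounds are taken from the finite values of the input.
A pure-Python sketch of that remapping (the helper `remap01` is hypothetical;
that `show_images` applies exactly this step is an assumption):
```python
import math
import sys


def remap01(values, vmin=None, vmax=None):
  # Default bounds come from the finite values only, so inf/nan are ignored.
  finite = [v for v in values if math.isfinite(v)]
  vmin = min(finite) if vmin is None else vmin
  vmax = max(finite) if vmax is None else vmax
  # The epsilon avoids division by zero when vmin == vmax.
  scale = vmax - vmin + sys.float_info.epsilon
  return [(v - vmin) / scale for v in values]


print(remap01([2.0, 4.0, 6.0]))  # Approximately [0.0, 0.5, 1.0].
```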

Compare two images using an interactive slider:
```python
compare_images([checkerboard, np.random.rand(128, 128, 3)])
```

## Video examples

Display a video (an iterable of images, e.g., a 3D or 4D array):
```python
video = moving_circle((100, 100), num_images=10)
show_video(video, fps=10)
```

Show the video frames side-by-side:
```python
show_images(video, columns=6, border=True, height=64)
```
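
Arranging frames with `columns=6` amounts to splitting the sequence into rows
of at most six items. The module's private `_chunked` helper performs this kind
of grouping with `iter(callable, sentinel)`; a standalone sketch of the
technique follows (that `show_images` uses it for `columns` is an assumption):
```python
import functools
import itertools


def chunked(iterable, n=None):
  # Yields tuples of at most n items; n=None yields one tuple with everything.
  def take(n, iterator):
    return tuple(itertools.islice(iterator, n))

  # iter(callable, sentinel) keeps calling take() until it returns ().
  return iter(functools.partial(take, n, iter(iterable)), ())


rows = list(chunked(range(10), 6))
print([len(row) for row in rows])  # Row lengths for 10 frames in 6 columns.
```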

Show the frames with their indices:
```python
show_images({f'{i}': image for i, image in enumerate(video)}, width=32)
```

Read and display a video (either local or from the Web):
```python
VIDEO = 'https://github.com/hhoppe/data/raw/main/video.mp4'
show_video(read_video(VIDEO))
```
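
Video decoding relies on the external `ffmpeg` program, which needs a file it
can open. The module's `_read_via_local_file` helper therefore copies remote
content into a temporary directory, keeping the file suffix. A simplified
sketch of that pattern (the name `local_copy` and its `data`/`suffix`
parameters are hypothetical):
```python
import contextlib
import pathlib
import tempfile


@contextlib.contextmanager
def local_copy(data, suffix):
  # Writes data to a temporary file that is deleted when the context exits.
  with tempfile.TemporaryDirectory() as directory_name:
    tmp_path = pathlib.Path(directory_name) / f'file{suffix}'
    tmp_path.write_bytes(data)
    yield str(tmp_path)


with local_copy(b'placeholder bytes', '.mp4') as filename:
  print(pathlib.Path(filename).suffix)  # The original suffix is preserved.
```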

Create and display a looping two-frame GIF video:
```python
image1 = resize_image(np.random.rand(10, 10, 3), (50, 50))
show_video([image1, image1 * 0.8], fps=2, codec='gif')
```

Darken a video frame-by-frame:
```python
output_path = '/tmp/out.mp4'
with VideoReader(VIDEO) as r:
  darken_image = lambda image: to_float01(image) * 0.5
  with VideoWriter(output_path, shape=r.shape, fps=r.fps, bps=r.bps) as w:
    for image in r:
      w.add_image(darken_image(image))
```
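
The `to_float01` call above relies on the convention described for `to_type`:
uint(0) maps to 0.0 and uint(MAX) maps to 1.0, and floats are clamped to
[0.0, 1.0] before being scaled back to integers. A scalar sketch of that
convention for `uint8`, using hypothetical helper names and plain Python floats
rather than the library's array code:
```python
UINT8_MAX = 255


def float01_from_uint8(value):
  # Maps 0 -> 0.0 and 255 -> 1.0.
  return value / UINT8_MAX


def uint8_from_float01(value):
  # Clamp to [0.0, 1.0], scale, then round to nearest by adding 0.5.
  clamped = min(max(value, 0.0), 1.0)
  return int(clamped * UINT8_MAX + 0.5)


darkened = uint8_from_float01(float01_from_uint8(200) * 0.5)
print(darkened)  # Half the brightness of pixel value 200.
```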
"""

from __future__ import annotations

__docformat__ = 'google'
__version__ = '1.2.4'
__version_info__ = tuple(int(num) for num in __version__.split('.'))

import base64
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence
import contextlib
import functools
import importlib
import io
import itertools
import math
import numbers
import os  # Package only needed for typing.TYPE_CHECKING.
import pathlib
import re
import shlex
import shutil
import subprocess
import sys
import tempfile
import typing
from typing import Any
import urllib.request
import warnings

import IPython.display
import matplotlib.pyplot
import numpy as np
import numpy.typing as npt
import PIL.Image
import PIL.ImageOps


if not hasattr(PIL.Image, 'Resampling'):  # Allow Pillow<9.0.
  PIL.Image.Resampling = PIL.Image  # type: ignore

# Selected and reordered here for pdoc documentation.
__all__ = [
    'show_image',
    'show_images',
    'compare_images',
    'show_video',
    'show_videos',
    'read_image',
    'write_image',
    'read_video',
    'write_video',
    'VideoReader',
    'VideoWriter',
    'VideoMetadata',
    'compress_image',
    'decompress_image',
    'compress_video',
    'decompress_video',
    'html_from_compressed_image',
    'html_from_compressed_video',
    'resize_image',
    'resize_video',
    'to_rgb',
    'to_type',
    'to_float01',
    'to_uint8',
    'set_output_height',
    'set_max_output_height',
    'color_ramp',
    'moving_circle',
    'set_show_save_dir',
    'set_ffmpeg',
    'video_is_available',
]

if typing.TYPE_CHECKING:
  _ArrayLike = npt.ArrayLike
  _DTypeLike = npt.DTypeLike
  _NDArray = npt.NDArray[Any]
  _DType = np.dtype[Any]
else:
  # Create named types for use in the `pdoc` documentation.
  _ArrayLike = typing.TypeVar('_ArrayLike')
  _DTypeLike = typing.TypeVar('_DTypeLike')
  _NDArray = typing.TypeVar('_NDArray')
  _DType = typing.TypeVar('_DType')  # pylint: disable=invalid-name

_IPYTHON_HTML_SIZE_LIMIT = 20_000_000
_T = typing.TypeVar('_T')
_Path = typing.Union[str, 'os.PathLike[str]']

_IMAGE_COMPARISON_HTML = """\
<script
  defer
  src="https://unpkg.com/img-comparison-slider@7/dist/index.js"
></script>
<link
  rel="stylesheet"
  href="https://unpkg.com/img-comparison-slider@7/dist/styles.css"
/>

<img-comparison-slider>
  <img slot="first" src="data:image/png;base64,{b64_1}" />
  <img slot="second" src="data:image/png;base64,{b64_2}" />
</img-comparison-slider>
"""

# ** Miscellaneous.


class _Config:
  ffmpeg_name_or_path: _Path = 'ffmpeg'
  show_save_dir: _Path | None = None


_config = _Config()


def _open(path: _Path, *args: Any, **kwargs: Any) -> Any:
  """Opens the file; this is a hook for the built-in `open()`."""
  return open(path, *args, **kwargs)


def _path_is_local(path: _Path) -> bool:
  """Returns True if the path is in the filesystem accessible by `ffmpeg`."""
  del path
  return True


def _search_for_ffmpeg_path() -> str | None:
  """Returns a path to the ffmpeg program, or None if not found."""
  if filename := shutil.which(_config.ffmpeg_name_or_path):
    return str(filename)
  return None


def _print_err(*args: str, **kwargs: Any) -> None:
  """Prints arguments to stderr immediately."""
  kwargs = {**dict(file=sys.stderr, flush=True), **kwargs}
  print(*args, **kwargs)


def _chunked(
    iterable: Iterable[_T], n: int | None = None
) -> Iterator[tuple[_T, ...]]:
  """Returns elements collected as tuples of length at most `n` if not None."""

  def take(n: int | None, iterable: Iterable[_T]) -> tuple[_T, ...]:
    return tuple(itertools.islice(iterable, n))

  return iter(functools.partial(take, n, iter(iterable)), ())


def _peek_first(iterator: Iterable[_T]) -> tuple[_T, Iterable[_T]]:
  """Given an iterator, returns first element and re-initialized iterator.

  >>> first_image, images = _peek_first(moving_circle())

  Args:
    iterator: An input iterator or iterable.

  Returns:
    A tuple (first_element, iterator_reinitialized) containing:
      first_element: The first element of the input.
      iterator_reinitialized: A clone of the original iterator/iterable.
  """
  # Inspired from https://stackoverflow.com/a/12059829/1190077
  peeker, iterator_reinitialized = itertools.tee(iterator)
  first = next(peeker)
  return first, iterator_reinitialized


def _check_2d_shape(shape: tuple[int, int]) -> None:
  """Checks that `shape` is of the form (height, width) with two integers."""
  if len(shape) != 2:
    raise ValueError(f'Shape {shape} is not of the form (height, width).')
  if not all(isinstance(i, numbers.Integral) for i in shape):
    raise ValueError(f'Shape {shape} contains non-integers.')


def _run(args: str | Sequence[str]) -> None:
  """Executes command, printing output from stdout and stderr.

  Args:
    args: Command to execute, which can be either a string or a sequence of word
      strings, as in `subprocess.run()`.  If `args` is a string, the shell is
      invoked to interpret it.

  Raises:
    RuntimeError: If the command's exit code is nonzero.
  """
  proc = subprocess.run(
      args,
      shell=isinstance(args, str),
      stdout=subprocess.PIPE,
      stderr=subprocess.STDOUT,
      check=False,
      universal_newlines=True,
  )
  print(proc.stdout, end='', flush=True)
  if proc.returncode:
    raise RuntimeError(
        f"Command '{proc.args}' failed with code {proc.returncode}."
    )


def _display_html(text: str, /) -> None:
  """In a Jupyter notebook, display the HTML `text`."""
  IPython.display.display(IPython.display.HTML(text))  # type: ignore


def set_ffmpeg(name_or_path: _Path) -> None:
  """Specifies the name or path for the `ffmpeg` external program.

  The `ffmpeg` program is required for compressing and decompressing video.
  (It is used in `read_video`, `write_video`, `show_video`, `show_videos`,
  etc.)

  Args:
    name_or_path: Either a filename within a directory of `os.environ['PATH']`
      or a filepath.  The default setting is 'ffmpeg'.
  """
  _config.ffmpeg_name_or_path = name_or_path


def set_output_height(num_pixels: int) -> None:
  """Overrides the height of the current output cell, if using Colab."""
  try:
    # We want to fail gracefully for non-Colab IPython notebooks.
    output = importlib.import_module('google.colab.output')
    s = f'google.colab.output.setIframeHeight("{num_pixels}px")'
    output.eval_js(s)
  except (ModuleNotFoundError, AttributeError):
    pass


def set_max_output_height(num_pixels: int) -> None:
  """Sets the maximum height of the current output cell, if using Colab."""
  try:
    # We want to fail gracefully for non-Colab IPython notebooks.
    output = importlib.import_module('google.colab.output')
    s = (
        'google.colab.output.setIframeHeight('
        f'0, true, {{maxHeight: {num_pixels}}})'
    )
    output.eval_js(s)
  except (ModuleNotFoundError, AttributeError):
    pass


# ** Type conversions.


def _as_valid_media_type(dtype: _DTypeLike) -> _DType:
  """Returns validated media data type."""
  dtype = np.dtype(dtype)
  if not issubclass(dtype.type, (np.unsignedinteger, np.floating)):
    raise ValueError(
        f'Type {dtype} is not a valid media data type (uint or float).'
    )
  return dtype


def _as_valid_media_array(x: _ArrayLike) -> _NDArray:
  """Converts to ndarray (if not already), and checks validity of data type."""
  a = np.asarray(x)
  if a.dtype == bool:
    a = a.astype(np.uint8) * np.iinfo(np.uint8).max
  _as_valid_media_type(a.dtype)
  return a


def to_type(array: _ArrayLike, dtype: _DTypeLike) -> _NDArray:
  """Returns media array converted to specified type.

  A "media array" is one in which the dtype is either a floating-point type
  (np.float32 or np.float64) or an unsigned integer type.  The array values are
  assumed to lie in the range [0.0, 1.0] for floating-point values, and in the
  full range for unsigned integers, e.g. [0, 255] for np.uint8.

  Conversion between integers and floats maps uint(0) to 0.0 and uint(MAX) to
  1.0.  The input array may also be of type bool, whereby True maps to
  uint(MAX) or 1.0.  The values are scaled and clamped as appropriate during
  type conversions.

  Args:
    array: Input array-like object (floating-point, unsigned int, or bool).
    dtype: Desired output type (floating-point or unsigned int).

  Returns:
    Array `a` if it is already of the specified dtype, else a converted array.
  """
  a = np.asarray(array)
  dtype = np.dtype(dtype)
  del array
  if a.dtype != bool:
    _as_valid_media_type(a.dtype)  # Verify that 'a' has a valid dtype.
  if a.dtype == bool:
    result = a.astype(dtype)
    if np.issubdtype(dtype, np.unsignedinteger):
      result = result * dtype.type(np.iinfo(dtype).max)
  elif a.dtype == dtype:
    result = a
  elif np.issubdtype(dtype, np.unsignedinteger):
    if np.issubdtype(a.dtype, np.unsignedinteger):
      src_max: float = np.iinfo(a.dtype).max
    else:
      a = np.clip(a, 0.0, 1.0)
      src_max = 1.0
    dst_max = np.iinfo(dtype).max
    if dst_max <= np.iinfo(np.uint16).max:
      scale = np.array(dst_max / src_max, dtype=np.float32)
      result = (a * scale + 0.5).astype(dtype)
    elif dst_max <= np.iinfo(np.uint32).max:
      result = (a.astype(np.float64) * (dst_max / src_max) + 0.5).astype(dtype)
    else:
      # https://stackoverflow.com/a/66306123/
      a = a.astype(np.float64) * (dst_max / src_max) + 0.5
      dst = np.atleast_1d(a)
      values_too_large = dst >= np.float64(dst_max)
      with np.errstate(invalid='ignore'):
        dst = dst.astype(dtype)
      dst[values_too_large] = dst_max
      result = dst if a.ndim > 0 else dst[0]
  else:
    assert np.issubdtype(dtype, np.floating)
    result = a.astype(dtype)
    if np.issubdtype(a.dtype, np.unsignedinteger):
      result = result / dtype.type(np.iinfo(a.dtype).max)
  return result


def to_float01(a: _ArrayLike, dtype: _DTypeLike = np.float32) -> _NDArray:
  """If array has unsigned integers, rescales them to the range [0.0, 1.0].

  Scaling is such that uint(0) maps to 0.0 and uint(MAX) maps to 1.0.  See
  `to_type`.

  Args:
    a: Input array.
    dtype: Desired floating-point type if rescaling occurs.

  Returns:
    A new array of dtype values in the range [0.0, 1.0] if the input array `a`
    contains unsigned integers; otherwise, array `a` is returned unchanged.
  """
  a = np.asarray(a)
  dtype = np.dtype(dtype)
  if not np.issubdtype(dtype, np.floating):
    raise ValueError(f'Type {dtype} is not floating-point.')
  if np.issubdtype(a.dtype, np.floating):
    return a
  return to_type(a, dtype)


def to_uint8(a: _ArrayLike) -> _NDArray:
  """Returns array converted to uint8 values; see `to_type`."""
  return to_type(a, np.uint8)


# ** Functions to generate example image and video data.


def color_ramp(
    shape: tuple[int, int] = (64, 64), *, dtype: _DTypeLike = np.float32
) -> _NDArray:
  """Returns an image of a red-green color gradient.

  This is useful for quick experimentation and testing.  See also
  `moving_circle` to generate a sample video.

  Args:
    shape: 2D spatial dimensions (height, width) of generated image.
    dtype: Type (uint or floating) of resulting pixel values.
  """
  _check_2d_shape(shape)
  dtype = _as_valid_media_type(dtype)
  yx = (np.moveaxis(np.indices(shape), 0, -1) + 0.5) / shape
  image = np.insert(yx, 2, 0.0, axis=-1)
  return to_type(image, dtype)


def moving_circle(
    shape: tuple[int, int] = (256, 256),
    num_images: int = 10,
    *,
    dtype: _DTypeLike = np.float32,
) -> _NDArray:
  """Returns a video of a circle moving in front of a color ramp.

  This is useful for quick experimentation and testing.  See also `color_ramp`
  to generate a sample image.

  >>> show_video(moving_circle((480, 640), 60), fps=60)

  Args:
    shape: 2D spatial dimensions (height, width) of generated video.
    num_images: Number of video frames.
    dtype: Type (uint or floating) of resulting pixel values.
  """
  _check_2d_shape(shape)
  dtype = np.dtype(dtype)

  def generate_image(image_index: int) -> _NDArray:
    """Returns a video frame image."""
    image = color_ramp(shape, dtype=dtype)
    yx = np.moveaxis(np.indices(shape), 0, -1)
    center = shape[0] * 0.6, shape[1] * (image_index + 0.5) / num_images
    radius_squared = (min(shape) * 0.1) ** 2
    inside = np.sum((yx - center) ** 2, axis=-1) < radius_squared
    white_circle_color = 1.0, 1.0, 1.0
    if np.issubdtype(dtype, np.unsignedinteger):
      white_circle_color = to_type([white_circle_color], dtype)[0]
    image[inside] = white_circle_color
    return image

  return np.array([generate_image(i) for i in range(num_images)])


# ** Color-space conversions.

# Same matrix values as in two sources:
# https://github.com/scikit-image/scikit-image/blob/master/skimage/color/colorconv.py#L377
# https://github.com/tensorflow/tensorflow/blob/r1.14/tensorflow/python/ops/image_ops_impl.py#L2754
_YUV_FROM_RGB_MATRIX = np.array(
    [
        [0.299, -0.14714119, 0.61497538],
        [0.587, -0.28886916, -0.51496512],
        [0.114, 0.43601035, -0.10001026],
    ],
    dtype=np.float32,
)
_RGB_FROM_YUV_MATRIX = np.linalg.inv(_YUV_FROM_RGB_MATRIX)
_YUV_CHROMA_OFFSET = np.array([0.0, 0.5, 0.5], dtype=np.float32)


def yuv_from_rgb(rgb: _ArrayLike) -> _NDArray:
  """Returns the RGB image/video mapped to YUV [0,1] color space.

  Note that the "YUV" color space used by video compressors is actually YCbCr!

  Args:
    rgb: Input image in sRGB space.
  """
  rgb = to_float01(rgb)
  if rgb.shape[-1] != 3:
    raise ValueError(f'The last dimension in {rgb.shape} is not 3.')
  return rgb @ _YUV_FROM_RGB_MATRIX + _YUV_CHROMA_OFFSET


def rgb_from_yuv(yuv: _ArrayLike) -> _NDArray:
  """Returns the YUV image/video mapped to RGB [0,1] color space."""
  yuv = to_float01(yuv)
  if yuv.shape[-1] != 3:
    raise ValueError(f'The last dimension in {yuv.shape} is not 3.')
  return (yuv - _YUV_CHROMA_OFFSET) @ _RGB_FROM_YUV_MATRIX


# Same matrix values as in
# https://github.com/scikit-image/scikit-image/blob/master/skimage/color/colorconv.py#L1654
# and https://en.wikipedia.org/wiki/YUV#Studio_swing_for_BT.601
_YCBCR_FROM_RGB_MATRIX = np.array(
    [
        [65.481, 128.553, 24.966],
        [-37.797, -74.203, 112.0],
        [112.0, -93.786, -18.214],
    ],
    dtype=np.float32,
).transpose()
_RGB_FROM_YCBCR_MATRIX = np.linalg.inv(_YCBCR_FROM_RGB_MATRIX)
_YCBCR_OFFSET = np.array([16.0, 128.0, 128.0], dtype=np.float32)
# Note that _YCBCR_FROM_RGB_MATRIX =~ _YUV_FROM_RGB_MATRIX * [219, 256, 182];
# https://en.wikipedia.org/wiki/YUV: "Y' values are conventionally shifted and
# scaled to the range [16, 235] (referred to as studio swing or 'TV levels')";
# "studio range of 16-240 for U and V".  (Where does value 182 come from?)


def ycbcr_from_rgb(rgb: _ArrayLike) -> _NDArray:
  """Returns the RGB image/video mapped to YCbCr [0,1] color space.

  The YCbCr color space is the one called "YUV" by video compressors.

  Args:
    rgb: Input image in sRGB space.
  """
  rgb = to_float01(rgb)
  if rgb.shape[-1] != 3:
    raise ValueError(f'The last dimension in {rgb.shape} is not 3.')
  return (rgb @ _YCBCR_FROM_RGB_MATRIX + _YCBCR_OFFSET) / 255.0


def rgb_from_ycbcr(ycbcr: _ArrayLike) -> _NDArray:
  """Returns the YCbCr image/video mapped to RGB [0,1] color space."""
  ycbcr = to_float01(ycbcr)
  if ycbcr.shape[-1] != 3:
    raise ValueError(f'The last dimension in {ycbcr.shape} is not 3.')
  return (ycbcr * 255.0 - _YCBCR_OFFSET) @ _RGB_FROM_YCBCR_MATRIX


# ** Image processing.


def _pil_image(image: _ArrayLike, mode: str | None = None) -> PIL.Image.Image:
  """Returns a PIL image given a numpy matrix (either uint8 or float [0,1])."""
  image = _as_valid_media_array(image)
  if image.ndim not in (2, 3):
    raise ValueError(f'Image shape {image.shape} is neither 2D nor 3D.')
  pil_image: PIL.Image.Image = PIL.Image.fromarray(image, mode=mode)
  return pil_image


def resize_image(image: _ArrayLike, shape: tuple[int, int]) -> _NDArray:
  """Resizes image to specified spatial dimensions using a Lanczos filter.

  Args:
    image: Array-like 2D or 3D object, where dtype is uint or floating-point.
    shape: 2D spatial dimensions (height, width) of output image.

  Returns:
    A resampled image whose spatial dimensions match `shape`.
  """
  image = _as_valid_media_array(image)
  if image.ndim not in (2, 3):
    raise ValueError(f'Image shape {image.shape} is neither 2D nor 3D.')
  _check_2d_shape(shape)

  # A PIL image can be multichannel only if it has 3 or 4 uint8 channels,
  # and it can be resized only if it is uint8 or float32.
  supported_single_channel = (
      np.issubdtype(image.dtype, np.floating) or image.dtype == np.uint8
  ) and image.ndim == 2
  supported_multichannel = (
      image.dtype == np.uint8 and image.ndim == 3 and image.shape[2] in (3, 4)
  )
  if supported_single_channel or supported_multichannel:
    return np.array(
        _pil_image(image).resize(
            shape[::-1], resample=PIL.Image.Resampling.LANCZOS
        ),
        dtype=image.dtype,
    )
  if image.ndim == 2:
    # We convert to floating-point for resizing and convert back.
    return to_type(resize_image(to_float01(image), shape), image.dtype)
  # We resize each image channel individually.
  return np.dstack(
      [resize_image(channel, shape) for channel in np.moveaxis(image, -1, 0)]
  )


# ** Video processing.


def resize_video(video: Iterable[_NDArray], shape: tuple[int, int]) -> _NDArray:
  """Resizes `video` to specified spatial dimensions using a Lanczos filter.

  Args:
    video: Iterable of images.
    shape: 2D spatial dimensions (height, width) of output video.

  Returns:
    A resampled video whose spatial dimensions match `shape`.
  """
  _check_2d_shape(shape)
  return np.array([resize_image(image, shape) for image in video])


# ** General I/O.


def _is_url(path_or_url: _Path) -> bool:
  return isinstance(path_or_url, str) and path_or_url.startswith(
      ('http://', 'https://', 'file://')
  )


def read_contents(path_or_url: _Path) -> bytes:
  """Returns the contents of the file specified by either a path or URL."""
  data: bytes
  if _is_url(path_or_url):
    assert isinstance(path_or_url, str)
    with urllib.request.urlopen(path_or_url) as response:
      data = response.read()
  else:
    with _open(path_or_url, 'rb') as f:
      data = f.read()
  return data


@contextlib.contextmanager
def _read_via_local_file(path_or_url: _Path) -> Iterator[str]:
  """Context to copy a remote file locally to read from it.

  Args:
    path_or_url: File, which may be remote.

  Yields:
    The name of a local file which may be a copy of a remote file.
  """
  if _is_url(path_or_url) or not _path_is_local(path_or_url):
    suffix = pathlib.Path(path_or_url).suffix
    with tempfile.TemporaryDirectory() as directory_name:
      tmp_path = pathlib.Path(directory_name) / f'file{suffix}'
      tmp_path.write_bytes(read_contents(path_or_url))
      yield str(tmp_path)
  else:
    yield str(path_or_url)


@contextlib.contextmanager
def _write_via_local_file(path: _Path) -> Iterator[str]:
  """Context to write a temporary local file and subsequently copy it remotely.

  Args:
    path: File, which may be remote.

  Yields:
    The name of a local file which may be subsequently copied remotely.
  """
  if _path_is_local(path):
    yield str(path)
  else:
    suffix = pathlib.Path(path).suffix
    with tempfile.TemporaryDirectory() as directory_name:
      tmp_path = pathlib.Path(directory_name) / f'file{suffix}'
      yield str(tmp_path)
      with _open(path, mode='wb') as f:
        f.write(tmp_path.read_bytes())


class set_show_save_dir:  # pylint: disable=invalid-name
  """Save all titled output from `show_*()` calls into files.

  If the specified `directory` is not None, all titled images and videos
  displayed by `show_image`, `show_images`, `show_video`, and `show_videos` are
  also saved as files within the directory.

  It can be used either to set the state or as a context manager:

  >>> set_show_save_dir('/tmp')
  >>> show_image(color_ramp(), title='image1')  # Creates /tmp/image1.png.
  >>> show_video(moving_circle(), title='video2')  # Creates /tmp/video2.mp4.
  >>> set_show_save_dir(None)

  >>> with set_show_save_dir('/tmp'):
  ...   show_image(color_ramp(), title='image1')  # Creates /tmp/image1.png.
  ...   show_video(moving_circle(), title='video2')  # Creates /tmp/video2.mp4.
  """

  def __init__(self, directory: _Path | None):
    self._old_show_save_dir = _config.show_save_dir
    _config.show_save_dir = directory

  def __enter__(self) -> None:
    pass

  def __exit__(self, *_: Any) -> None:
    _config.show_save_dir = self._old_show_save_dir


# ** Image I/O.


def read_image(
    path_or_url: _Path,
    *,
    apply_exif_transpose: bool = True,
    dtype: _DTypeLike = None,
) -> _NDArray:
  """Returns an image read from a file path or URL.

  Decoding is performed using `PIL`, which supports `uint8` images with 1, 3,
  or 4 channels and `uint16` images with a single channel.

  Args:
    path_or_url: Path of input file.
    apply_exif_transpose: If True, rotate image according to EXIF orientation.
    dtype: Data type of the returned array.  If None, `np.uint8` or `np.uint16`
      is inferred automatically.
  """
  data = read_contents(path_or_url)
  return decompress_image(data, dtype, apply_exif_transpose)


def write_image(
    path: _Path, image: _ArrayLike, fmt: str = 'png', **kwargs: Any
) -> None:
  """Writes an image to a file.

  Encoding is performed using `PIL`, which supports `uint8` images with 1, 3,
  or 4 channels and `uint16` images with a single channel.

  File format is explicitly provided by `fmt` and not inferred by `path`.

  Args:
    path: Path of output file.
    image: Array-like object.  If its type is float, it is converted to np.uint8
      using `to_uint8` (thus clamping the input to the range [0.0, 1.0]).
      Otherwise it must be np.uint8 or np.uint16.
    fmt: Desired compression encoding, e.g. 'png'.
    **kwargs: Additional parameters for `PIL.Image.save()`.
  """
  image = _as_valid_media_array(image)
  if np.issubdtype(image.dtype, np.floating):
    image = to_uint8(image)
  with _open(path, 'wb') as f:
    _pil_image(image).save(f, format=fmt, **kwargs)


def to_rgb(
    array: _ArrayLike,
    *,
    vmin: float | None = None,
    vmax: float | None = None,
    cmap: str | Callable[[_ArrayLike], _NDArray] = 'gray',
) -> _NDArray:
  """Maps scalar values to RGB using value bounds and a color map.

  Args:
    array: Scalar values, with arbitrary shape.
    vmin: Explicit min value for remapping; if None, it is obtained as the
      minimum finite value of `array`.
    vmax: Explicit max value for remapping; if None, it is obtained as the
      maximum finite value of `array`.
    cmap: A `pyplot` color map or callable, to map from 1D value to 3D or 4D
      color.

  Returns:
    A new array in which each element is affinely mapped from [vmin, vmax]
    to [0.0, 1.0] and then color-mapped.
  """
  a = _as_valid_media_array(array)
  del array
  # For future numpy version 1.17.0 (which adds the `where` argument):
  # vmin = np.amin(a, where=np.isfinite(a)) if vmin is None else vmin
  # vmax = np.amax(a, where=np.isfinite(a)) if vmax is None else vmax
  vmin = np.amin(np.where(np.isfinite(a), a, np.inf)) if vmin is None else vmin
  vmax = np.amax(np.where(np.isfinite(a), a, -np.inf)) if vmax is None else vmax
  a = (a.astype('float') - vmin) / (vmax - vmin + np.finfo(float).eps)
  if isinstance(cmap, str):
    if hasattr(matplotlib, 'colormaps'):
      rgb_from_scalar: Any = matplotlib.colormaps[cmap]  # Newer version.
    else:
      rgb_from_scalar = matplotlib.pyplot.cm.get_cmap(cmap)  # pylint: disable=no-member
  else:
    rgb_from_scalar = cmap
  a = rgb_from_scalar(a)
 850  # If there is a fully opaque alpha channel, remove it.
  if a.shape[-1] == 4 and np.all(to_float01(a[..., 3]) == 1.0):
 852    a = a[..., :3]
 853  return a
 854
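# Illustrative sketch (not part of mediapy): the affine remapping that `to_rgb`
# applies before color-mapping, written with the standard library only.  The
# helper name `normalize_finite` is hypothetical.  As in the code above,
# non-finite values are ignored when the bounds are inferred, and a small
# epsilon guards against division by zero.

```python
import math
import sys


def normalize_finite(values, vmin=None, vmax=None):
  """Affinely maps values from [vmin, vmax] to [0.0, 1.0]."""
  finite = [v for v in values if math.isfinite(v)]
  lo = min(finite) if vmin is None else vmin
  hi = max(finite) if vmax is None else vmax
  # The epsilon keeps the denominator nonzero when all values are equal.
  eps = sys.float_info.epsilon
  return [(v - lo) / (hi - lo + eps) for v in values]


# The infinite entry does not affect the inferred bounds (2.0 and 6.0).
normalized = normalize_finite([2.0, 4.0, float('inf'), 6.0])
```

# The finite inputs land at approximately 0.0, 0.5, and 1.0, while the
# infinite input stays infinite and is left for the color map to handle.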
 855
 856def compress_image(
 857    image: _ArrayLike, *, fmt: str = 'png', **kwargs: Any
 858) -> bytes:
 859  """Returns a buffer containing a compressed image.
 860
 861  Args:
 862    image: Array in a format supported by `PIL`, e.g. np.uint8 or np.uint16.
 863    fmt: Desired compression encoding, e.g. 'png'.
    **kwargs: Options for `PIL.Image.save()`, e.g. `optimize=True` for greater
      compression.
 866  """
 867  image = _as_valid_media_array(image)
 868  with io.BytesIO() as output:
 869    _pil_image(image).save(output, format=fmt, **kwargs)
 870    return output.getvalue()
 871
 872
 873def decompress_image(
 874    data: bytes, dtype: _DTypeLike = None, apply_exif_transpose: bool = True
 875) -> _NDArray:
 876  """Returns an image from a compressed data buffer.
 877
 878  Decoding is performed using `PIL`, which supports `uint8` images with 1, 3,
 879  or 4 channels and `uint16` images with a single channel.
 880
 881  Args:
 882    data: Buffer containing compressed image.
 883    dtype: Data type of the returned array.  If None, `np.uint8` or `np.uint16`
 884      is inferred automatically.
 885    apply_exif_transpose: If True, rotate image according to EXIF orientation.
 886  """
 887  pil_image: PIL.Image.Image = PIL.Image.open(io.BytesIO(data))
 888  if apply_exif_transpose:
 889    tmp_image = PIL.ImageOps.exif_transpose(pil_image)  # Future: in_place=True.
 890    assert tmp_image
 891    pil_image = tmp_image
 892  if dtype is None:
 893    dtype = np.uint16 if pil_image.mode.startswith('I') else np.uint8
 894  return np.array(pil_image, dtype=dtype)
 895
 896
 897def html_from_compressed_image(
 898    data: bytes,
 899    width: int,
 900    height: int,
 901    *,
 902    title: str | None = None,
 903    border: bool | str = False,
 904    pixelated: bool = True,
 905    fmt: str = 'png',
 906) -> str:
 907  """Returns an HTML string with an image tag containing encoded data.
 908
 909  Args:
 910    data: Compressed image bytes.
 911    width: Width of HTML image in pixels.
 912    height: Height of HTML image in pixels.
 913    title: Optional text shown centered above image.
 914    border: If `bool`, whether to place a black boundary around the image, or if
 915      `str`, the boundary CSS style.
 916    pixelated: If True, sets the CSS style to 'image-rendering: pixelated;'.
 917    fmt: Compression encoding.
 918  """
 919  b64 = base64.b64encode(data).decode('utf-8')
 920  if isinstance(border, str):
 921    border = f'{border}; '
 922  elif border:
 923    border = 'border:1px solid black; '
 924  else:
 925    border = ''
 926  s_pixelated = 'pixelated' if pixelated else 'auto'
 927  s = (
 928      f'<img width="{width}" height="{height}"'
 929      f' style="{border}image-rendering:{s_pixelated}; object-fit:cover;"'
 930      f' src="data:image/{fmt};base64,{b64}"/>'
 931  )
 932  if title is not None:
 933    s = f"""<div style="display:flex; align-items:left;">
 934      <div style="display:flex; flex-direction:column; align-items:center;">
 935      <div>{title}</div><div>{s}</div></div></div>"""
 936  return s
 937
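# Illustrative sketch (not part of mediapy): embedding bytes in an <img> tag as
# a base64 data URI, which is the core of `html_from_compressed_image`.  The
# helper name `img_tag_from_bytes` and the placeholder payload are assumptions;
# the styling and title handling of the real function are omitted.

```python
import base64


def img_tag_from_bytes(data, width, height, fmt='png'):
  """Returns an <img> tag that embeds `data` as a base64 data URI."""
  b64 = base64.b64encode(data).decode('utf-8')
  return (
      f'<img width="{width}" height="{height}"'
      f' src="data:image/{fmt};base64,{b64}"/>'
  )


# Placeholder bytes stand in for real compressed image data.
payload = b'placeholder-bytes'
tag = img_tag_from_bytes(payload, 32, 16)
# The original bytes are recoverable from the embedded base64 text.
encoded = tag.split('base64,')[1].split('"')[0]
decoded = base64.b64decode(encoded)
```

# Because the data travels inside the HTML string, the notebook needs no
# separate image file, at the cost of roughly a one-third size overhead from
# base64 encoding.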
 938
 939def _get_width_height(
 940    width: int | None, height: int | None, shape: tuple[int, int]
 941) -> tuple[int, int]:
 942  """Returns (width, height) given optional parameters and image shape."""
 943  assert len(shape) == 2, shape
 944  if width and height:
 945    return width, height
 946  if width and not height:
 947    return width, int(width * (shape[0] / shape[1]) + 0.5)
 948  if height and not width:
 949    return int(height * (shape[1] / shape[0]) + 0.5), height
 950  return shape[::-1]
 951
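# Illustrative sketch (not part of mediapy): how a display size can be derived
# from an optional width or height while preserving the aspect ratio, and how
# magnification can then be detected.  The name `display_size` is hypothetical
# and the logic only mirrors `_get_width_height` above.

```python
def display_size(width, height, shape):
  """Returns (width, height) in pixels for an image of shape (rows, cols)."""
  rows, cols = shape
  if width and height:
    return width, height
  if width:
    return width, int(width * rows / cols + 0.5)
  if height:
    return int(height * cols / rows + 0.5), height
  return cols, rows


# A 480x640 (rows x cols) image shown at height=120 keeps its 4:3 aspect.
size = display_size(None, 120, (480, 640))
# A 16x16 image shown at width=64 is magnified in both dimensions.
w, h = display_size(64, None, (16, 16))
magnified = h > 16 or w > 16
```

# Here `size` is (160, 120) and `magnified` is True, which is the situation in
# which `show_images` selects pixelated rendering when `pixelated` is None.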
 952
 953def _ensure_mapped_to_rgb(
 954    image: _ArrayLike,
 955    *,
 956    vmin: float | None = None,
 957    vmax: float | None = None,
 958    cmap: str | Callable[[_ArrayLike], _NDArray] = 'gray',
 959) -> _NDArray:
 960  """Ensure image is mapped to RGB."""
 961  image = _as_valid_media_array(image)
 962  if not (image.ndim == 2 or (image.ndim == 3 and image.shape[2] in (1, 3, 4))):
 963    raise ValueError(
 964        f'Image with shape {image.shape} is neither a 2D array'
 965        ' nor a 3D array with 1, 3, or 4 channels.'
 966    )
 967  if image.ndim == 3 and image.shape[2] == 1:
 968    image = image[:, :, 0]
 969  if image.ndim == 2:
 970    image = to_rgb(image, vmin=vmin, vmax=vmax, cmap=cmap)
 971  return image
 972
 973
 974def show_image(
 975    image: _ArrayLike, *, title: str | None = None, **kwargs: Any
 976) -> str | None:
 977  """Displays an image in the notebook and optionally saves it to a file.
 978
 979  See `show_images`.
 980
 981  >>> show_image(np.random.rand(100, 100))
 982  >>> show_image(np.random.randint(0, 256, size=(80, 80, 3), dtype='uint8'))
 983  >>> show_image(np.random.rand(10, 10) - 0.5, cmap='bwr', height=100)
 984  >>> show_image(read_image('/tmp/image.png'))
 985  >>> url = 'https://github.com/hhoppe/data/raw/main/image.png'
 986  >>> show_image(read_image(url))
 987
 988  Args:
 989    image: 2D array-like, or 3D array-like with 1, 3, or 4 channels.
 990    title: Optional text shown centered above the image.
 991    **kwargs: See `show_images`.
 992
 993  Returns:
    The HTML string if `return_html` is True; otherwise None.
 995  """
 996  return show_images([np.asarray(image)], [title], **kwargs)
 997
 998
 999def show_images(
1000    images: Iterable[_ArrayLike] | Mapping[str, _ArrayLike],
1001    titles: Iterable[str | None] | None = None,
1002    *,
1003    width: int | None = None,
1004    height: int | None = None,
1005    downsample: bool = True,
1006    columns: int | None = None,
1007    vmin: float | None = None,
1008    vmax: float | None = None,
1009    cmap: str | Callable[[_ArrayLike], _NDArray] = 'gray',
1010    border: bool | str = False,
1011    ylabel: str = '',
1012    html_class: str = 'show_images',
1013    pixelated: bool | None = None,
1014    return_html: bool = False,
1015) -> str | None:
1016  """Displays a row of images in the IPython/Jupyter notebook.
1017
1018  If a directory has been specified using `set_show_save_dir`, also saves each
1019  titled image to a file in that directory based on its title.
1020
1021  >>> image1, image2 = np.random.rand(64, 64, 3), color_ramp((64, 64))
1022  >>> show_images([image1, image2])
1023  >>> show_images({'random image': image1, 'color ramp': image2}, height=128)
1024  >>> show_images([image1, image2] * 5, columns=4, border=True)
1025
1026  Args:
1027    images: Iterable of images, or dictionary of `{title: image}`.  Each image
1028      must be either a 2D array or a 3D array with 1, 3, or 4 channels.
1029    titles: Optional strings shown above the corresponding images.
1030    width: Optional, overrides displayed width (in pixels).
1031    height: Optional, overrides displayed height (in pixels).
1032    downsample: If True, each image whose width or height is greater than the
1033      specified `width` or `height` is resampled to the display resolution. This
1034      improves antialiasing and reduces the size of the notebook.
1035    columns: Optional, maximum number of images per row.
1036    vmin: For single-channel image, explicit min value for display.
1037    vmax: For single-channel image, explicit max value for display.
1038    cmap: For single-channel image, `pyplot` color map or callable to map 1D to
1039      3D color.
1040    border: If `bool`, whether to place a black boundary around the image, or if
1041      `str`, the boundary CSS style.
1042    ylabel: Text (rotated by 90 degrees) shown on the left of each row.
1043    html_class: CSS class name used in definition of HTML element.
1044    pixelated: If True, sets the CSS style to 'image-rendering: pixelated;'; if
1045      False, sets 'image-rendering: auto'; if None, uses pixelated rendering
1046      only on images for which `width` or `height` introduces magnification.
1047    return_html: If `True` return the raw HTML `str` instead of displaying.
1048
1049  Returns:
    The HTML string if `return_html` is True; otherwise None.
1051  """
1052  if isinstance(images, Mapping):
1053    if titles is not None:
1054      raise ValueError('Cannot have images dictionary and titles parameter.')
1055    list_titles, list_images = list(images.keys()), list(images.values())
1056  else:
1057    list_images = list(images)
1058    list_titles = [None] * len(list_images) if titles is None else list(titles)
1059    if len(list_images) != len(list_titles):
1060      raise ValueError(
1061          'Number of images does not match number of titles'
1062          f' ({len(list_images)} vs {len(list_titles)}).'
1063      )
1064
1065  list_images = [
1066      _ensure_mapped_to_rgb(image, vmin=vmin, vmax=vmax, cmap=cmap)
1067      for image in list_images
1068  ]
1069
1070  def maybe_downsample(image: _NDArray) -> _NDArray:
1071    shape: tuple[int, int] = image.shape[:2]
1072    w, h = _get_width_height(width, height, shape)
1073    if w < shape[1] or h < shape[0]:
1074      image = resize_image(image, (h, w))
1075    return image
1076
1077  if downsample:
1078    list_images = [maybe_downsample(image) for image in list_images]
1079  png_datas = [compress_image(to_uint8(image)) for image in list_images]
1080
1081  for title, png_data in zip(list_titles, png_datas):
1082    if title is not None and _config.show_save_dir:
1083      path = pathlib.Path(_config.show_save_dir) / f'{title}.png'
1084      with _open(path, mode='wb') as f:
1085        f.write(png_data)
1086
1087  def html_from_compressed_images() -> str:
1088    html_strings = []
1089    for image, title, png_data in zip(list_images, list_titles, png_datas):
1090      w, h = _get_width_height(width, height, image.shape[:2])
1091      magnified = h > image.shape[0] or w > image.shape[1]
1092      pixelated2 = pixelated if pixelated is not None else magnified
1093      html_strings.append(
1094          html_from_compressed_image(
1095              png_data, w, h, title=title, border=border, pixelated=pixelated2
1096          )
1097      )
1098    # Create single-row tables each with no more than 'columns' elements.
1099    table_strings = []
1100    for row_html_strings in _chunked(html_strings, columns):
1101      td = '<td style="padding:1px;">'
1102      s = ''.join(f'{td}{e}</td>' for e in row_html_strings)
1103      if ylabel:
1104        style = 'writing-mode:vertical-lr; transform:rotate(180deg);'
1105        s = f'{td}<span style="{style}">{ylabel}</span></td>' + s
1106      table_strings.append(
1107          f'<table class="{html_class}"'
1108          f' style="border-spacing:0px;"><tr>{s}</tr></table>'
1109      )
1110    return ''.join(table_strings)
1111
1112  s = html_from_compressed_images()
1113  while len(s) > _IPYTHON_HTML_SIZE_LIMIT * 0.5:
1114    list_images = [image[::2, ::2] for image in list_images]
1115    png_datas = [compress_image(to_uint8(image)) for image in list_images]
1116    s = html_from_compressed_images()
1117  if return_html:
1118    return s
1119  _display_html(s)
1120  return None
1121
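# Illustrative sketch (not part of mediapy): splitting a list of per-image HTML
# strings into rows of at most `columns` items, as `show_images` does before
# building one table per row.  The helper `chunked` is hypothetical; the
# `_chunked` function used above is defined elsewhere in this file and may
# differ in detail.

```python
import itertools


def chunked(items, n):
  """Yields successive tuples of at most n items; all items if n is None."""
  iterator = iter(items)
  while True:
    chunk = tuple(itertools.islice(iterator, n))
    if not chunk:
      return
    yield chunk


rows = list(chunked(['a', 'b', 'c', 'd', 'e'], 2))
single_row = list(chunked(['a', 'b', 'c'], None))
```

# With n=2 the five items form three rows, the last holding one item; with
# n=None everything stays in a single row, matching the default `columns=None`.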
1122
1123def compare_images(
1124    images: Iterable[_ArrayLike],
1125    *,
1126    vmin: float | None = None,
1127    vmax: float | None = None,
1128    cmap: str | Callable[[_ArrayLike], _NDArray] = 'gray',
1129) -> None:
1130  """Compare two images using an interactive slider.
1131
1132  Displays an HTML slider component to interactively swipe between two images.
1133  The slider functionality requires that the web browser have Internet access.
1134  See additional info in `https://github.com/sneas/img-comparison-slider`.
1135
1136  >>> image1, image2 = np.random.rand(64, 64, 3), color_ramp((64, 64))
1137  >>> compare_images([image1, image2])
1138
1139  Args:
1140    images: Iterable of images.  Each image must be either a 2D array or a 3D
1141      array with 1, 3, or 4 channels.  There must be exactly two images.
1142    vmin: For single-channel image, explicit min value for display.
1143    vmax: For single-channel image, explicit max value for display.
1144    cmap: For single-channel image, `pyplot` color map or callable to map 1D to
1145      3D color.
1146  """
1147  list_images = [
1148      _ensure_mapped_to_rgb(image, vmin=vmin, vmax=vmax, cmap=cmap)
1149      for image in images
1150  ]
1151  if len(list_images) != 2:
1152    raise ValueError('The number of images must be 2.')
1153  png_datas = [compress_image(to_uint8(image)) for image in list_images]
1154  b64_1, b64_2 = [
1155      base64.b64encode(png_data).decode('utf-8') for png_data in png_datas
1156  ]
1157  s = _IMAGE_COMPARISON_HTML.replace('{b64_1}', b64_1).replace('{b64_2}', b64_2)
1158  _display_html(s)
1159
1160
1161# ** Video I/O.
1162
1163
1164def _filename_suffix_from_codec(codec: str) -> str:
1165  if codec == 'gif':
1166    return '.gif'
1167  elif codec == 'vp9':
1168    return '.webm'
1169
1170  return '.mp4'
1171
1172
1173def _get_ffmpeg_path() -> str:
1174  path = _search_for_ffmpeg_path()
1175  if not path:
1176    raise RuntimeError(
1177        f"Program '{_config.ffmpeg_name_or_path}' is not found;"
1178        " perhaps install ffmpeg using 'apt install ffmpeg'."
1179    )
1180  return path
1181
1182
1183@typing.overload
1184def _run_ffmpeg(
1185    ffmpeg_args: Sequence[str],
1186    stdin: int | None = None,
1187    stdout: int | None = None,
1188    stderr: int | None = None,
1189    encoding: None = None,  # No encoding -> bytes
1190    allowed_input_files: Sequence[str] | None = None,
1191    allowed_output_files: Sequence[str] | None = None,
1192) -> subprocess.Popen[bytes]:
1193  ...
1194
1195
1196@typing.overload
1197def _run_ffmpeg(
1198    ffmpeg_args: Sequence[str],
1199    stdin: int | None = None,
1200    stdout: int | None = None,
1201    stderr: int | None = None,
1202    encoding: str = ...,  # Encoding -> str
1203    allowed_input_files: Sequence[str] | None = None,
1204    allowed_output_files: Sequence[str] | None = None,
1205) -> subprocess.Popen[str]:
1206  ...
1207
1208
# Only the typing.overload signatures above carry type annotations.
1210def _run_ffmpeg(
1211    ffmpeg_args,
1212    stdin=None,
1213    stdout=None,
1214    stderr=None,
1215    encoding=None,
1216    allowed_input_files=None,
1217    allowed_output_files=None,
1218):
1219  """Runs ffmpeg with the given args.
1220
1221  Args:
1222    ffmpeg_args: The args to pass to ffmpeg.
1223    stdin: Same as in `subprocess.Popen`.
1224    stdout: Same as in `subprocess.Popen`.
1225    stderr: Same as in `subprocess.Popen`.
1226    encoding: Same as in `subprocess.Popen`.
1227    allowed_input_files: The input files to allow for ffmpeg.
1228    allowed_output_files: The output files to allow for ffmpeg.
1229
1230  Returns:
1231    The subprocess.Popen object with running ffmpeg process.
1232  """
1233  argv = []
1234  env = {}
1235  ffmpeg_path = _get_ffmpeg_path()
1236
1237  # Allowed input and output files are not supported in open source.
1238  del allowed_input_files
1239  del allowed_output_files
1240
1241  argv.append(ffmpeg_path)
1242  argv.extend(ffmpeg_args)
1243
1244  return subprocess.Popen(
1245      argv,
1246      stdin=stdin,
1247      stdout=stdout,
1248      stderr=stderr,
1249      encoding=encoding,
1250      env=env,
1251  )
1252
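# Illustrative sketch (not part of mediapy): why `_run_ffmpeg` has two
# overloads.  `subprocess.Popen` yields bytes on its pipes when `encoding` is
# None and str when an encoding is given.  The current Python interpreter is
# used as a stand-in for ffmpeg so that the example runs without ffmpeg
# installed; the helper name `run_python` is hypothetical.

```python
import subprocess
import sys


def run_python(args, encoding=None):
  """Starts the current Python interpreter as a stand-in for ffmpeg."""
  return subprocess.Popen(
      [sys.executable, *args], stdout=subprocess.PIPE, encoding=encoding
  )


code = 'print("ok")'
with run_python(['-c', code]) as proc:
  raw_output, _ = proc.communicate()  # bytes, because encoding is None.
with run_python(['-c', code], encoding='utf-8') as proc:
  text_output, _ = proc.communicate()  # str, because an encoding is given.
```

# `VideoReader` relies on the bytes form to read raw frames, whereas
# `_get_video_metadata` passes `encoding='utf-8'` to parse stderr as text.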
1253
1254def video_is_available() -> bool:
1255  """Returns True if the program `ffmpeg` is found.
1256
1257  See also `set_ffmpeg`.
1258  """
1259  return _search_for_ffmpeg_path() is not None
1260
1261
1262class VideoMetadata(NamedTuple):
1263  """Represents the data stored in a video container header.
1264
1265  Attributes:
    num_images: Number of frames expected from the video stream.  This is
      estimated from the framerate and the duration stored in the video
      header, so it might be inexact.  The value is -1 if the number of frames
      is not found in the header.
1270    shape: The dimensions (height, width) of each video frame.
1271    fps: The framerate in frames per second.
1272    bps: The estimated bitrate of the video stream in bits per second, retrieved
1273      from the video header.
1274  """
1275
1276  num_images: int
1277  shape: tuple[int, int]
1278  fps: float
1279  bps: int | None
1280
1281
1282def _get_video_metadata(path: _Path) -> VideoMetadata:
1283  """Returns attributes of video stored in the specified local file."""
1284  if not pathlib.Path(path).is_file():
1285    raise RuntimeError(f"Video file '{path}' is not found.")
1286
1287  command = [
1288      '-nostdin',
1289      '-i',
1290      str(path),
1291      '-acodec',
1292      'copy',
1293      # Necessary to get "frame= *(\d+)" using newer ffmpeg versions.
1294      # Previously, was `'-vcodec', 'copy'`
1295      '-vf',
1296      'select=1',
1297      '-vsync',
1298      '0',
1299      '-f',
1300      'null',
1301      '-',
1302  ]
1303  with _run_ffmpeg(
1304      command,
1305      allowed_input_files=[str(path)],
1306      stderr=subprocess.PIPE,
1307      encoding='utf-8',
1308  ) as proc:
1309    _, err = proc.communicate()
1310  bps = fps = num_images = width = height = rotation = None
1311  before_output_info = True
1312  for line in err.split('\n'):
1313    if line.startswith('Output '):
1314      before_output_info = False
1315    if match := re.search(r', bitrate: *([\d.]+) kb/s', line):
      # The captured value may include a decimal point, so parse it as float.
      bps = round(float(match.group(1)) * 1000)
1317    if matches := re.findall(r'frame= *(\d+) ', line):
1318      num_images = int(matches[-1])
1319    if 'Stream #0:' in line and ': Video:' in line and before_output_info:
1320      if not (match := re.search(r', (\d+)x(\d+)', line)):
1321        raise RuntimeError(f'Unable to parse video dimensions in line {line}')
1322      width, height = int(match.group(1)), int(match.group(2))
1323      if match := re.search(r', ([\d.]+) fps', line):
1324        fps = float(match.group(1))
1325      elif str(path).endswith('.gif'):
1326        # Some GIF files lack a framerate attribute; use a reasonable default.
1327        fps = 10
1328      else:
1329        raise RuntimeError(f'Unable to parse video framerate in line {line}')
1330    if match := re.fullmatch(r'\s*rotate\s*:\s*(\d+)', line):
1331      rotation = int(match.group(1))
1332    if match := re.fullmatch(r'.*rotation of (-?\d+).*\sdegrees', line):
1333      rotation = int(match.group(1))
1334  if not num_images:
1335    num_images = -1
1336  if not width:
1337    raise RuntimeError(f'Unable to parse video header: {err}')
1338  # By default, ffmpeg enables "-autorotate"; we just fix the dimensions.
1339  if rotation in (90, 270, -90, -270):
1340    width, height = height, width
1341  assert height is not None and width is not None
1342  shape = height, width
1343  assert fps is not None
1344  return VideoMetadata(num_images, shape, fps, bps)
1345
1346
1347class _VideoIO:
1348  """Base class for `VideoReader` and `VideoWriter`."""
1349
1350  def _get_pix_fmt(self, dtype: _DType, image_format: str) -> str:
1351    """Returns ffmpeg pix_fmt given data type and image format."""
1352    native_endian_suffix = {'little': 'le', 'big': 'be'}[sys.byteorder]
1353    return {
1354        np.uint8: {
1355            'rgb': 'rgb24',
1356            'yuv': 'yuv444p',
1357            'gray': 'gray',
1358        },
1359        np.uint16: {
1360            'rgb': 'rgb48' + native_endian_suffix,
1361            'yuv': 'yuv444p16' + native_endian_suffix,
1362            'gray': 'gray16' + native_endian_suffix,
1363        },
1364    }[dtype.type][image_format]
1365
1366
1367class VideoReader(_VideoIO):
1368  """Context to read a compressed video as an iterable over its images.
1369
1370  >>> with VideoReader('/tmp/river.mp4') as reader:
1371  ...   print(f'Video has {reader.num_images} images with shape={reader.shape},'
1372  ...         f' at {reader.fps} frames/sec and {reader.bps} bits/sec.')
1373  ...   for image in reader:
1374  ...     print(image.shape)
1375
1376  >>> with VideoReader('/tmp/river.mp4') as reader:
1377  ...   video = np.array(tuple(reader))
1378
1379  >>> url = 'https://github.com/hhoppe/data/raw/main/video.mp4'
1380  >>> with VideoReader(url) as reader:
1381  ...   show_video(reader)
1382
1383  Attributes:
1384    path_or_url: Location of input video.
1385    output_format: Format of output images (default 'rgb').  If 'rgb', each
1386      image has shape=(height, width, 3) with R, G, B values.  If 'yuv', each
1387      image has shape=(height, width, 3) with Y, U, V values.  If 'gray', each
1388      image has shape=(height, width).
1389    dtype: Data type for output images.  The default is `np.uint8`.  Use of
1390      `np.uint16` allows reading 10-bit or 12-bit data without precision loss.
1391    metadata: Object storing the information retrieved from the video header.
1392      Its attributes are copied as attributes in this class.
    num_images: Number of frames expected from the video stream.  This is
      estimated from the framerate and the duration stored in the video
      header, so it might be inexact.
1396    shape: The dimensions (height, width) of each video frame.
1397    fps: The framerate in frames per second.
1398    bps: The estimated bitrate of the video stream in bits per second, retrieved
1399      from the video header.
1400    stream_index: The stream index to read from. The default is 0.
1401  """
1402
1403  path_or_url: _Path
1404  output_format: str
1405  dtype: _DType
1406  metadata: VideoMetadata
1407  num_images: int
1408  shape: tuple[int, int]
1409  fps: float
1410  bps: int | None
1411  stream_index: int
1412  _num_bytes_per_image: int
1413
1414  def __init__(
1415      self,
1416      path_or_url: _Path,
1417      *,
1418      stream_index: int = 0,
1419      output_format: str = 'rgb',
1420      dtype: _DTypeLike = np.uint8,
1421  ):
1422    if output_format not in {'rgb', 'yuv', 'gray'}:
1423      raise ValueError(
1424          f'Output format {output_format} is not rgb, yuv, or gray.'
1425      )
1426    self.path_or_url = path_or_url
1427    self.output_format = output_format
1428    self.stream_index = stream_index
1429    self.dtype = np.dtype(dtype)
1430    if self.dtype.type not in (np.uint8, np.uint16):
1431      raise ValueError(f'Type {dtype} is not np.uint8 or np.uint16.')
1432    self._read_via_local_file: Any = None
1433    self._popen: subprocess.Popen[bytes] | None = None
1434    self._proc: subprocess.Popen[bytes] | None = None
1435
1436  def __enter__(self) -> 'VideoReader':
1437    try:
1438      self._read_via_local_file = _read_via_local_file(self.path_or_url)
1439      # pylint: disable-next=no-member
1440      tmp_name = self._read_via_local_file.__enter__()
1441
1442      self.metadata = _get_video_metadata(tmp_name)
1443      self.num_images, self.shape, self.fps, self.bps = self.metadata
1444      pix_fmt = self._get_pix_fmt(self.dtype, self.output_format)
1445      num_channels = {'rgb': 3, 'yuv': 3, 'gray': 1}[self.output_format]
1446      bytes_per_channel = self.dtype.itemsize
1447      self._num_bytes_per_image = (
1448          math.prod(self.shape) * num_channels * bytes_per_channel
1449      )
1450
1451      command = [
1452          '-v',
1453          'panic',
1454          '-nostdin',
1455          '-i',
1456          tmp_name,
1457          '-vcodec',
1458          'rawvideo',
1459          '-f',
1460          'image2pipe',
1461          '-map',
1462          f'0:v:{self.stream_index}',
1463          '-pix_fmt',
1464          pix_fmt,
1465          '-vsync',
1466          'vfr',
1467          '-',
1468      ]
1469      self._popen = _run_ffmpeg(
1470          command,
1471          stdout=subprocess.PIPE,
1472          stderr=subprocess.PIPE,
1473          allowed_input_files=[tmp_name],
1474      )
1475      self._proc = self._popen.__enter__()
1476    except Exception:
1477      self.__exit__(None, None, None)
1478      raise
1479    return self
1480
1481  def __exit__(self, *_: Any) -> None:
1482    self.close()
1483
1484  def read(self) -> _NDArray | None:
1485    """Reads a video image frame (or None if at end of file).
1486
1487    Returns:
1488      A numpy array in the format specified by `output_format`, i.e., a 3D
1489      array with 3 color channels, except for format 'gray' which is 2D.
1490    """
1491    assert self._proc, 'Error: reading from an already closed context.'
1492    stdout = self._proc.stdout
1493    assert stdout is not None
1494    data = stdout.read(self._num_bytes_per_image)
1495    if not data:  # Due to either end-of-file or subprocess error.
1496      self.close()  # Raises exception if subprocess had error.
1497      return None  # To indicate end-of-file.
1498    assert len(data) == self._num_bytes_per_image
1499    image = np.frombuffer(data, dtype=self.dtype)
1500    if self.output_format == 'rgb':
1501      image = image.reshape(*self.shape, 3)
1502    elif self.output_format == 'yuv':  # Convert from planar YUV to pixel YUV.
1503      image = np.moveaxis(image.reshape(3, *self.shape), 0, 2)
1504    elif self.output_format == 'gray':  # Generate 2D rather than 3D ndimage.
1505      image = image.reshape(*self.shape)
1506    else:
1507      raise AssertionError
1508    return image
1509
1510  def __iter__(self) -> Iterator[_NDArray]:
1511    while True:
1512      image = self.read()
1513      if image is None:
1514        return
1515      yield image
1516
1517  def close(self) -> None:
1518    """Terminates video reader.  (Called automatically at end of context.)"""
1519    if self._popen:
1520      self._popen.__exit__(None, None, None)
1521      self._popen = None
1522      self._proc = None
1523    if self._read_via_local_file:
1524      # pylint: disable-next=no-member
1525      self._read_via_local_file.__exit__(None, None, None)
1526      self._read_via_local_file = None
1527
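# Illustrative sketch (not part of mediapy): the per-frame byte count used by
# `VideoReader.read`, and the planar-to-interleaved conversion applied for the
# 'yuv' output format.  It uses plain Python lists instead of numpy, and the
# helper names `frame_num_bytes` and `planar_to_interleaved` are hypothetical.

```python
import math


def frame_num_bytes(shape, num_channels, bytes_per_channel):
  """Returns the size in bytes of one raw frame."""
  return math.prod(shape) * num_channels * bytes_per_channel


def planar_to_interleaved(data, shape, num_channels=3):
  """Converts planar samples (all Y, then U, then V) to per-pixel tuples."""
  height, width = shape
  plane_size = height * width
  planes = [
      data[c * plane_size : (c + 1) * plane_size] for c in range(num_channels)
  ]
  return [
      [tuple(plane[y * width + x] for plane in planes) for x in range(width)]
      for y in range(height)
  ]


# A 2x2 frame with three 8-bit channels occupies 12 bytes.
num_bytes = frame_num_bytes((2, 2), num_channels=3, bytes_per_channel=1)
planar = bytes([10, 11, 12, 13, 20, 21, 22, 23, 30, 31, 32, 33])
pixels = planar_to_interleaved(planar, (2, 2))
```

# The first pixel becomes (10, 20, 30) and the last (13, 23, 33), which is the
# effect of `np.moveaxis(image.reshape(3, *shape), 0, 2)` in `read`.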
1528
1529class VideoWriter(_VideoIO):
1530  """Context to write a compressed video.
1531
1532  >>> shape = 480, 640
1533  >>> with VideoWriter('/tmp/v.mp4', shape, fps=60) as writer:
1534  ...   for image in moving_circle(shape, num_images=60):
1535  ...     writer.add_image(image)
1536  >>> show_video(read_video('/tmp/v.mp4'))
1537
1538
  Bitrate control may be specified using at most one of: `bps`, `qp`, or `crf`.
  If none are specified and `ffmpeg_args` is empty, `qp` is set to a default
  value (20 if the frame has at most 640 * 480 pixels, else 28).
  See https://slhck.info/video/2017/03/01/rate-control.html
1542
1543  If codec is 'gif', the args `bps`, `qp`, `crf`, and `encoded_format` are
1544  ignored.
1545
1546  Attributes:
1547    path: Output video.  Its suffix (e.g. '.mp4') determines the video container
1548      format.  The suffix must be '.gif' if the codec is 'gif'.
1549    shape: 2D spatial dimensions (height, width) of video image frames.  The
1550      dimensions must be even if 'encoded_format' has subsampled chroma (e.g.,
1551      'yuv420p' or 'yuv420p10le').
1552    codec: Compression algorithm as defined by "ffmpeg -codecs" (e.g., 'h264',
1553      'hevc', 'vp9', or 'gif').
1554    metadata: Optional VideoMetadata object whose `fps` and `bps` attributes are
1555      used if not specified as explicit parameters.
1556    fps: Frames-per-second framerate (default is 60.0 except 25.0 for 'gif').
1557    bps: Requested average bits-per-second bitrate (default None).
1558    qp: Quantization parameter for video compression quality (default None).
1559    crf: Constant rate factor for video compression quality (default None).
1560    ffmpeg_args: Additional arguments for `ffmpeg` command, e.g. '-g 30' to
1561      introduce I-frames, or '-bf 0' to omit B-frames.
1562    input_format: Format of input images (default 'rgb').  If 'rgb', each image
1563      has shape=(height, width, 3) or (height, width).  If 'yuv', each image has
1564      shape=(height, width, 3) with Y, U, V values.  If 'gray', each image has
1565      shape=(height, width).
1566    dtype: Expected data type for input images (any float input images are
1567      converted to `dtype`).  The default is `np.uint8`.  Use of `np.uint16` is
1568      necessary when encoding >8 bits/channel.
1569    encoded_format: Pixel format as defined by `ffmpeg -pix_fmts`, e.g.,
1570      'yuv420p' (2x2-subsampled chroma), 'yuv444p' (full-res chroma),
1571      'yuv420p10le' (10-bit per channel), etc.  The default (None) selects
1572      'yuv420p' if all shape dimensions are even, else 'yuv444p'.
1573  """
1574
1575  def __init__(
1576      self,
1577      path: _Path,
1578      shape: tuple[int, int],
1579      *,
1580      codec: str = 'h264',
1581      metadata: VideoMetadata | None = None,
1582      fps: float | None = None,
1583      bps: int | None = None,
1584      qp: int | None = None,
1585      crf: float | None = None,
1586      ffmpeg_args: str | Sequence[str] = '',
1587      input_format: str = 'rgb',
1588      dtype: _DTypeLike = np.uint8,
1589      encoded_format: str | None = None,
1590  ) -> None:
1591    _check_2d_shape(shape)
1592    if fps is None and metadata:
1593      fps = metadata.fps
1594    if fps is None:
1595      fps = 25.0 if codec == 'gif' else 60.0
1596    if fps <= 0.0:
1597      raise ValueError(f'Frame-per-second value {fps} is invalid.')
1598    if bps is None and metadata:
1599      bps = metadata.bps
1600    bps = int(bps) if bps is not None else None
1601    if bps is not None and bps <= 0:
1602      raise ValueError(f'Bitrate value {bps} is invalid.')
1603    if qp is not None and (not isinstance(qp, int) or qp <= 0):
1604      raise ValueError(
1605          f'Quantization parameter {qp} is not a positive integer.'
1606      )
1607    num_rate_specifications = sum(x is not None for x in (bps, qp, crf))
1608    if num_rate_specifications > 1:
1609      raise ValueError(
1610          f'Must specify at most one of bps, qp, or crf ({bps}, {qp}, {crf}).'
1611      )
1612    ffmpeg_args = (
1613        shlex.split(ffmpeg_args)
        if isinstance(ffmpeg_args, str)
        else list(ffmpeg_args)
    )
    if input_format not in {'rgb', 'yuv', 'gray'}:
      raise ValueError(f'Input format {input_format} is not rgb, yuv, or gray.')
    dtype = np.dtype(dtype)
    if dtype.type not in (np.uint8, np.uint16):
      raise ValueError(f'Type {dtype} is not np.uint8 or np.uint16.')
    self.path = pathlib.Path(path)
    self.shape = shape
    all_dimensions_are_even = all(dim % 2 == 0 for dim in shape)
    if encoded_format is None:
      encoded_format = 'yuv420p' if all_dimensions_are_even else 'yuv444p'
    if not all_dimensions_are_even and encoded_format.startswith(
        ('yuv42', 'yuvj42')
    ):
      raise ValueError(
          f'With encoded_format {encoded_format}, video dimensions must be'
          f' even, but shape is {shape}.'
      )
    self.fps = fps
    self.codec = codec
    self.bps = bps
    self.qp = qp
    self.crf = crf
    self.ffmpeg_args = ffmpeg_args
    self.input_format = input_format
    self.dtype = dtype
    self.encoded_format = encoded_format
    if num_rate_specifications == 0 and not ffmpeg_args:
      qp = 20 if math.prod(self.shape) <= 640 * 480 else 28
    self._bitrate_args = (
        (['-vb', f'{bps}'] if bps is not None else [])
        + (['-qp', f'{qp}'] if qp is not None else [])
        + (['-vb', '0', '-crf', f'{crf}'] if crf is not None else [])
    )
    if self.codec == 'gif':
      if self.path.suffix != '.gif':
        raise ValueError(f"File '{self.path}' does not have a .gif suffix.")
      self.encoded_format = 'pal8'
      self._bitrate_args = []
      video_filter = 'split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse'
      # Less common (and likely less useful) is a per-frame color palette:
      # video_filter = ('split[s0][s1];[s0]palettegen=stats_mode=single[p];'
      #                 '[s1][p]paletteuse=new=1')
      self.ffmpeg_args = ['-vf', video_filter, '-f', 'gif'] + self.ffmpeg_args
    self._write_via_local_file: Any = None
    self._popen: subprocess.Popen[bytes] | None = None
    self._proc: subprocess.Popen[bytes] | None = None

  def __enter__(self) -> 'VideoWriter':
    input_pix_fmt = self._get_pix_fmt(self.dtype, self.input_format)
    try:
      self._write_via_local_file = _write_via_local_file(self.path)
      # pylint: disable-next=no-member
      tmp_name = self._write_via_local_file.__enter__()

      # Writing to stdout using ('-f', 'mp4', '-') would require
      # ('-movflags', 'frag_keyframe+empty_moov') and the result is nonportable.
      height, width = self.shape
      command = (
          [
              '-v',
              'error',
              '-f',
              'rawvideo',
              '-vcodec',
              'rawvideo',
              '-pix_fmt',
              input_pix_fmt,
              '-s',
              f'{width}x{height}',
              '-r',
              f'{self.fps}',
              '-i',
              '-',
              '-an',
              '-vcodec',
              self.codec,
              '-pix_fmt',
              self.encoded_format,
          ]
          + self._bitrate_args
          + self.ffmpeg_args
          + ['-y', tmp_name]
      )
      self._popen = _run_ffmpeg(
          command,
          stdin=subprocess.PIPE,
          stderr=subprocess.PIPE,
          allowed_output_files=[tmp_name],
      )
      self._proc = self._popen.__enter__()
    except Exception:
      self.__exit__(None, None, None)
      raise
    return self

  def __exit__(self, *_: Any) -> None:
    self.close()

  def add_image(self, image: _NDArray) -> None:
    """Writes a video frame.

    Args:
      image: Array whose dtype and first two dimensions must match the `dtype`
        and `shape` specified in `VideoWriter` initialization.  If
        `input_format` is 'gray', the image must be 2D.  For the 'rgb'
        input_format, the image may be either 2D (interpreted as grayscale) or
        3D with three (R, G, B) channels.  For the 'yuv' input_format, the image
        must be 3D with three (Y, U, V) channels.

    Raises:
      RuntimeError: If there is an error writing to the output file.
    """
    assert self._proc, 'Error: writing to an already closed context.'
    if issubclass(image.dtype.type, (np.floating, np.bool_)):
      image = to_type(image, self.dtype)
    if image.dtype != self.dtype:
      raise ValueError(f'Image type {image.dtype} != {self.dtype}.')
    if self.input_format == 'gray':
      if image.ndim != 2:
        raise ValueError(f'Image dimensions {image.shape} are not 2D.')
    else:
      if image.ndim == 2 and self.input_format == 'rgb':
        image = np.dstack((image, image, image))
      if not (image.ndim == 3 and image.shape[2] == 3):
        raise ValueError(f'Image dimensions {image.shape} are invalid.')
    if image.shape[:2] != self.shape:
      raise ValueError(
          f'Image dimensions {image.shape[:2]} do not match'
          f' those of the initialized video {self.shape}.'
      )
    if self.input_format == 'yuv':  # Convert from per-pixel YUV to planar YUV.
      image = np.moveaxis(image, 2, 0)
    data = image.tobytes()
    stdin = self._proc.stdin
    assert stdin is not None
    if stdin.write(data) != len(data):
      self._proc.wait()
      stderr = self._proc.stderr
      assert stderr is not None
      s = stderr.read().decode('utf-8')
      raise RuntimeError(f"Error writing '{self.path}': {s}")

  def close(self) -> None:
    """Finishes writing the video.  (Called automatically at end of context.)"""
    if self._popen:
      assert self._proc, 'Error: closing an already closed context.'
      stdin = self._proc.stdin
      assert stdin is not None
      stdin.close()
      if self._proc.wait():
        stderr = self._proc.stderr
        assert stderr is not None
        s = stderr.read().decode('utf-8')
        raise RuntimeError(f"Error writing '{self.path}': {s}")
      self._popen.__exit__(None, None, None)
      self._popen = None
      self._proc = None
    if self._write_via_local_file:
      # pylint: disable-next=no-member
      self._write_via_local_file.__exit__(None, None, None)
      self._write_via_local_file = None


class _VideoArray(npt.NDArray[Any]):
  """Wrapper to add a VideoMetadata `metadata` attribute to a numpy array."""

  metadata: VideoMetadata | None

  def __new__(
      cls: Type['_VideoArray'],
      input_array: _NDArray,
      metadata: VideoMetadata | None = None,
  ) -> '_VideoArray':
    obj: _VideoArray = np.asarray(input_array).view(cls)
    obj.metadata = metadata
    return obj

  def __array_finalize__(self, obj: Any) -> None:
    if obj is None:
      return
    self.metadata = getattr(obj, 'metadata', None)


def read_video(path_or_url: _Path, **kwargs: Any) -> _VideoArray:
  """Returns an array containing all images read from a compressed video file.

  >>> video = read_video('/tmp/river.mp4')
  >>> print(f'The framerate is {video.metadata.fps} frames/s.')
  >>> show_video(video)

  >>> url = 'https://github.com/hhoppe/data/raw/main/video.mp4'
  >>> show_video(read_video(url))

  Args:
    path_or_url: Input video file.
    **kwargs: Additional parameters for `VideoReader`.

  Returns:
    A 4D `numpy` array with dimensions (frame, height, width, channel), or a 3D
    array if `output_format` is specified as 'gray'.  The returned array has an
    attribute `metadata` containing `VideoMetadata` information.  This enables
    `show_video` to retrieve the framerate in `metadata.fps`.  Note that the
    metadata attribute is lost in most subsequent `numpy` operations.
  """
  with VideoReader(path_or_url, **kwargs) as reader:
    return _VideoArray(np.array(tuple(reader)), metadata=reader.metadata)


def write_video(path: _Path, images: Iterable[_NDArray], **kwargs: Any) -> None:
  """Writes images to a compressed video file.

  >>> video = moving_circle((480, 640), num_images=60)
  >>> write_video('/tmp/v.mp4', video, fps=60, qp=18)
  >>> show_video(read_video('/tmp/v.mp4'))

  Args:
    path: Output video file.
    images: Iterable over video frames, e.g. a 4D array or a list of 2D or 3D
      arrays.
    **kwargs: Additional parameters for `VideoWriter`.
  """
  first_image, images = _peek_first(images)
  shape: tuple[int, int] = first_image.shape[:2]
  dtype = first_image.dtype
  if dtype == bool:
    dtype = np.dtype(np.uint8)
  elif np.issubdtype(dtype, np.floating):
    dtype = np.dtype(np.uint16)
  kwargs = {'metadata': getattr(images, 'metadata', None), **kwargs}
  with VideoWriter(path, shape=shape, dtype=dtype, **kwargs) as writer:
    for image in images:
      writer.add_image(image)


def compress_video(
    images: Iterable[_NDArray], *, codec: str = 'h264', **kwargs: Any
) -> bytes:
  """Returns a buffer containing a compressed video.

  The video container is 'gif' for 'gif' codec, 'webm' for 'vp9' codec,
  and mp4 otherwise.

  >>> video = read_video('/tmp/river.mp4')
  >>> data = compress_video(video, bps=10_000_000)
  >>> print(len(data))

  >>> data = compress_video(moving_circle((100, 100), num_images=10), fps=10)

  Args:
    images: Iterable over video frames.
    codec: Compression algorithm as defined by `ffmpeg -codecs` (e.g., 'h264',
      'hevc', 'vp9', or 'gif').
    **kwargs: Additional parameters for `VideoWriter`.

  Returns:
    A bytes buffer containing the compressed video.
  """
  suffix = _filename_suffix_from_codec(codec)
  with tempfile.TemporaryDirectory() as directory_name:
    tmp_path = pathlib.Path(directory_name) / f'file{suffix}'
    write_video(tmp_path, images, codec=codec, **kwargs)
    return tmp_path.read_bytes()


def decompress_video(data: bytes, **kwargs: Any) -> _NDArray:
  """Returns video images from an MP4-compressed data buffer."""
  with tempfile.TemporaryDirectory() as directory_name:
    tmp_path = pathlib.Path(directory_name) / 'file.mp4'
    tmp_path.write_bytes(data)
    return read_video(tmp_path, **kwargs)


def html_from_compressed_video(
    data: bytes,
    width: int,
    height: int,
    *,
    title: str | None = None,
    border: bool | str = False,
    loop: bool = True,
    autoplay: bool = True,
) -> str:
  """Returns an HTML string with a video tag containing H264-encoded data.

  Args:
    data: MP4-compressed video bytes.
    width: Width of HTML video in pixels.
    height: Height of HTML video in pixels.
    title: Optional text shown centered above the video.
    border: If `bool`, whether to place a black boundary around the image, or if
      `str`, the boundary CSS style.
    loop: If True, the playback repeats forever.
    autoplay: If True, video playback starts without having to click.
  """
  b64 = base64.b64encode(data).decode('utf-8')
  if isinstance(border, str):
    border = f'{border}; '
  elif border:
    border = 'border:1px solid black; '
  else:
    border = ''
  options = (
      f'controls width="{width}" height="{height}"'
      f' style="{border}object-fit:cover;"'
      f'{" loop" if loop else ""}'
      f'{" autoplay muted" if autoplay else ""}'
  )
  s = f"""<video {options}>
      <source src="data:video/mp4;base64,{b64}" type="video/mp4"/>
      This browser does not support the video tag.
      </video>"""
  if title is not None:
    s = f"""<div style="display:flex; align-items:left;">
      <div style="display:flex; flex-direction:column; align-items:center;">
      <div>{title}</div><div>{s}</div></div></div>"""
  return s


def show_video(
    images: Iterable[_NDArray], *, title: str | None = None, **kwargs: Any
) -> str | None:
  """Displays a video in the IPython notebook and optionally saves it to a file.

  See `show_videos`.

  >>> video = read_video('https://github.com/hhoppe/data/raw/main/video.mp4')
  >>> show_video(video, title='River video')

  >>> show_video(moving_circle((80, 80), num_images=10), fps=5, border=True)

  >>> show_video(read_video('/tmp/river.mp4'))

  Args:
    images: Iterable of video frames (e.g., a 4D array or a list of 2D or 3D
      arrays).
    title: Optional text shown centered above the video.
    **kwargs: See `show_videos`.

  Returns:
    html string if `return_html` is `True`.
  """
  return show_videos([images], [title], **kwargs)


def show_videos(
    videos: Iterable[Iterable[_NDArray]] | Mapping[str, Iterable[_NDArray]],
    titles: Iterable[str | None] | None = None,
    *,
    width: int | None = None,
    height: int | None = None,
    downsample: bool = True,
    columns: int | None = None,
    fps: float | None = None,
    bps: int | None = None,
    qp: int | None = None,
    codec: str = 'h264',
    ylabel: str = '',
    html_class: str = 'show_videos',
    return_html: bool = False,
    **kwargs: Any,
) -> str | None:
  """Displays a row of videos in the IPython notebook.

  Creates HTML with `<video>` tags containing embedded H264-encoded bytestrings.
  If `codec` is set to 'gif', we instead use `<img>` tags containing embedded
  GIF-encoded bytestrings.  Note that the resulting GIF animations skip frames
  when the `fps` period is not a multiple of 10 ms units (GIF frame delay
  units).  Encoding at `fps` = 20.0, 25.0, or 50.0 works fine.

  If a directory has been specified using `set_show_save_dir`, also saves each
  titled video to a file in that directory based on its title.

  Args:
    videos: Iterable of videos, or dictionary of `{title: video}`.  Each video
      must be an iterable of images.  If a video object has a `metadata`
      (`VideoMetadata`) attribute, its `fps` field provides a default framerate.
    titles: Optional strings shown above the corresponding videos.
    width: Optional, overrides displayed width (in pixels).
    height: Optional, overrides displayed height (in pixels).
    downsample: If True, each video whose width or height is greater than the
      specified `width` or `height` is resampled to the display resolution. This
      improves antialiasing and reduces the size of the notebook.
    columns: Optional, maximum number of videos per row.
    fps: Frames-per-second framerate (default is 60.0 except 25.0 for GIF).
    bps: Bits-per-second bitrate (default None).
    qp: Quantization parameter for video compression quality (default None).
    codec: Compression algorithm; must be either 'h264' or 'gif'.
    ylabel: Text (rotated by 90 degrees) shown on the left of each row.
    html_class: CSS class name used in definition of HTML element.
    return_html: If `True` return the raw HTML `str` instead of displaying.
    **kwargs: Additional parameters (`border`, `loop`, `autoplay`) for
      `html_from_compressed_video`.

  Returns:
    html string if `return_html` is `True`.
  """
  if isinstance(videos, Mapping):
    if titles is not None:
      raise ValueError(
          'Cannot have both a video dictionary and a titles parameter.'
      )
    list_titles = list(videos.keys())
    list_videos = list(videos.values())
  else:
    list_videos = list(cast('Iterable[_NDArray]', videos))
    list_titles = [None] * len(list_videos) if titles is None else list(titles)
    if len(list_videos) != len(list_titles):
      raise ValueError(
          'Number of videos does not match number of titles'
          f' ({len(list_videos)} vs {len(list_titles)}).'
      )
  if codec not in {'h264', 'gif'}:
    raise ValueError(f'Codec {codec} is neither h264 or gif.')

  html_strings = []
  for video, title in zip(list_videos, list_titles):
    metadata: VideoMetadata | None = getattr(video, 'metadata', None)
    first_image, video = _peek_first(video)
    w, h = _get_width_height(width, height, first_image.shape[:2])
    if downsample and (w < first_image.shape[1] or h < first_image.shape[0]):
      # Not resize_video() because each image may have different depth and type.
      video = [resize_image(image, (h, w)) for image in video]
      first_image = video[0]
    data = compress_video(
        video, metadata=metadata, fps=fps, bps=bps, qp=qp, codec=codec
    )
    if title is not None and _config.show_save_dir:
      suffix = _filename_suffix_from_codec(codec)
      path = pathlib.Path(_config.show_save_dir) / f'{title}{suffix}'
      with _open(path, mode='wb') as f:
        f.write(data)
    if codec == 'gif':
      pixelated = h > first_image.shape[0] or w > first_image.shape[1]
      html_string = html_from_compressed_image(
          data, w, h, title=title, fmt='gif', pixelated=pixelated, **kwargs
      )
    else:
      html_string = html_from_compressed_video(
          data, w, h, title=title, **kwargs
      )
    html_strings.append(html_string)

  # Create single-row tables each with no more than 'columns' elements.
  table_strings = []
  for row_html_strings in _chunked(html_strings, columns):
    td = '<td style="padding:1px;">'
    s = ''.join(f'{td}{e}</td>' for e in row_html_strings)
    if ylabel:
      style = 'writing-mode:vertical-lr; transform:rotate(180deg);'
      s = f'{td}<span style="{style}">{ylabel}</span></td>' + s
    table_strings.append(
        f'<table class="{html_class}"'
        f' style="border-spacing:0px;"><tr>{s}</tr></table>'
    )
  s = ''.join(table_strings)
  if return_html:
    return s
  _display_html(s)
  return None


# Local Variables:
# fill-column: 80
# End:
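The `VideoWriter` initialization in the listing above picks a default `encoded_format` from the parity of the frame dimensions and rejects the `yuv42*` and `yuvj42*` formats for odd sizes. As a minimal sketch, that rule can be isolated in a standalone helper. The name `choose_encoded_format` is hypothetical and is not part of mediapy; only the branching logic is taken from the source.

```python
def choose_encoded_format(shape, encoded_format=None):
    """Standalone restatement of the pixel-format check (hypothetical helper)."""
    all_dimensions_are_even = all(dim % 2 == 0 for dim in shape)
    if encoded_format is None:
        # Default: a chroma-subsampled format only when both dimensions are even.
        return 'yuv420p' if all_dimensions_are_even else 'yuv444p'
    if not all_dimensions_are_even and encoded_format.startswith(
        ('yuv42', 'yuvj42')
    ):
        raise ValueError(
            f'With encoded_format {encoded_format}, video dimensions must be'
            f' even, but shape is {shape}.'
        )
    return encoded_format


print(choose_encoded_format((480, 640)))  # Even height and width.
print(choose_encoded_format((481, 640)))  # Odd height.
```

In practice this suggests that cropping or padding frames to even dimensions before writing keeps the default at `yuv420p`.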
def show_image(image: ArrayLike, *, title: str | None = None, **kwargs: Any) -> str | None:
def show_image(
    image: _ArrayLike, *, title: str | None = None, **kwargs: Any
) -> str | None:
  """Displays an image in the notebook and optionally saves it to a file.

  See `show_images`.

  >>> show_image(np.random.rand(100, 100))
  >>> show_image(np.random.randint(0, 256, size=(80, 80, 3), dtype='uint8'))
  >>> show_image(np.random.rand(10, 10) - 0.5, cmap='bwr', height=100)
  >>> show_image(read_image('/tmp/image.png'))
  >>> url = 'https://github.com/hhoppe/data/raw/main/image.png'
  >>> show_image(read_image(url))

  Args:
    image: 2D array-like, or 3D array-like with 1, 3, or 4 channels.
    title: Optional text shown centered above the image.
    **kwargs: See `show_images`.

  Returns:
    html string if `return_html` is `True`.
  """
  return show_images([np.asarray(image)], [title], **kwargs)

Displays an image in the notebook and optionally saves it to a file.

See show_images.

>>> show_image(np.random.rand(100, 100))
>>> show_image(np.random.randint(0, 256, size=(80, 80, 3), dtype='uint8'))
>>> show_image(np.random.rand(10, 10) - 0.5, cmap='bwr', height=100)
>>> show_image(read_image('/tmp/image.png'))
>>> url = 'https://github.com/hhoppe/data/raw/main/image.png'
>>> show_image(read_image(url))

Arguments:
  • image: 2D array-like, or 3D array-like with 1, 3, or 4 channels.
  • title: Optional text shown centered above the image.
  • **kwargs: See show_images.

Returns:
  HTML string if return_html is True; otherwise None.
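The accepted shapes listed for `image` can be restated as a small check. This is only a sketch of the documented rule: the helper name `classify_image_shape` and the returned labels are hypothetical, and the function does not reproduce how mediapy validates its input.

```python
def classify_image_shape(shape):
    """Label an image shape per the documented rule, or raise ValueError."""
    if len(shape) == 2:
        return 'single-channel (2D)'
    if len(shape) == 3 and shape[2] in (1, 3, 4):
        labels = {1: 'single-channel (3D)', 3: 'three-channel', 4: 'four-channel'}
        return labels[shape[2]]
    raise ValueError(f'Unsupported image shape {shape}.')


for shape in [(100, 100), (80, 80, 3), (64, 64, 4)]:
    print(shape, '->', classify_image_shape(shape))
```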

def show_images(images: Iterable[ArrayLike] | Mapping[str, ArrayLike], titles: Iterable[str | None] | None = None, *, width: int | None = None, height: int | None = None, downsample: bool = True, columns: int | None = None, vmin: float | None = None, vmax: float | None = None, cmap: str | Callable[[ArrayLike], np.ndarray] = 'gray', border: bool | str = False, ylabel: str = '', html_class: str = 'show_images', pixelated: bool | None = None, return_html: bool = False) -> str | None:
def show_images(
    images: Iterable[_ArrayLike] | Mapping[str, _ArrayLike],
    titles: Iterable[str | None] | None = None,
    *,
    width: int | None = None,
    height: int | None = None,
    downsample: bool = True,
    columns: int | None = None,
    vmin: float | None = None,
    vmax: float | None = None,
    cmap: str | Callable[[_ArrayLike], _NDArray] = 'gray',
    border: bool | str = False,
    ylabel: str = '',
    html_class: str = 'show_images',
    pixelated: bool | None = None,
    return_html: bool = False,
) -> str | None:
  """Displays a row of images in the IPython/Jupyter notebook.

  If a directory has been specified using `set_show_save_dir`, also saves each
  titled image to a file in that directory based on its title.

  >>> image1, image2 = np.random.rand(64, 64, 3), color_ramp((64, 64))
  >>> show_images([image1, image2])
  >>> show_images({'random image': image1, 'color ramp': image2}, height=128)
  >>> show_images([image1, image2] * 5, columns=4, border=True)

  Args:
    images: Iterable of images, or dictionary of `{title: image}`.  Each image
      must be either a 2D array or a 3D array with 1, 3, or 4 channels.
    titles: Optional strings shown above the corresponding images.
    width: Optional, overrides displayed width (in pixels).
    height: Optional, overrides displayed height (in pixels).
    downsample: If True, each image whose width or height is greater than the
      specified `width` or `height` is resampled to the display resolution. This
      improves antialiasing and reduces the size of the notebook.
    columns: Optional, maximum number of images per row.
    vmin: For single-channel image, explicit min value for display.
    vmax: For single-channel image, explicit max value for display.
    cmap: For single-channel image, `pyplot` color map or callable to map 1D to
      3D color.
    border: If `bool`, whether to place a black boundary around the image, or if
      `str`, the boundary CSS style.
    ylabel: Text (rotated by 90 degrees) shown on the left of each row.
    html_class: CSS class name used in definition of HTML element.
    pixelated: If True, sets the CSS style to 'image-rendering: pixelated;'; if
      False, sets 'image-rendering: auto'; if None, uses pixelated rendering
      only on images for which `width` or `height` introduces magnification.
    return_html: If `True` return the raw HTML `str` instead of displaying.

  Returns:
    html string if `return_html` is `True`.
  """
  if isinstance(images, Mapping):
    if titles is not None:
      raise ValueError('Cannot have images dictionary and titles parameter.')
    list_titles, list_images = list(images.keys()), list(images.values())
  else:
    list_images = list(images)
    list_titles = [None] * len(list_images) if titles is None else list(titles)
    if len(list_images) != len(list_titles):
      raise ValueError(
          'Number of images does not match number of titles'
          f' ({len(list_images)} vs {len(list_titles)}).'
      )

  list_images = [
      _ensure_mapped_to_rgb(image, vmin=vmin, vmax=vmax, cmap=cmap)
      for image in list_images
  ]

  def maybe_downsample(image: _NDArray) -> _NDArray:
    shape: tuple[int, int] = image.shape[:2]
    w, h = _get_width_height(width, height, shape)
    if w < shape[1] or h < shape[0]:
      image = resize_image(image, (h, w))
    return image

  if downsample:
    list_images = [maybe_downsample(image) for image in list_images]
  png_datas = [compress_image(to_uint8(image)) for image in list_images]

  for title, png_data in zip(list_titles, png_datas):
    if title is not None and _config.show_save_dir:
      path = pathlib.Path(_config.show_save_dir) / f'{title}.png'
      with _open(path, mode='wb') as f:
        f.write(png_data)

  def html_from_compressed_images() -> str:
    html_strings = []
    for image, title, png_data in zip(list_images, list_titles, png_datas):
      w, h = _get_width_height(width, height, image.shape[:2])
      magnified = h > image.shape[0] or w > image.shape[1]
      pixelated2 = pixelated if pixelated is not None else magnified
      html_strings.append(
          html_from_compressed_image(
              png_data, w, h, title=title, border=border, pixelated=pixelated2
          )
      )
    # Create single-row tables each with no more than 'columns' elements.
    table_strings = []
    for row_html_strings in _chunked(html_strings, columns):
      td = '<td style="padding:1px;">'
      s = ''.join(f'{td}{e}</td>' for e in row_html_strings)
      if ylabel:
        style = 'writing-mode:vertical-lr; transform:rotate(180deg);'
        s = f'{td}<span style="{style}">{ylabel}</span></td>' + s
      table_strings.append(
          f'<table class="{html_class}"'
          f' style="border-spacing:0px;"><tr>{s}</tr></table>'
      )
    return ''.join(table_strings)

  s = html_from_compressed_images()
  while len(s) > _IPYTHON_HTML_SIZE_LIMIT * 0.5:
    list_images = [image[::2, ::2] for image in list_images]
    png_datas = [compress_image(to_uint8(image)) for image in list_images]
    s = html_from_compressed_images()
  if return_html:
    return s
  _display_html(s)
  return None

Displays a row of images in the IPython/Jupyter notebook.

If a directory has been specified using set_show_save_dir, also saves each titled image to a file in that directory based on its title.

>>> image1, image2 = np.random.rand(64, 64, 3), color_ramp((64, 64))
>>> show_images([image1, image2])
>>> show_images({'random image': image1, 'color ramp': image2}, height=128)
>>> show_images([image1, image2] * 5, columns=4, border=True)

Arguments:
  • images: Iterable of images, or dictionary of {title: image}. Each image must be either a 2D array or a 3D array with 1, 3, or 4 channels.
  • titles: Optional strings shown above the corresponding images.
  • width: Optional, overrides displayed width (in pixels).
  • height: Optional, overrides displayed height (in pixels).
  • downsample: If True, each image whose width or height is greater than the specified width or height is resampled to the display resolution. This improves antialiasing and reduces the size of the notebook.
  • columns: Optional, maximum number of images per row.
  • vmin: For single-channel image, explicit min value for display.
  • vmax: For single-channel image, explicit max value for display.
  • cmap: For single-channel image, pyplot color map or callable to map 1D to 3D color.
  • border: If bool, whether to place a black boundary around the image, or if str, the boundary CSS style.
  • ylabel: Text (rotated by 90 degrees) shown on the left of each row.
  • html_class: CSS class name used in definition of HTML element.
  • pixelated: If True, sets the CSS style to 'image-rendering: pixelated;'; if False, sets 'image-rendering: auto'; if None, uses pixelated rendering only on images for which width or height introduces magnification.
  • return_html: If True, return the raw HTML str instead of displaying it.

Returns:
  HTML string if return_html is True; otherwise None.
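The interaction between `width`, `height`, `downsample`, and `pixelated` can be sketched without any image data. The source calls a private helper, `_get_width_height`, whose body is not shown in this section, so `resolve_display_size` below is an assumption (aspect-preserving, rounded to the nearest pixel) rather than the library's implementation. `display_plan` is likewise a hypothetical name that only mirrors the comparisons visible in the `show_images` source.

```python
def resolve_display_size(width, height, shape):
    """Assumed behavior: return (w, h), preserving aspect ratio if one is missing."""
    rows, cols = shape
    if width and height:
        return width, height
    if width:
        return width, int(width * rows / cols + 0.5)
    if height:
        return int(height * cols / rows + 0.5), height
    return cols, rows


def display_plan(shape, width=None, height=None, downsample=True, pixelated=None):
    """Report the display size, whether the image is resampled, and the CSS mode."""
    w, h = resolve_display_size(width, height, shape)
    # Resample only when the display size is smaller than the stored image.
    resampled = bool(downsample and (w < shape[1] or h < shape[0]))
    stored = (h, w) if resampled else shape
    # With pixelated=None, pixelated rendering is used only under magnification.
    magnified = h > stored[0] or w > stored[1]
    use_pixelated = pixelated if pixelated is not None else magnified
    return {'display': (w, h), 'resampled': resampled, 'pixelated': use_pixelated}


print(display_plan((32, 32), height=64))    # Small image shown larger.
print(display_plan((480, 640), height=64))  # Large image shown smaller.
```

Under these assumptions, a 32x32 checkerboard shown at `height=64` is magnified and rendered pixelated, while a 480x640 image shown at the same height is resampled down first.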

def compare_images(images: Iterable[ArrayLike], *, vmin: float | None = None, vmax: float | None = None, cmap: str | Callable[[ArrayLike], np.ndarray] = 'gray') -> None:
def compare_images(
    images: Iterable[_ArrayLike],
    *,
    vmin: float | None = None,
    vmax: float | None = None,
    cmap: str | Callable[[_ArrayLike], _NDArray] = 'gray',
) -> None:
  """Compare two images using an interactive slider.

  Displays an HTML slider component to interactively swipe between two images.
  The slider functionality requires that the web browser have Internet access.
  See additional info in `https://github.com/sneas/img-comparison-slider`.

  >>> image1, image2 = np.random.rand(64, 64, 3), color_ramp((64, 64))
  >>> compare_images([image1, image2])

  Args:
    images: Iterable of images.  Each image must be either a 2D array or a 3D
      array with 1, 3, or 4 channels.  There must be exactly two images.
    vmin: For single-channel image, explicit min value for display.
    vmax: For single-channel image, explicit max value for display.
    cmap: For single-channel image, `pyplot` color map or callable to map 1D to
      3D color.
  """
  list_images = [
      _ensure_mapped_to_rgb(image, vmin=vmin, vmax=vmax, cmap=cmap)
      for image in images
  ]
  if len(list_images) != 2:
    raise ValueError('The number of images must be 2.')
  png_datas = [compress_image(to_uint8(image)) for image in list_images]
  b64_1, b64_2 = [
      base64.b64encode(png_data).decode('utf-8') for png_data in png_datas
  ]
  s = _IMAGE_COMPARISON_HTML.replace('{b64_1}', b64_1).replace('{b64_2}', b64_2)
  _display_html(s)

Compare two images using an interactive slider.

Displays an HTML slider component to interactively swipe between two images. The slider functionality requires that the web browser have Internet access. See additional info in https://github.com/sneas/img-comparison-slider.

>>> image1, image2 = np.random.rand(64, 64, 3), color_ramp((64, 64))
>>> compare_images([image1, image2])

Arguments:
  • images: Iterable of images. Each image must be either a 2D array or a 3D array with 1, 3, or 4 channels. There must be exactly two images.
  • vmin: For single-channel image, explicit min value for display.
  • vmax: For single-channel image, explicit max value for display.
  • cmap: For single-channel image, pyplot color map or callable to map 1D to 3D color.
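The source of `compare_images` embeds both PNG buffers as base64 text and fills the `{b64_1}` and `{b64_2}` placeholders with `str.replace`. A minimal sketch of that step follows. The `TEMPLATE` string is a hypothetical stand-in, since the real `_IMAGE_COMPARISON_HTML` is not shown in this section, and the byte strings are placeholders rather than real PNG data. Plain replacement is presumably used because the template contains literal braces, which `str.format` would try to interpret as fields.

```python
import base64

# Hypothetical stand-in for the library's _IMAGE_COMPARISON_HTML template.
TEMPLATE = """<style>.slider { width: 100%; }</style>
<img-comparison-slider class="slider">
  <img slot="first" src="data:image/png;base64,{b64_1}"/>
  <img slot="second" src="data:image/png;base64,{b64_2}"/>
</img-comparison-slider>"""


def fill_comparison_template(png_1: bytes, png_2: bytes) -> str:
    """Embed two compressed images into the template as base64 data URIs."""
    b64_1, b64_2 = (
        base64.b64encode(data).decode('utf-8') for data in (png_1, png_2)
    )
    # Replace only the two named placeholders; the CSS braces stay untouched.
    return TEMPLATE.replace('{b64_1}', b64_1).replace('{b64_2}', b64_2)


html = fill_comparison_template(b'first image bytes', b'second image bytes')
print(html)
```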
def show_video(images: Iterable[np.ndarray], *, title: str | None = None, **kwargs: Any) -> str | None:
def show_video(
    images: Iterable[_NDArray], *, title: str | None = None, **kwargs: Any
) -> str | None:
  """Displays a video in the IPython notebook and optionally saves it to a file.

  See `show_videos`.

  >>> video = read_video('https://github.com/hhoppe/data/raw/main/video.mp4')
  >>> show_video(video, title='River video')

  >>> show_video(moving_circle((80, 80), num_images=10), fps=5, border=True)

  >>> show_video(read_video('/tmp/river.mp4'))

  Args:
    images: Iterable of video frames (e.g., a 4D array or a list of 2D or 3D
      arrays).
    title: Optional text shown centered above the video.
    **kwargs: See `show_videos`.

  Returns:
    html string if `return_html` is `True`.
  """
  return show_videos([images], [title], **kwargs)

Displays a video in the IPython notebook and optionally saves it to a file.

See show_videos.

>>> video = read_video('https://github.com/hhoppe/data/raw/main/video.mp4')
>>> show_video(video, title='River video')
>>> show_video(moving_circle((80, 80), num_images=10), fps=5, border=True)
>>> show_video(read_video('/tmp/river.mp4'))

Arguments:
  • images: Iterable of video frames (e.g., a 4D array or a list of 2D or 3D arrays).
  • title: Optional text shown centered above the video.
  • **kwargs: See show_videos.

Returns:
  HTML string if return_html is True; otherwise None.
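Because `show_video` forwards its keyword arguments to `show_videos`, the GIF caveat in the `show_videos` docstring applies here too: animations skip frames when the `fps` period is not a multiple of the 10 ms GIF frame-delay unit. A rough way to check a candidate framerate before passing `codec='gif'` is sketched below. The helper name `gif_frame_period_is_exact` is hypothetical and not part of mediapy.

```python
from fractions import Fraction


def gif_frame_period_is_exact(fps) -> bool:
    """True if 1/fps seconds is a whole number of 10 ms GIF delay units."""
    # Use exact rational arithmetic; limit_denominator absorbs float noise
    # in values such as 29.97.
    rate = Fraction(fps).limit_denominator(1000)
    period_in_units = Fraction(100) / rate  # There are 100 units per second.
    return period_in_units.denominator == 1


for fps in (20.0, 25.0, 50.0, 30.0, 60.0):
    print(fps, gif_frame_period_is_exact(fps))
```

The framerates the docstring lists as working fine (20.0, 25.0, and 50.0) correspond to delays of 5, 4, and 2 units, whereas 30.0 and 60.0 do not divide evenly.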

def show_videos( videos: Iterable[Iterable[np.ndarray]] | Mapping[str, Iterable[np.ndarray]], titles: Iterable[str | None] | None = None, *, width: int | None = None, height: int | None = None, downsample: bool = True, columns: int | None = None, fps: float | None = None, bps: int | None = None, qp: int | None = None, codec: str = 'h264', ylabel: str = '', html_class: str = 'show_videos', return_html: bool = False, **kwargs: Any) -> str | None:
def show_videos(
    videos: Iterable[Iterable[_NDArray]] | Mapping[str, Iterable[_NDArray]],
    titles: Iterable[str | None] | None = None,
    *,
    width: int | None = None,
    height: int | None = None,
    downsample: bool = True,
    columns: int | None = None,
    fps: float | None = None,
    bps: int | None = None,
    qp: int | None = None,
    codec: str = 'h264',
    ylabel: str = '',
    html_class: str = 'show_videos',
    return_html: bool = False,
    **kwargs: Any,
) -> str | None:
  """Displays a row of videos in the IPython notebook.

  Creates HTML with `<video>` tags containing embedded H264-encoded bytestrings.
  If `codec` is set to 'gif', we instead use `<img>` tags containing embedded
  GIF-encoded bytestrings.  Note that the resulting GIF animations skip frames
  when the `fps` period is not a multiple of 10 ms units (GIF frame delay
  units).  Encoding at `fps` = 20.0, 25.0, or 50.0 works fine.

  If a directory has been specified using `set_show_save_dir`, also saves each
  titled video to a file in that directory based on its title.

  Args:
    videos: Iterable of videos, or dictionary of `{title: video}`.  Each video
      must be an iterable of images.  If a video object has a `metadata`
      (`VideoMetadata`) attribute, its `fps` field provides a default framerate.
    titles: Optional strings shown above the corresponding videos.
    width: Optional, overrides displayed width (in pixels).
    height: Optional, overrides displayed height (in pixels).
    downsample: If True, each video whose width or height is greater than the
      specified `width` or `height` is resampled to the display resolution. This
      improves antialiasing and reduces the size of the notebook.
    columns: Optional, maximum number of videos per row.
    fps: Frames-per-second framerate (default is 60.0 except 25.0 for GIF).
    bps: Bits-per-second bitrate (default None).
    qp: Quantization parameter for video compression quality (default None).
    codec: Compression algorithm; must be either 'h264' or 'gif'.
    ylabel: Text (rotated by 90 degrees) shown on the left of each row.
    html_class: CSS class name used in definition of HTML element.
    return_html: If `True` return the raw HTML `str` instead of displaying.
    **kwargs: Additional parameters (`border`, `loop`, `autoplay`) for
      `html_from_compressed_video`.

  Returns:
    html string if `return_html` is `True`.
  """
  if isinstance(videos, Mapping):
    if titles is not None:
      raise ValueError(
          'Cannot have both a video dictionary and a titles parameter.'
      )
    list_titles = list(videos.keys())
    list_videos = list(videos.values())
  else:
    list_videos = list(cast('Iterable[_NDArray]', videos))
    list_titles = [None] * len(list_videos) if titles is None else list(titles)
    if len(list_videos) != len(list_titles):
      raise ValueError(
          'Number of videos does not match number of titles'
          f' ({len(list_videos)} vs {len(list_titles)}).'
      )
  if codec not in {'h264', 'gif'}:
    raise ValueError(f'Codec {codec} is neither h264 or gif.')

  html_strings = []
  for video, title in zip(list_videos, list_titles):
    metadata: VideoMetadata | None = getattr(video, 'metadata', None)
    first_image, video = _peek_first(video)
    w, h = _get_width_height(width, height, first_image.shape[:2])
    if downsample and (w < first_image.shape[1] or h < first_image.shape[0]):
      # Not resize_video() because each image may have different depth and type.
      video = [resize_image(image, (h, w)) for image in video]
      first_image = video[0]
    data = compress_video(
        video, metadata=metadata, fps=fps, bps=bps, qp=qp, codec=codec
    )
    if title is not None and _config.show_save_dir:
      suffix = _filename_suffix_from_codec(codec)
      path = pathlib.Path(_config.show_save_dir) / f'{title}{suffix}'
      with _open(path, mode='wb') as f:
        f.write(data)
    if codec == 'gif':
      pixelated = h > first_image.shape[0] or w > first_image.shape[1]
      html_string = html_from_compressed_image(
          data, w, h, title=title, fmt='gif', pixelated=pixelated, **kwargs
      )
    else:
      html_string = html_from_compressed_video(
          data, w, h, title=title, **kwargs
      )
    html_strings.append(html_string)

  # Create single-row tables each with no more than 'columns' elements.
  table_strings = []
  for row_html_strings in _chunked(html_strings, columns):
    td = '<td style="padding:1px;">'
    s = ''.join(f'{td}{e}</td>' for e in row_html_strings)
    if ylabel:
      style = 'writing-mode:vertical-lr; transform:rotate(180deg);'
      s = f'{td}<span style="{style}">{ylabel}</span></td>' + s
    table_strings.append(
        f'<table class="{html_class}"'
        f' style="border-spacing:0px;"><tr>{s}</tr></table>'
    )
  s = ''.join(table_strings)
  if return_html:
    return s
  _display_html(s)
  return None

Displays a row of videos in the IPython notebook.

Creates HTML with <video> tags containing embedded H264-encoded bytestrings. If codec is set to 'gif', we instead use <img> tags containing embedded GIF-encoded bytestrings. Note that the resulting GIF animations skip frames when the fps period is not a multiple of 10 ms units (GIF frame delay units). Encoding at fps = 20.0, 25.0, or 50.0 works fine.
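The GIF timing constraint above can be checked ahead of time. The following is a minimal sketch, not part of mediapy: the helper name `gif_delay_is_exact` is hypothetical, and it only tests whether one frame period is a whole number of 10 ms units.

```python
from fractions import Fraction


def gif_delay_is_exact(fps: float) -> bool:
  """Returns True if one frame period at `fps` is a whole number of 10 ms units."""
  # One second is 100 GIF delay units, so the period is 100 / fps units.
  period_in_units = Fraction(100) / Fraction(fps).limit_denominator(1000)
  return period_in_units.denominator == 1


for fps in (20.0, 25.0, 50.0, 30.0, 60.0):
  print(fps, gif_delay_is_exact(fps))
```

Under this check, 20.0, 25.0, and 50.0 map to exact delays of 5, 4, and 2 units, whereas 30.0 and 60.0 do not, which is consistent with the note that such framerates may skip frames.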

If a directory has been specified using set_show_save_dir, also saves each titled video to a file in that directory based on its title.

Arguments:
  • videos: Iterable of videos, or dictionary of {title: video}. Each video must be an iterable of images. If a video object has a metadata (VideoMetadata) attribute, its fps field provides a default framerate.
  • titles: Optional strings shown above the corresponding videos.
  • width: Optional, overrides displayed width (in pixels).
  • height: Optional, overrides displayed height (in pixels).
  • downsample: If True, each video whose width or height is greater than the specified width or height is resampled to the display resolution. This improves antialiasing and reduces the size of the notebook.
  • columns: Optional, maximum number of videos per row.
  • fps: Frames-per-second framerate (default is 60.0 except 25.0 for GIF).
  • bps: Bits-per-second bitrate (default None).
  • qp: Quantization parameter for video compression quality (default None).
  • codec: Compression algorithm; must be either 'h264' or 'gif'.
  • ylabel: Text (rotated by 90 degrees) shown on the left of each row.
  • html_class: CSS class name used in definition of HTML element.
  • return_html: If True return the raw HTML str instead of displaying.
  • **kwargs: Additional parameters (border, loop, autoplay) for html_from_compressed_video.
Returns:

html string if return_html is True.
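The two accepted forms of `videos` (an iterable plus optional `titles`, or a `{title: video}` dictionary) are normalized into parallel lists before encoding. The sketch below restates that step in isolation; the function name `split_titles_and_videos` is hypothetical, and plain Python objects stand in for video arrays.

```python
from collections.abc import Iterable, Mapping
from typing import Any, Optional


def split_titles_and_videos(
    videos: Any,
    titles: Optional[Iterable[Optional[str]]] = None,
) -> tuple[list, list]:
  """Returns (titles, videos) as two lists of equal length."""
  if isinstance(videos, Mapping):
    # A dictionary already carries its titles, so a second source is an error.
    if titles is not None:
      raise ValueError(
          'Cannot have both a video dictionary and a titles parameter.'
      )
    return list(videos.keys()), list(videos.values())
  list_videos = list(videos)
  # Untitled videos get a None placeholder so the lists stay aligned.
  list_titles = [None] * len(list_videos) if titles is None else list(titles)
  if len(list_videos) != len(list_titles):
    raise ValueError(
        'Number of videos does not match number of titles'
        f' ({len(list_videos)} vs {len(list_titles)}).'
    )
  return list_titles, list_videos


print(split_titles_and_videos({'left': ['frame0'], 'right': ['frame1']}))
print(split_titles_and_videos([['frame0'], ['frame1']]))
```

Passing a dictionary together with `titles`, or a `titles` list of the wrong length, raises `ValueError` in this sketch, as it does in the source shown above.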

def read_image( path_or_url: Union[str, os.PathLike[str]], *, apply_exif_transpose: bool = True, dtype: DTypeLike = None) -> np.ndarray:
def read_image(
    path_or_url: _Path,
    *,
    apply_exif_transpose: bool = True,
    dtype: _DTypeLike = None,
) -> _NDArray:
  """Returns an image read from a file path or URL.

  Decoding is performed using `PIL`, which supports `uint8` images with 1, 3,
  or 4 channels and `uint16` images with a single channel.

  Args:
    path_or_url: Path or URL of input file.
    apply_exif_transpose: If True, rotate image according to EXIF orientation.
    dtype: Data type of the returned array.  If None, `np.uint8` or `np.uint16`
      is inferred automatically.
  """
  data = read_contents(path_or_url)
  return decompress_image(data, dtype, apply_exif_transpose)

Returns an image read from a file path or URL.

Decoding is performed using PIL, which supports uint8 images with 1, 3, or 4 channels and uint16 images with a single channel.

Arguments:
  • path_or_url: Path or URL of input file.
  • apply_exif_transpose: If True, rotate image according to EXIF orientation.
  • dtype: Data type of the returned array. If None, np.uint8 or np.uint16 is inferred automatically.
def write_image( path: Union[str, os.PathLike[str]], image: ArrayLike, fmt: str = 'png', **kwargs: Any) -> None:
def write_image(
    path: _Path, image: _ArrayLike, fmt: str = 'png', **kwargs: Any
) -> None:
  """Writes an image to a file.

  Encoding is performed using `PIL`, which supports `uint8` images with 1, 3,
  or 4 channels and `uint16` images with a single channel.

  File format is explicitly provided by `fmt` and not inferred from `path`.

  Args:
    path: Path of output file.
    image: Array-like object.  If its type is float, it is converted to np.uint8
      using `to_uint8` (thus clamping the input to the range [0.0, 1.0]).
      Otherwise it must be np.uint8 or np.uint16.
    fmt: Desired compression encoding, e.g. 'png'.
    **kwargs: Additional parameters for `PIL.Image.save()`.
  """
  image = _as_valid_media_array(image)
  if np.issubdtype(image.dtype, np.floating):
    image = to_uint8(image)
  with _open(path, 'wb') as f:
    _pil_image(image).save(f, format=fmt, **kwargs)

Writes an image to a file.

Encoding is performed using PIL, which supports uint8 images with 1, 3, or 4 channels and uint16 images with a single channel.

File format is explicitly provided by fmt and not inferred from path.

Arguments:
  • path: Path of output file.
  • image: Array-like object. If its type is float, it is converted to np.uint8 using to_uint8 (thus clamping the input to the range [0.0, 1.0]). Otherwise it must be np.uint8 or np.uint16.
  • fmt: Desired compression encoding, e.g. 'png'.
  • **kwargs: Additional parameters for PIL.Image.save().
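To make the float handling concrete, here is a per-value sketch of a clamp-then-scale conversion. It is an illustration only: `float01_to_uint8` is a hypothetical name, and the round-to-nearest step is an assumption; the actual `to_uint8` operates on whole arrays and may differ in rounding details.

```python
def float01_to_uint8(value: float) -> int:
  """Maps a float to 0..255, clamping inputs outside [0.0, 1.0]."""
  clamped = min(max(value, 0.0), 1.0)
  # Assumed rounding: scale to 0..255 and round to the nearest integer.
  return int(clamped * 255 + 0.5)


for value in (-0.5, 0.0, 0.5, 1.0, 2.0):
  print(value, float01_to_uint8(value))
```

The practical consequence is that float images with values outside [0.0, 1.0] saturate to black or white rather than wrapping around.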
def read_video( path_or_url: Union[str, os.PathLike[str]], **kwargs: Any) -> mediapy._VideoArray:
def read_video(path_or_url: _Path, **kwargs: Any) -> _VideoArray:
  """Returns an array containing all images read from a compressed video file.

  >>> video = read_video('/tmp/river.mp4')
  >>> print(f'The framerate is {video.metadata.fps} frames/s.')
  >>> show_video(video)

  >>> url = 'https://github.com/hhoppe/data/raw/main/video.mp4'
  >>> show_video(read_video(url))

  Args:
    path_or_url: Input video file.
    **kwargs: Additional parameters for `VideoReader`.

  Returns:
    A 4D `numpy` array with dimensions (frame, height, width, channel), or a 3D
    array if `output_format` is specified as 'gray'.  The returned array has an
    attribute `metadata` containing `VideoMetadata` information.  This enables
    `show_video` to retrieve the framerate in `metadata.fps`.  Note that the
    metadata attribute is lost in most subsequent `numpy` operations.
  """
  with VideoReader(path_or_url, **kwargs) as reader:
    return _VideoArray(np.array(tuple(reader)), metadata=reader.metadata)

Returns an array containing all images read from a compressed video file.

>>> video = read_video('/tmp/river.mp4')
>>> print(f'The framerate is {video.metadata.fps} frames/s.')
>>> show_video(video)
>>> url = 'https://github.com/hhoppe/data/raw/main/video.mp4'
>>> show_video(read_video(url))
Arguments:
  • path_or_url: Input video file.
  • **kwargs: Additional parameters for VideoReader.
Returns:

A 4D numpy array with dimensions (frame, height, width, channel), or a 3D array if output_format is specified as 'gray'. The returned array has an attribute metadata containing VideoMetadata information. This enables show_video to retrieve the framerate in metadata.fps. Note that the metadata attribute is lost in most subsequent numpy operations.

def write_video( path: Union[str, os.PathLike[str]], images: Iterable[np.ndarray], **kwargs: Any) -> None:
def write_video(path: _Path, images: Iterable[_NDArray], **kwargs: Any) -> None:
  """Writes images to a compressed video file.

  >>> video = moving_circle((480, 640), num_images=60)
  >>> write_video('/tmp/v.mp4', video, fps=60, qp=18)
  >>> show_video(read_video('/tmp/v.mp4'))

  Args:
    path: Output video file.
    images: Iterable over video frames, e.g. a 4D array or a list of 2D or 3D
      arrays.
    **kwargs: Additional parameters for `VideoWriter`.
  """
  first_image, images = _peek_first(images)
  shape: tuple[int, int] = first_image.shape[:2]
  dtype = first_image.dtype
  if dtype == bool:
    dtype = np.dtype(np.uint8)
  elif np.issubdtype(dtype, np.floating):
    dtype = np.dtype(np.uint16)
  kwargs = {'metadata': getattr(images, 'metadata', None), **kwargs}
  with VideoWriter(path, shape=shape, dtype=dtype, **kwargs) as writer:
    for image in images:
      writer.add_image(image)

Writes images to a compressed video file.

>>> video = moving_circle((480, 640), num_images=60)
>>> write_video('/tmp/v.mp4', video, fps=60, qp=18)
>>> show_video(read_video('/tmp/v.mp4'))
Arguments:
  • path: Output video file.
  • images: Iterable over video frames, e.g. a 4D array or a list of 2D or 3D arrays.
  • **kwargs: Additional parameters for VideoWriter.
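Because `images` may be a one-shot iterable, `write_video` has to inspect the first frame (for shape and dtype) without losing it. The sketch below shows one common way to do that; `peek_first` is a hypothetical stand-in, and the library's private `_peek_first` helper may be implemented differently.

```python
import itertools
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar('T')


def peek_first(items: Iterable[T]) -> tuple[T, Iterator[T]]:
  """Returns the first element and an iterator that still yields all elements."""
  iterator = iter(items)
  first = next(iterator)  # Raises StopIteration if `items` is empty.
  # Put the consumed element back in front of the remaining ones.
  return first, itertools.chain([first], iterator)


frames = (f'frame{i}' for i in range(3))  # A generator can be read only once.
first, all_frames = peek_first(frames)
print(first)
print(list(all_frames))
```

With this pattern the caller can size the encoder from `first` and then loop over `all_frames` as if nothing had been consumed.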
class VideoReader(_VideoIO):
class VideoReader(_VideoIO):
  """Context to read a compressed video as an iterable over its images.

  >>> with VideoReader('/tmp/river.mp4') as reader:
  ...   print(f'Video has {reader.num_images} images with shape={reader.shape},'
  ...         f' at {reader.fps} frames/sec and {reader.bps} bits/sec.')
  ...   for image in reader:
  ...     print(image.shape)

  >>> with VideoReader('/tmp/river.mp4') as reader:
  ...   video = np.array(tuple(reader))

  >>> url = 'https://github.com/hhoppe/data/raw/main/video.mp4'
  >>> with VideoReader(url) as reader:
  ...   show_video(reader)

  Attributes:
    path_or_url: Location of input video.
    output_format: Format of output images (default 'rgb').  If 'rgb', each
      image has shape=(height, width, 3) with R, G, B values.  If 'yuv', each
      image has shape=(height, width, 3) with Y, U, V values.  If 'gray', each
      image has shape=(height, width).
    dtype: Data type for output images.  The default is `np.uint8`.  Use of
      `np.uint16` allows reading 10-bit or 12-bit data without precision loss.
    metadata: Object storing the information retrieved from the video header.
      Its attributes are copied as attributes in this class.
    num_images: Number of frames that is expected from the video stream.  This
      is estimated from the framerate and the duration stored in the video
      header, so it might be inexact.
    shape: The dimensions (height, width) of each video frame.
    fps: The framerate in frames per second.
    bps: The estimated bitrate of the video stream in bits per second, retrieved
      from the video header.
    stream_index: The stream index to read from. The default is 0.
  """

  path_or_url: _Path
  output_format: str
  dtype: _DType
  metadata: VideoMetadata
  num_images: int
  shape: tuple[int, int]
  fps: float
  bps: int | None
  stream_index: int
  _num_bytes_per_image: int

  def __init__(
      self,
      path_or_url: _Path,
      *,
      stream_index: int = 0,
      output_format: str = 'rgb',
      dtype: _DTypeLike = np.uint8,
  ):
    if output_format not in {'rgb', 'yuv', 'gray'}:
      raise ValueError(
          f'Output format {output_format} is not rgb, yuv, or gray.'
      )
    self.path_or_url = path_or_url
    self.output_format = output_format
    self.stream_index = stream_index
    self.dtype = np.dtype(dtype)
    if self.dtype.type not in (np.uint8, np.uint16):
      raise ValueError(f'Type {dtype} is not np.uint8 or np.uint16.')
    self._read_via_local_file: Any = None
    self._popen: subprocess.Popen[bytes] | None = None
    self._proc: subprocess.Popen[bytes] | None = None

  def __enter__(self) -> 'VideoReader':
    try:
      self._read_via_local_file = _read_via_local_file(self.path_or_url)
      # pylint: disable-next=no-member
      tmp_name = self._read_via_local_file.__enter__()

      self.metadata = _get_video_metadata(tmp_name)
      self.num_images, self.shape, self.fps, self.bps = self.metadata
      pix_fmt = self._get_pix_fmt(self.dtype, self.output_format)
      num_channels = {'rgb': 3, 'yuv': 3, 'gray': 1}[self.output_format]
      bytes_per_channel = self.dtype.itemsize
      self._num_bytes_per_image = (
          math.prod(self.shape) * num_channels * bytes_per_channel
      )

      command = [
          '-v',
          'panic',
          '-nostdin',
          '-i',
          tmp_name,
          '-vcodec',
          'rawvideo',
          '-f',
          'image2pipe',
          '-map',
          f'0:v:{self.stream_index}',
          '-pix_fmt',
          pix_fmt,
          '-vsync',
          'vfr',
          '-',
      ]
      self._popen = _run_ffmpeg(
          command,
          stdout=subprocess.PIPE,
          stderr=subprocess.PIPE,
          allowed_input_files=[tmp_name],
      )
      self._proc = self._popen.__enter__()
    except Exception:
      self.__exit__(None, None, None)
      raise
    return self

  def __exit__(self, *_: Any) -> None:
    self.close()

  def read(self) -> _NDArray | None:
    """Reads a video image frame (or None if at end of file).

    Returns:
      A numpy array in the format specified by `output_format`, i.e., a 3D
      array with 3 color channels, except for format 'gray' which is 2D.
    """
    assert self._proc, 'Error: reading from an already closed context.'
    stdout = self._proc.stdout
    assert stdout is not None
    data = stdout.read(self._num_bytes_per_image)
    if not data:  # Due to either end-of-file or subprocess error.
      self.close()  # Raises exception if subprocess had error.
      return None  # To indicate end-of-file.
    assert len(data) == self._num_bytes_per_image
    image = np.frombuffer(data, dtype=self.dtype)
    if self.output_format == 'rgb':
      image = image.reshape(*self.shape, 3)
    elif self.output_format == 'yuv':  # Convert from planar YUV to pixel YUV.
      image = np.moveaxis(image.reshape(3, *self.shape), 0, 2)
    elif self.output_format == 'gray':  # Generate 2D rather than 3D ndimage.
      image = image.reshape(*self.shape)
    else:
      raise AssertionError
    return image

  def __iter__(self) -> Iterator[_NDArray]:
    while True:
      image = self.read()
      if image is None:
        return
      yield image

  def close(self) -> None:
    """Terminates video reader.  (Called automatically at end of context.)"""
    if self._popen:
      self._popen.__exit__(None, None, None)
      self._popen = None
      self._proc = None
    if self._read_via_local_file:
      # pylint: disable-next=no-member
      self._read_via_local_file.__exit__(None, None, None)
      self._read_via_local_file = None

Context to read a compressed video as an iterable over its images.

>>> with VideoReader('/tmp/river.mp4') as reader:
...   print(f'Video has {reader.num_images} images with shape={reader.shape},'
...         f' at {reader.fps} frames/sec and {reader.bps} bits/sec.')
...   for image in reader:
...     print(image.shape)
>>> with VideoReader('/tmp/river.mp4') as reader:
...   video = np.array(tuple(reader))
>>> url = 'https://github.com/hhoppe/data/raw/main/video.mp4'
>>> with VideoReader(url) as reader:
...   show_video(reader)
Attributes:
  • path_or_url: Location of input video.
  • output_format: Format of output images (default 'rgb'). If 'rgb', each image has shape=(height, width, 3) with R, G, B values. If 'yuv', each image has shape=(height, width, 3) with Y, U, V values. If 'gray', each image has shape=(height, width).
  • dtype: Data type for output images. The default is np.uint8. Use of np.uint16 allows reading 10-bit or 12-bit data without precision loss.
  • metadata: Object storing the information retrieved from the video header. Its attributes are copied as attributes in this class.
  • num_images: Number of frames that is expected from the video stream. This is estimated from the framerate and the duration stored in the video header, so it might be inexact.
  • shape: The dimensions (height, width) of each video frame.
  • fps: The framerate in frames per second.
  • bps: The estimated bitrate of the video stream in bits per second, retrieved from the video header.
  • stream_index: The stream index to read from. The default is 0.
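The `output_format`, `dtype`, and `shape` attributes together fix how many raw bytes each decoded frame occupies on the pipe from `ffmpeg`. The sketch below restates that arithmetic from the class source as a standalone function; the name `num_bytes_per_image` and the explicit `bytes_per_channel` argument (1 for `np.uint8`, 2 for `np.uint16`) are illustrative choices.

```python
import math

NUM_CHANNELS = {'rgb': 3, 'yuv': 3, 'gray': 1}


def num_bytes_per_image(
    shape: tuple[int, int], output_format: str, bytes_per_channel: int
) -> int:
  """Returns the raw byte size of one frame of (height, width) `shape`."""
  return math.prod(shape) * NUM_CHANNELS[output_format] * bytes_per_channel


print(num_bytes_per_image((480, 640), 'rgb', 1))   # 921600
print(num_bytes_per_image((480, 640), 'gray', 2))  # 614400
```

This size is useful for estimating memory before loading a whole video: multiplying it by `num_images` gives the approximate footprint of the array returned by `read_video`.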
VideoReader(path_or_url: Union[str, os.PathLike[str]], *, stream_index: int = 0, output_format: str = 'rgb', dtype: DTypeLike = np.uint8)
path_or_url: Union[str, os.PathLike[str]]
output_format: str
dtype: ~_DType
metadata: VideoMetadata
num_images: int
shape: tuple[int, int]
fps: float
bps: int | None
stream_index: int
def read(self) -> Optional[np.ndarray]:

Reads a video image frame (or None if at end of file).

Returns:

A numpy array in the format specified by output_format, i.e., a 3D array with 3 color channels, except for format 'gray' which is 2D.
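For the 'yuv' format, the source comments indicate that frames arrive in planar order (all Y values, then all U, then all V) and are rearranged so each pixel holds its own (Y, U, V) triple. The sketch below shows that rearrangement on plain bytes; `planar_to_pixel_yuv` is a hypothetical name, it assumes 8-bit samples, and it uses nested lists where the library uses `np.moveaxis` on an array.

```python
def planar_to_pixel_yuv(data: bytes, height: int, width: int) -> list:
  """Converts planar YUV bytes into rows of per-pixel (Y, U, V) tuples."""
  plane_size = height * width
  if len(data) != 3 * plane_size:
    raise ValueError(f'Expected {3 * plane_size} bytes, got {len(data)}.')
  # Slice the buffer into its three consecutive planes.
  y_plane, u_plane, v_plane = (
      data[i * plane_size : (i + 1) * plane_size] for i in range(3)
  )
  return [
      [
          (y_plane[r * width + c], u_plane[r * width + c], v_plane[r * width + c])
          for c in range(width)
      ]
      for r in range(height)
  ]


# A 2x2 frame: four Y samples, then four U samples, then four V samples.
planar = bytes([1, 2, 3, 4, 10, 20, 30, 40, 100, 110, 120, 130])
print(planar_to_pixel_yuv(planar, height=2, width=2))
```

The result is indexed as `image[row][column]`, matching the (height, width, 3) layout described for the 'yuv' output format.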

def close(self) -> None:

Terminates video reader. (Called automatically at end of context.)

class VideoWriter(_VideoIO):
1530class VideoWriter(_VideoIO):
1531  """Context to write a compressed video.
1532
1533  >>> shape = 480, 640
1534  >>> with VideoWriter('/tmp/v.mp4', shape, fps=60) as writer:
1535  ...   for image in moving_circle(shape, num_images=60):
1536  ...     writer.add_image(image)
1537  >>> show_video(read_video('/tmp/v.mp4'))
1538
1539
1540  Bitrate control may be specified using at most one of: `bps`, `qp`, or `crf`.
1541  If none are specified, `qp` is set to a default value.
1542  See https://slhck.info/video/2017/03/01/rate-control.html
1543
1544  If codec is 'gif', the args `bps`, `qp`, `crf`, and `encoded_format` are
1545  ignored.
1546
1547  Attributes:
1548    path: Output video.  Its suffix (e.g. '.mp4') determines the video container
1549      format.  The suffix must be '.gif' if the codec is 'gif'.
1550    shape: 2D spatial dimensions (height, width) of video image frames.  The
1551      dimensions must be even if 'encoded_format' has subsampled chroma (e.g.,
1552      'yuv420p' or 'yuv420p10le').
1553    codec: Compression algorithm as defined by "ffmpeg -codecs" (e.g., 'h264',
1554      'hevc', 'vp9', or 'gif').
1555    metadata: Optional VideoMetadata object whose `fps` and `bps` attributes are
1556      used if not specified as explicit parameters.
1557    fps: Frames-per-second framerate (default is 60.0 except 25.0 for 'gif').
1558    bps: Requested average bits-per-second bitrate (default None).
1559    qp: Quantization parameter for video compression quality (default None).
1560    crf: Constant rate factor for video compression quality (default None).
1561    ffmpeg_args: Additional arguments for `ffmpeg` command, e.g. '-g 30' to
1562      introduce I-frames, or '-bf 0' to omit B-frames.
1563    input_format: Format of input images (default 'rgb').  If 'rgb', each image
1564      has shape=(height, width, 3) or (height, width).  If 'yuv', each image has
1565      shape=(height, width, 3) with Y, U, V values.  If 'gray', each image has
1566      shape=(height, width).
1567    dtype: Expected data type for input images (any float input images are
1568      converted to `dtype`).  The default is `np.uint8`.  Use of `np.uint16` is
1569      necessary when encoding >8 bits/channel.
1570    encoded_format: Pixel format as defined by `ffmpeg -pix_fmts`, e.g.,
1571      'yuv420p' (2x2-subsampled chroma), 'yuv444p' (full-res chroma),
1572      'yuv420p10le' (10-bit per channel), etc.  The default (None) selects
1573      'yuv420p' if all shape dimensions are even, else 'yuv444p'.
1574  """
1575
1576  def __init__(
1577      self,
1578      path: _Path,
1579      shape: tuple[int, int],
1580      *,
1581      codec: str = 'h264',
1582      metadata: VideoMetadata | None = None,
1583      fps: float | None = None,
1584      bps: int | None = None,
1585      qp: int | None = None,
1586      crf: float | None = None,
1587      ffmpeg_args: str | Sequence[str] = '',
1588      input_format: str = 'rgb',
1589      dtype: _DTypeLike = np.uint8,
1590      encoded_format: str | None = None,
1591  ) -> None:
1592    _check_2d_shape(shape)
1593    if fps is None and metadata:
1594      fps = metadata.fps
1595    if fps is None:
1596      fps = 25.0 if codec == 'gif' else 60.0
1597    if fps <= 0.0:
1598      raise ValueError(f'Frame-per-second value {fps} is invalid.')
1599    if bps is None and metadata:
1600      bps = metadata.bps
1601    bps = int(bps) if bps is not None else None
1602    if bps is not None and bps <= 0:
1603      raise ValueError(f'Bitrate value {bps} is invalid.')
1604    if qp is not None and (not isinstance(qp, int) or qp <= 0):
1605      raise ValueError(
1606          f'Quantization parameter {qp} is not a positive integer.'
1607      )
1608    num_rate_specifications = sum(x is not None for x in (bps, qp, crf))
1609    if num_rate_specifications > 1:
1610      raise ValueError(
1611          f'Must specify at most one of bps, qp, or crf ({bps}, {qp}, {crf}).'
1612      )
1613    ffmpeg_args = (
1614        shlex.split(ffmpeg_args)
1615        if isinstance(ffmpeg_args, str)
1616        else list(ffmpeg_args)
1617    )
1618    if input_format not in {'rgb', 'yuv', 'gray'}:
1619      raise ValueError(f'Input format {input_format} is not rgb, yuv, or gray.')
1620    dtype = np.dtype(dtype)
1621    if dtype.type not in (np.uint8, np.uint16):
1622      raise ValueError(f'Type {dtype} is not np.uint8 or np.uint16.')
1623    self.path = pathlib.Path(path)
1624    self.shape = shape
1625    all_dimensions_are_even = all(dim % 2 == 0 for dim in shape)
1626    if encoded_format is None:
1627      encoded_format = 'yuv420p' if all_dimensions_are_even else 'yuv444p'
1628    if not all_dimensions_are_even and encoded_format.startswith(
1629        ('yuv42', 'yuvj42')
1630    ):
1631      raise ValueError(
1632          f'With encoded_format {encoded_format}, video dimensions must be'
1633          f' even, but shape is {shape}.'
1634      )
1635    self.fps = fps
1636    self.codec = codec
1637    self.bps = bps
1638    self.qp = qp
1639    self.crf = crf
1640    self.ffmpeg_args = ffmpeg_args
1641    self.input_format = input_format
1642    self.dtype = dtype
1643    self.encoded_format = encoded_format
1644    if num_rate_specifications == 0 and not ffmpeg_args:
1645      qp = 20 if math.prod(self.shape) <= 640 * 480 else 28
1646    self._bitrate_args = (
1647        (['-vb', f'{bps}'] if bps is not None else [])
1648        + (['-qp', f'{qp}'] if qp is not None else [])
1649        + (['-vb', '0', '-crf', f'{crf}'] if crf is not None else [])
1650    )
1651    if self.codec == 'gif':
1652      if self.path.suffix != '.gif':
1653        raise ValueError(f"File '{self.path}' does not have a .gif suffix.")
1654      self.encoded_format = 'pal8'
1655      self._bitrate_args = []
1656      video_filter = 'split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse'
1657      # Less common (and likely less useful) is a per-frame color palette:
1658      # video_filter = ('split[s0][s1];[s0]palettegen=stats_mode=single[p];'
1659      #                 '[s1][p]paletteuse=new=1')
      self.ffmpeg_args = ['-vf', video_filter, '-f', 'gif'] + self.ffmpeg_args
    self._write_via_local_file: Any = None
    self._popen: subprocess.Popen[bytes] | None = None
    self._proc: subprocess.Popen[bytes] | None = None

  def __enter__(self) -> 'VideoWriter':
    input_pix_fmt = self._get_pix_fmt(self.dtype, self.input_format)
    try:
      self._write_via_local_file = _write_via_local_file(self.path)
      # pylint: disable-next=no-member
      tmp_name = self._write_via_local_file.__enter__()

      # Writing to stdout using ('-f', 'mp4', '-') would require
      # ('-movflags', 'frag_keyframe+empty_moov') and the result is nonportable.
      height, width = self.shape
      command = (
          [
              '-v',
              'error',
              '-f',
              'rawvideo',
              '-vcodec',
              'rawvideo',
              '-pix_fmt',
              input_pix_fmt,
              '-s',
              f'{width}x{height}',
              '-r',
              f'{self.fps}',
              '-i',
              '-',
              '-an',
              '-vcodec',
              self.codec,
              '-pix_fmt',
              self.encoded_format,
          ]
          + self._bitrate_args
          + self.ffmpeg_args
          + ['-y', tmp_name]
      )
      self._popen = _run_ffmpeg(
          command,
          stdin=subprocess.PIPE,
          stderr=subprocess.PIPE,
          allowed_output_files=[tmp_name],
      )
      self._proc = self._popen.__enter__()
    except Exception:
      self.__exit__(None, None, None)
      raise
    return self

  def __exit__(self, *_: Any) -> None:
    self.close()

  def add_image(self, image: _NDArray) -> None:
    """Writes a video frame.

    Args:
      image: Array whose dtype and first two dimensions must match the `dtype`
        and `shape` specified in `VideoWriter` initialization.  If
        `input_format` is 'gray', the image must be 2D.  For the 'rgb'
        input_format, the image may be either 2D (interpreted as grayscale) or
        3D with three (R, G, B) channels.  For the 'yuv' input_format, the image
        must be 3D with three (Y, U, V) channels.

    Raises:
      RuntimeError: If there is an error writing to the output file.
    """
    assert self._proc, 'Error: writing to an already closed context.'
    if issubclass(image.dtype.type, (np.floating, np.bool_)):
      image = to_type(image, self.dtype)
    if image.dtype != self.dtype:
      raise ValueError(f'Image type {image.dtype} != {self.dtype}.')
    if self.input_format == 'gray':
      if image.ndim != 2:
        raise ValueError(f'Image dimensions {image.shape} are not 2D.')
    else:
      if image.ndim == 2 and self.input_format == 'rgb':
        image = np.dstack((image, image, image))
      if not (image.ndim == 3 and image.shape[2] == 3):
        raise ValueError(f'Image dimensions {image.shape} are invalid.')
    if image.shape[:2] != self.shape:
      raise ValueError(
          f'Image dimensions {image.shape[:2]} do not match'
          f' those of the initialized video {self.shape}.'
      )
    if self.input_format == 'yuv':  # Convert from per-pixel YUV to planar YUV.
      image = np.moveaxis(image, 2, 0)
    data = image.tobytes()
    stdin = self._proc.stdin
    assert stdin is not None
    if stdin.write(data) != len(data):
      self._proc.wait()
      stderr = self._proc.stderr
      assert stderr is not None
      s = stderr.read().decode('utf-8')
      raise RuntimeError(f"Error writing '{self.path}': {s}")

  def close(self) -> None:
    """Finishes writing the video.  (Called automatically at end of context.)"""
    if self._popen:
      assert self._proc, 'Error: closing an already closed context.'
      stdin = self._proc.stdin
      assert stdin is not None
      stdin.close()
      if self._proc.wait():
        stderr = self._proc.stderr
        assert stderr is not None
        s = stderr.read().decode('utf-8')
        raise RuntimeError(f"Error writing '{self.path}': {s}")
      self._popen.__exit__(None, None, None)
      self._popen = None
      self._proc = None
    if self._write_via_local_file:
      # pylint: disable-next=no-member
      self._write_via_local_file.__exit__(None, None, None)
      self._write_via_local_file = None

Context to write a compressed video.

>>> shape = 480, 640
>>> with VideoWriter('/tmp/v.mp4', shape, fps=60) as writer:
...   for image in moving_circle(shape, num_images=60):
...     writer.add_image(image)
>>> show_video(read_video('/tmp/v.mp4'))

Bitrate control may be specified using at most one of: bps, qp, or crf. If none are specified and no ffmpeg_args are given, qp is set to a default value (20 for frames of at most 640x480 pixels, otherwise 28). See https://slhck.info/video/2017/03/01/rate-control.html
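
The rate-control rule can be modeled roughly as follows. This is a sketch rather than the library's API: the helper name `bitrate_args` is hypothetical, while the flags, the default qp values (20 and 28), and the 640x480 threshold follow the `__init__` source listed on this page.

```python
import math
from typing import Optional, Sequence


def bitrate_args(
    shape: tuple[int, int],
    bps: Optional[int] = None,
    qp: Optional[int] = None,
    crf: Optional[float] = None,
    ffmpeg_args: Sequence[str] = (),
) -> list[str]:
  """Hypothetical helper: returns the ffmpeg rate-control flags."""
  if sum(x is not None for x in (bps, qp, crf)) > 1:
    raise ValueError('Specify at most one of bps, qp, or crf.')
  # A default qp is chosen only if nothing else could control the bitrate.
  if bps is None and qp is None and crf is None and not ffmpeg_args:
    qp = 20 if math.prod(shape) <= 640 * 480 else 28
  if bps is not None:
    return ['-vb', f'{bps}']
  if qp is not None:
    return ['-qp', f'{qp}']
  if crf is not None:
    return ['-vb', '0', '-crf', f'{crf}']
  return []


print(bitrate_args((480, 640)))          # Small frame: default qp.
print(bitrate_args((1080, 1920)))        # Large frame: coarser default qp.
print(bitrate_args((480, 640), crf=23))  # Explicit constant rate factor.
```

Note that passing any custom ffmpeg_args suppresses the default qp, presumably so that user-supplied flags are not overridden.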

If codec is 'gif', the args bps, qp, crf, and encoded_format are ignored.

Attributes:
  • path: Output video. Its suffix (e.g. '.mp4') determines the video container format. The suffix must be '.gif' if the codec is 'gif'.
  • shape: 2D spatial dimensions (height, width) of video image frames. The dimensions must be even if 'encoded_format' has subsampled chroma (e.g., 'yuv420p' or 'yuv420p10le').
  • codec: Compression algorithm as defined by "ffmpeg -codecs" (e.g., 'h264', 'hevc', 'vp9', or 'gif').
  • metadata: Optional VideoMetadata object whose fps and bps attributes are used if not specified as explicit parameters.
  • fps: Frames-per-second framerate (default is 60.0 except 25.0 for 'gif').
  • bps: Requested average bits-per-second bitrate (default None).
  • qp: Quantization parameter for video compression quality (default None).
  • crf: Constant rate factor for video compression quality (default None).
  • ffmpeg_args: Additional arguments for ffmpeg command, e.g. '-g 30' to introduce I-frames, or '-bf 0' to omit B-frames.
  • input_format: Format of input images (default 'rgb'). If 'rgb', each image has shape=(height, width, 3) or (height, width). If 'yuv', each image has shape=(height, width, 3) with Y, U, V values. If 'gray', each image has shape=(height, width).
  • dtype: Expected data type for input images (any float input images are converted to dtype). The default is np.uint8. Use of np.uint16 is necessary when encoding >8 bits/channel.
  • encoded_format: Pixel format as defined by ffmpeg -pix_fmts, e.g., 'yuv420p' (2x2-subsampled chroma), 'yuv444p' (full-res chroma), 'yuv420p10le' (10-bit per channel), etc. The default (None) selects 'yuv420p' if all shape dimensions are even, else 'yuv444p'.
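
As an illustration of how the shape and encoded_format attributes interact, the following sketch mirrors the default selection and the even-dimension check from the `__init__` source on this page. The function name `choose_encoded_format` is hypothetical and the 'gif' override (which forces 'pal8') is left out.

```python
from typing import Optional


def choose_encoded_format(
    shape: tuple[int, int], encoded_format: Optional[str] = None
) -> str:
  """Hypothetical helper: picks or validates the ffmpeg pixel format."""
  all_even = all(dim % 2 == 0 for dim in shape)
  if encoded_format is None:
    # 2x2 chroma subsampling needs even dimensions; else keep full-res chroma.
    return 'yuv420p' if all_even else 'yuv444p'
  if not all_even and encoded_format.startswith(('yuv42', 'yuvj42')):
    raise ValueError(
        f'With encoded_format {encoded_format}, video dimensions must be'
        f' even, but shape is {shape}.'
    )
  return encoded_format


print(choose_encoded_format((480, 640)))
print(choose_encoded_format((481, 640)))
```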
VideoWriter(path: Union[str, os.PathLike[str]], shape: tuple[int, int], *, codec: str = 'h264', metadata: VideoMetadata | None = None, fps: float | None = None, bps: int | None = None, qp: int | None = None, crf: float | None = None, ffmpeg_args: str | Sequence[str] = '', input_format: str = 'rgb', dtype: DTypeLike = np.uint8, encoded_format: str | None = None)
  def __init__(
      self,
      path: _Path,
      shape: tuple[int, int],
      *,
      codec: str = 'h264',
      metadata: VideoMetadata | None = None,
      fps: float | None = None,
      bps: int | None = None,
      qp: int | None = None,
      crf: float | None = None,
      ffmpeg_args: str | Sequence[str] = '',
      input_format: str = 'rgb',
      dtype: _DTypeLike = np.uint8,
      encoded_format: str | None = None,
  ) -> None:
    _check_2d_shape(shape)
    if fps is None and metadata:
      fps = metadata.fps
    if fps is None:
      fps = 25.0 if codec == 'gif' else 60.0
    if fps <= 0.0:
      raise ValueError(f'Frame-per-second value {fps} is invalid.')
    if bps is None and metadata:
      bps = metadata.bps
    bps = int(bps) if bps is not None else None
    if bps is not None and bps <= 0:
      raise ValueError(f'Bitrate value {bps} is invalid.')
    if qp is not None and (not isinstance(qp, int) or qp <= 0):
      raise ValueError(
          f'Quantization parameter {qp} is not a positive integer.'
      )
    num_rate_specifications = sum(x is not None for x in (bps, qp, crf))
    if num_rate_specifications > 1:
      raise ValueError(
          f'Must specify at most one of bps, qp, or crf ({bps}, {qp}, {crf}).'
      )
    ffmpeg_args = (
        shlex.split(ffmpeg_args)
        if isinstance(ffmpeg_args, str)
        else list(ffmpeg_args)
    )
    if input_format not in {'rgb', 'yuv', 'gray'}:
      raise ValueError(f'Input format {input_format} is not rgb, yuv, or gray.')
    dtype = np.dtype(dtype)
    if dtype.type not in (np.uint8, np.uint16):
      raise ValueError(f'Type {dtype} is not np.uint8 or np.uint16.')
    self.path = pathlib.Path(path)
    self.shape = shape
    all_dimensions_are_even = all(dim % 2 == 0 for dim in shape)
    if encoded_format is None:
      encoded_format = 'yuv420p' if all_dimensions_are_even else 'yuv444p'
    if not all_dimensions_are_even and encoded_format.startswith(
        ('yuv42', 'yuvj42')
    ):
      raise ValueError(
          f'With encoded_format {encoded_format}, video dimensions must be'
          f' even, but shape is {shape}.'
      )
    self.fps = fps
    self.codec = codec
    self.bps = bps
    self.qp = qp
    self.crf = crf
    self.ffmpeg_args = ffmpeg_args
    self.input_format = input_format
    self.dtype = dtype
    self.encoded_format = encoded_format
    if num_rate_specifications == 0 and not ffmpeg_args:
      qp = 20 if math.prod(self.shape) <= 640 * 480 else 28
    self._bitrate_args = (
        (['-vb', f'{bps}'] if bps is not None else [])
        + (['-qp', f'{qp}'] if qp is not None else [])
        + (['-vb', '0', '-crf', f'{crf}'] if crf is not None else [])
    )
    if self.codec == 'gif':
      if self.path.suffix != '.gif':
        raise ValueError(f"File '{self.path}' does not have a .gif suffix.")
      self.encoded_format = 'pal8'
      self._bitrate_args = []
      video_filter = 'split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse'
      # Less common (and likely less useful) is a per-frame color palette:
      # video_filter = ('split[s0][s1];[s0]palettegen=stats_mode=single[p];'
      #                 '[s1][p]paletteuse=new=1')
      self.ffmpeg_args = ['-vf', video_filter, '-f', 'gif'] + self.ffmpeg_args
    self._write_via_local_file: Any = None
    self._popen: subprocess.Popen[bytes] | None = None
    self._proc: subprocess.Popen[bytes] | None = None
def add_image(self, image: np.ndarray) -> None:

Writes a video frame.

Arguments:
  • image: Array whose dtype and first two dimensions must match the dtype and shape specified in VideoWriter initialization. If input_format is 'gray', the image must be 2D. For the 'rgb' input_format, the image may be either 2D (interpreted as grayscale) or 3D with three (R, G, B) channels. For the 'yuv' input_format, the image must be 3D with three (Y, U, V) channels.
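Raises:
  • ValueError: If the image dtype, number of dimensions, or spatial shape does not match the dtype, input_format, or shape of the VideoWriter.
  • RuntimeError: If there is an error writing to the output file.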
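
To make the frame rules concrete, here is a minimal sketch of the validation and layout steps that precede the write to ffmpeg. It follows the `add_image` source on this page but is not part of mediapy: the name `prepare_frame` is hypothetical, and the dtype check and float conversion are omitted for brevity.

```python
import numpy as np


def prepare_frame(image, shape, input_format='rgb') -> bytes:
  """Hypothetical helper: validates one frame and returns its raw bytes."""
  image = np.asarray(image)
  if input_format == 'gray':
    if image.ndim != 2:
      raise ValueError(f'Image dimensions {image.shape} are not 2D.')
  else:
    if image.ndim == 2 and input_format == 'rgb':
      # A 2D frame is treated as grayscale and replicated to R, G, B.
      image = np.dstack((image, image, image))
    if not (image.ndim == 3 and image.shape[2] == 3):
      raise ValueError(f'Image dimensions {image.shape} are invalid.')
  if image.shape[:2] != tuple(shape):
    raise ValueError(f'Image dimensions {image.shape[:2]} != {tuple(shape)}.')
  if input_format == 'yuv':
    # Per-pixel (H, W, 3) YUV becomes planar (3, H, W): all Y, then U, then V.
    image = np.moveaxis(image, 2, 0)
  return image.tobytes()


frame = np.arange(12, dtype=np.uint8).reshape(2, 2, 3)
print(list(prepare_frame(frame, (2, 2), 'rgb')))  # Interleaved channels.
print(list(prepare_frame(frame, (2, 2), 'yuv')))  # One plane per channel.
```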
def close(self) -> None:

Finishes writing the video. (Called automatically at end of context.)

class VideoMetadata(typing.NamedTuple):
class VideoMetadata(NamedTuple):
  """Represents the data stored in a video container header.

  Attributes:
    num_images: Number of frames that is expected from the video stream.  This
      is estimated from the framerate and the duration stored in the video
      header, so it might be inexact.  We set the value to -1 if number of
      frames is not found in the header.
    shape: The dimensions (height, width) of each video frame.
    fps: The framerate in frames per second.
    bps: The estimated bitrate of the video stream in bits per second, retrieved
      from the video header.
  """

  num_images: int
  shape: tuple[int, int]
  fps: float
  bps: int | None

Represents the data stored in a video container header.

Attributes:
  • num_images: Number of frames that is expected from the video stream. This is estimated from the framerate and the duration stored in the video header, so it might be inexact. We set the value to -1 if number of frames is not found in the header.
  • shape: The dimensions (height, width) of each video frame.
  • fps: The framerate in frames per second.
  • bps: The estimated bitrate of the video stream in bits per second, retrieved from the video header.
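
The sketch below shows how such a metadata tuple can supply a fallback framerate, following the precedence used in the `VideoWriter.__init__` source on this page (explicit argument, then metadata, then a codec-dependent default). The `Metadata` class is a local stand-in with the same fields, and `resolve_fps` is a hypothetical helper.

```python
from typing import NamedTuple, Optional


class Metadata(NamedTuple):
  """Local stand-in with the same fields as VideoMetadata."""

  num_images: int
  shape: tuple[int, int]
  fps: float
  bps: Optional[int]


def resolve_fps(
    fps: Optional[float], metadata: Optional[Metadata], codec: str = 'h264'
) -> float:
  """Hypothetical helper: explicit fps wins, then metadata, then a default."""
  if fps is None and metadata is not None:
    fps = metadata.fps
  if fps is None:
    fps = 25.0 if codec == 'gif' else 60.0
  return fps


meta = Metadata(num_images=-1, shape=(480, 640), fps=30.0, bps=None)
print(resolve_fps(None, meta))         # Taken from the metadata.
print(resolve_fps(12.0, meta))         # Explicit value takes precedence.
print(resolve_fps(None, None, 'gif'))  # Codec-dependent default.
```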
def compress_image(image: ArrayLike, *, fmt: str = 'png', **kwargs: Any) -> bytes:
def compress_image(
    image: _ArrayLike, *, fmt: str = 'png', **kwargs: Any
) -> bytes:
  """Returns a buffer containing a compressed image.

  Args:
    image: Array in a format supported by `PIL`, e.g. np.uint8 or np.uint16.
    fmt: Desired compression encoding, e.g. 'png'.
    **kwargs: Options for `PIL.save()`, e.g. `optimize=True` for greater
      compression.
  """
  image = _as_valid_media_array(image)
  with io.BytesIO() as output:
    _pil_image(image).save(output, format=fmt, **kwargs)
    return output.getvalue()

Returns a buffer containing a compressed image.

Arguments:
  • image: Array in a format supported by PIL, e.g. np.uint8 or np.uint16.
  • fmt: Desired compression encoding, e.g. 'png'.
  • **kwargs: Options for PIL.save(), e.g. optimize=True for greater compression.
def decompress_image(data: bytes, dtype: DTypeLike = None, apply_exif_transpose: bool = True) -> np.ndarray:
def decompress_image(
    data: bytes, dtype: _DTypeLike = None, apply_exif_transpose: bool = True
) -> _NDArray:
  """Returns an image from a compressed data buffer.

  Decoding is performed using `PIL`, which supports `uint8` images with 1, 3,
  or 4 channels and `uint16` images with a single channel.

  Args:
    data: Buffer containing compressed image.
    dtype: Data type of the returned array.  If None, `np.uint8` or `np.uint16`
      is inferred automatically.
    apply_exif_transpose: If True, rotate image according to EXIF orientation.
  """
  pil_image: PIL.Image.Image = PIL.Image.open(io.BytesIO(data))
  if apply_exif_transpose:
    tmp_image = PIL.ImageOps.exif_transpose(pil_image)  # Future: in_place=True.
    assert tmp_image
    pil_image = tmp_image
  if dtype is None:
    dtype = np.uint16 if pil_image.mode.startswith('I') else np.uint8
  return np.array(pil_image, dtype=dtype)

Returns an image from a compressed data buffer.

Decoding is performed using PIL, which supports uint8 images with 1, 3, or 4 channels and uint16 images with a single channel.

Arguments:
  • data: Buffer containing compressed image.
  • dtype: Data type of the returned array. If None, np.uint8 or np.uint16 is inferred automatically.
  • apply_exif_transpose: If True, rotate image according to EXIF orientation.
def compress_video(images: Iterable[np.ndarray], *, codec: str = 'h264', **kwargs: Any) -> bytes:
def compress_video(
    images: Iterable[_NDArray], *, codec: str = 'h264', **kwargs: Any
) -> bytes:
  """Returns a buffer containing a compressed video.

  The video container is 'gif' for 'gif' codec, 'webm' for 'vp9' codec,
  and mp4 otherwise.

  >>> video = read_video('/tmp/river.mp4')
  >>> data = compress_video(video, bps=10_000_000)
  >>> print(len(data))

  >>> data = compress_video(moving_circle((100, 100), num_images=10), fps=10)

  Args:
    images: Iterable over video frames.
    codec: Compression algorithm as defined by `ffmpeg -codecs` (e.g., 'h264',
      'hevc', 'vp9', or 'gif').
    **kwargs: Additional parameters for `VideoWriter`.

  Returns:
    A bytes buffer containing the compressed video.
  """
  suffix = _filename_suffix_from_codec(codec)
  with tempfile.TemporaryDirectory() as directory_name:
    tmp_path = pathlib.Path(directory_name) / f'file{suffix}'
    write_video(tmp_path, images, codec=codec, **kwargs)
    return tmp_path.read_bytes()

Returns a buffer containing a compressed video.

The video container is 'gif' for 'gif' codec, 'webm' for 'vp9' codec, and mp4 otherwise.

>>> video = read_video('/tmp/river.mp4')
>>> data = compress_video(video, bps=10_000_000)
>>> print(len(data))

>>> data = compress_video(moving_circle((100, 100), num_images=10), fps=10)

Arguments:
  • images: Iterable over video frames.
  • codec: Compression algorithm as defined by ffmpeg -codecs (e.g., 'h264', 'hevc', 'vp9', or 'gif').
  • **kwargs: Additional parameters for VideoWriter.
Returns:

A bytes buffer containing the compressed video.
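
The container choice described above can be pictured with a small mapping. The body of the internal `_filename_suffix_from_codec` is not shown on this page, so the function below is only a guess that is consistent with the docstring; the name `container_suffix` is hypothetical.

```python
def container_suffix(codec: str) -> str:
  """Hypothetical helper: maps a codec name to a container file suffix."""
  # 'gif' and 'vp9' have dedicated containers; everything else goes in MP4.
  return {'gif': '.gif', 'vp9': '.webm'}.get(codec, '.mp4')


for codec in ('h264', 'hevc', 'vp9', 'gif'):
  print(codec, container_suffix(codec))
```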

def decompress_video(data: bytes, **kwargs: Any) -> np.ndarray:
def decompress_video(data: bytes, **kwargs: Any) -> _NDArray:
  """Returns video images from an MP4-compressed data buffer."""
  with tempfile.TemporaryDirectory() as directory_name:
    tmp_path = pathlib.Path(directory_name) / 'file.mp4'
    tmp_path.write_bytes(data)
    return read_video(tmp_path, **kwargs)

Returns video images from an MP4-compressed data buffer.

def html_from_compressed_image(data: bytes, width: int, height: int, *, title: str | None = None, border: bool | str = False, pixelated: bool = True, fmt: str = 'png') -> str:
def html_from_compressed_image(
    data: bytes,
    width: int,
    height: int,
    *,
    title: str | None = None,
    border: bool | str = False,
    pixelated: bool = True,
    fmt: str = 'png',
) -> str:
  """Returns an HTML string with an image tag containing encoded data.

  Args:
    data: Compressed image bytes.
    width: Width of HTML image in pixels.
    height: Height of HTML image in pixels.
    title: Optional text shown centered above image.
    border: If `bool`, whether to place a black boundary around the image, or if
      `str`, the boundary CSS style.
    pixelated: If True, sets the CSS style to 'image-rendering: pixelated;'.
    fmt: Compression encoding.
  """
  b64 = base64.b64encode(data).decode('utf-8')
  if isinstance(border, str):
    border = f'{border}; '
  elif border:
    border = 'border:1px solid black; '
  else:
    border = ''
  s_pixelated = 'pixelated' if pixelated else 'auto'
  s = (
      f'<img width="{width}" height="{height}"'
      f' style="{border}image-rendering:{s_pixelated}; object-fit:cover;"'
      f' src="data:image/{fmt};base64,{b64}"/>'
  )
  if title is not None:
    s = f"""<div style="display:flex; align-items:left;">
      <div style="display:flex; flex-direction:column; align-items:center;">
      <div>{title}</div><div>{s}</div></div></div>"""
  return s

Returns an HTML string with an image tag containing encoded data.

Arguments:
  • data: Compressed image bytes.
  • width: Width of HTML image in pixels.
  • height: Height of HTML image in pixels.
  • title: Optional text shown centered above image.
  • border: If bool, whether to place a black boundary around the image, or if str, the boundary CSS style.
  • pixelated: If True, sets the CSS style to 'image-rendering: pixelated;'.
  • fmt: Compression encoding.
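
The core of this function is the embedding of the compressed bytes as a base64 data URI. A stripped-down sketch of that step is shown here; `image_tag` is a hypothetical name, and the border, pixelated, and title handling are omitted.

```python
import base64


def image_tag(data: bytes, width: int, height: int, fmt: str = 'png') -> str:
  """Hypothetical helper: embeds compressed image bytes in an <img> tag."""
  b64 = base64.b64encode(data).decode('utf-8')
  return (
      f'<img width="{width}" height="{height}"'
      f' src="data:image/{fmt};base64,{b64}"/>'
  )


# Any bytes work for illustration; real input would be PNG or JPEG data.
print(image_tag(b'abc', width=4, height=2))
```

Because the image travels inside the HTML string itself, the notebook output stays self-contained, at the cost of roughly a third more bytes than the raw file.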
def html_from_compressed_video(data: bytes, width: int, height: int, *, title: str | None = None, border: bool | str = False, loop: bool = True, autoplay: bool = True) -> str:
def html_from_compressed_video(
    data: bytes,
    width: int,
    height: int,
    *,
    title: str | None = None,
    border: bool | str = False,
    loop: bool = True,
    autoplay: bool = True,
) -> str:
  """Returns an HTML string with a video tag containing H264-encoded data.

  Args:
    data: MP4-compressed video bytes.
    width: Width of HTML video in pixels.
    height: Height of HTML video in pixels.
    title: Optional text shown centered above the video.
    border: If `bool`, whether to place a black boundary around the image, or if
      `str`, the boundary CSS style.
    loop: If True, the playback repeats forever.
    autoplay: If True, video playback starts without having to click.
  """
  b64 = base64.b64encode(data).decode('utf-8')
  if isinstance(border, str):
    border = f'{border}; '
  elif border:
    border = 'border:1px solid black; '
  else:
    border = ''
  options = (
      f'controls width="{width}" height="{height}"'
      f' style="{border}object-fit:cover;"'
      f'{" loop" if loop else ""}'
      f'{" autoplay muted" if autoplay else ""}'
  )
  s = f"""<video {options}>
      <source src="data:video/mp4;base64,{b64}" type="video/mp4"/>
      This browser does not support the video tag.
      </video>"""
  if title is not None:
    s = f"""<div style="display:flex; align-items:left;">
      <div style="display:flex; flex-direction:column; align-items:center;">
      <div>{title}</div><div>{s}</div></div></div>"""
  return s

Returns an HTML string with a video tag containing H264-encoded data.

Arguments:
  • data: MP4-compressed video bytes.
  • width: Width of HTML video in pixels.
  • height: Height of HTML video in pixels.
  • title: Optional text shown centered above the video.
  • border: If bool, whether to place a black boundary around the video, or if str, the boundary CSS style.
  • loop: If True, the playback repeats forever.
  • autoplay: If True, video playback starts without having to click.
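
The loop and autoplay flags translate into attributes of the `<video>` tag. The following sketch reproduces only that part of the source on this page; `video_options` is a hypothetical name, and the style attribute is left out.

```python
def video_options(
    width: int, height: int, loop: bool = True, autoplay: bool = True
) -> str:
  """Hypothetical helper: builds the attribute string of a <video> tag."""
  return (
      f'controls width="{width}" height="{height}"'
      f'{" loop" if loop else ""}'
      # 'muted' accompanies 'autoplay', as in the source listing above.
      f'{" autoplay muted" if autoplay else ""}'
  )


print(video_options(64, 48))
print(video_options(64, 48, loop=False, autoplay=False))
```

Pairing autoplay with muted is likely deliberate, since browsers commonly refuse to autoplay video that has sound enabled.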
def resize_image(image: ArrayLike, shape: tuple[int, int]) -> np.ndarray:
def resize_image(image: _ArrayLike, shape: tuple[int, int]) -> _NDArray:
  """Resizes image to specified spatial dimensions using a Lanczos filter.

  Args:
    image: Array-like 2D or 3D object, where dtype is uint or floating-point.
    shape: 2D spatial dimensions (height, width) of output image.

  Returns:
    A resampled image whose spatial dimensions match `shape`.
  """
  image = _as_valid_media_array(image)
  if image.ndim not in (2, 3):
    raise ValueError(f'Image shape {image.shape} is neither 2D nor 3D.')
  _check_2d_shape(shape)

  # A PIL image can be multichannel only if it has 3 or 4 uint8 channels,
  # and it can be resized only if it is uint8 or float32.
  supported_single_channel = (
      np.issubdtype(image.dtype, np.floating) or image.dtype == np.uint8
  ) and image.ndim == 2
  supported_multichannel = (
      image.dtype == np.uint8 and image.ndim == 3 and image.shape[2] in (3, 4)
  )
  if supported_single_channel or supported_multichannel:
    return np.array(
        _pil_image(image).resize(
            shape[::-1], resample=PIL.Image.Resampling.LANCZOS
        ),
        dtype=image.dtype,
    )
  if image.ndim == 2:
    # We convert to floating-point for resizing and convert back.
    return to_type(resize_image(to_float01(image), shape), image.dtype)
  # We resize each image channel individually.
  return np.dstack(
      [resize_image(channel, shape) for channel in np.moveaxis(image, -1, 0)]
  )

Resizes image to specified spatial dimensions using a Lanczos filter.

Arguments:
  • image: Array-like 2D or 3D object, where dtype is uint or floating-point.
  • shape: 2D spatial dimensions (height, width) of output image.
Returns:

A resampled image whose spatial dimensions match shape.
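
Depending on dtype and channel count, the source on this page takes one of three routes to perform the resize. The sketch below isolates that decision without calling PIL; the function name `resize_strategy` and the returned labels are hypothetical.

```python
import numpy as np


def resize_strategy(image) -> str:
  """Hypothetical helper: names the route resize_image would take."""
  image = np.asarray(image)
  single_channel = (
      np.issubdtype(image.dtype, np.floating) or image.dtype == np.uint8
  ) and image.ndim == 2
  multichannel = (
      image.dtype == np.uint8 and image.ndim == 3 and image.shape[2] in (3, 4)
  )
  if single_channel or multichannel:
    return 'direct'  # Resized directly as a PIL image.
  if image.ndim == 2:
    return 'via_float'  # Converted to float, resized, and converted back.
  return 'per_channel'  # Each channel is resized separately.


print(resize_strategy(np.zeros((4, 4, 3), np.uint8)))
print(resize_strategy(np.zeros((4, 4), np.uint16)))
print(resize_strategy(np.zeros((4, 4, 3), np.float32)))
```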

def resize_video(video: Iterable[np.ndarray], shape: tuple[int, int]) -> np.ndarray:
def resize_video(video: Iterable[_NDArray], shape: tuple[int, int]) -> _NDArray:
  """Resizes `video` to specified spatial dimensions using a Lanczos filter.

  Args:
    video: Iterable of images.
    shape: 2D spatial dimensions (height, width) of output video.

  Returns:
    A resampled video whose spatial dimensions match `shape`.
  """
  _check_2d_shape(shape)
  return np.array([resize_image(image, shape) for image in video])

Resizes video to specified spatial dimensions using a Lanczos filter.

Arguments:
  • video: Iterable of images.
  • shape: 2D spatial dimensions (height, width) of output video.
Returns:

A resampled video whose spatial dimensions match shape.

def to_rgb(array: ArrayLike, *, vmin: float | None = None, vmax: float | None = None, cmap: str | Callable[[ArrayLike], np.ndarray] = 'gray') -> np.ndarray:
def to_rgb(
    array: _ArrayLike,
    *,
    vmin: float | None = None,
    vmax: float | None = None,
    cmap: str | Callable[[_ArrayLike], _NDArray] = 'gray',
) -> _NDArray:
  """Maps scalar values to RGB using value bounds and a color map.

  Args:
    array: Scalar values, with arbitrary shape.
    vmin: Explicit min value for remapping; if None, it is obtained as the
      minimum finite value of `array`.
    vmax: Explicit max value for remapping; if None, it is obtained as the
      maximum finite value of `array`.
    cmap: A `pyplot` color map or callable, to map from 1D value to 3D or 4D
      color.

  Returns:
    A new array in which each element is affinely mapped from [vmin, vmax]
    to [0.0, 1.0] and then color-mapped.
  """
  a = _as_valid_media_array(array)
  del array
  # For future numpy version 1.17.0:
  # vmin = np.amin(a, where=np.isfinite(a)) if vmin is None else vmin
  # vmax = np.amax(a, where=np.isfinite(a)) if vmax is None else vmax
  vmin = np.amin(np.where(np.isfinite(a), a, np.inf)) if vmin is None else vmin
  vmax = np.amax(np.where(np.isfinite(a), a, -np.inf)) if vmax is None else vmax
  a = (a.astype('float') - vmin) / (vmax - vmin + np.finfo(float).eps)
  if isinstance(cmap, str):
    if hasattr(matplotlib, 'colormaps'):
      rgb_from_scalar: Any = matplotlib.colormaps[cmap]  # Newer version.
    else:
      rgb_from_scalar = matplotlib.pyplot.cm.get_cmap(cmap)  # pylint: disable=no-member
  else:
    rgb_from_scalar = cmap
  a = rgb_from_scalar(a)
  # If there is a fully opaque alpha channel, remove it.
  if a.shape[-1] == 4 and np.all(to_float01(a[..., 3]) == 1.0):
    a = a[..., :3]
  return a

Maps scalar values to RGB using value bounds and a color map.

Arguments:
  • array: Scalar values, with arbitrary shape.
  • vmin: Explicit min value for remapping; if None, it is obtained as the minimum finite value of array.
  • vmax: Explicit max value for remapping; if None, it is obtained as the maximum finite value of array.
  • cmap: A pyplot color map or callable, to map from 1D value to 3D or 4D color.
Returns:

A new array in which each element is affinely mapped from [vmin, vmax] to [0.0, 1.0] and then color-mapped.
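
The affine remapping step can be illustrated without NumPy or matplotlib. This is a simplified sketch on a flat list of floats, with a hypothetical function name `normalize`; the small epsilon in the denominator follows the source on this page and avoids division by zero when vmin equals vmax.

```python
import math
import sys
from typing import Optional, Sequence


def normalize(
    values: Sequence[float],
    vmin: Optional[float] = None,
    vmax: Optional[float] = None,
) -> list[float]:
  """Hypothetical helper: maps [vmin, vmax] affinely onto [0.0, 1.0]."""
  # Non-finite entries are ignored when inferring the bounds.
  finite = [v for v in values if math.isfinite(v)]
  vmin = min(finite) if vmin is None else vmin
  vmax = max(finite) if vmax is None else vmax
  eps = sys.float_info.epsilon
  return [(v - vmin) / (vmax - vmin + eps) for v in values]


print(normalize([2.0, 4.0, 6.0]))
print(normalize([5.0], vmin=0.0, vmax=10.0))
```

The resulting values in [0.0, 1.0] are what the color map then converts to RGB.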

def to_type(array: ArrayLike, dtype: DTypeLike) -> np.ndarray:
def to_type(array: _ArrayLike, dtype: _DTypeLike) -> _NDArray:
  """Returns media array converted to specified type.

  A "media array" is one in which the dtype is either a floating-point type
  (np.float32 or np.float64) or an unsigned integer type.  The array values are
  assumed to lie in the range [0.0, 1.0] for floating-point values, and in the
  full range for unsigned integers, e.g. [0, 255] for np.uint8.

  Conversion between integers and floats maps uint(0) to 0.0 and uint(MAX) to
  1.0.  The input array may also be of type bool, whereby True maps to
  uint(MAX) or 1.0.  The values are scaled and clamped as appropriate during
  type conversions.

  Args:
    array: Input array-like object (floating-point, unsigned int, or bool).
    dtype: Desired output type (floating-point or unsigned int).

  Returns:
    Array `a` if it is already of the specified dtype, else a converted array.
  """
  a = np.asarray(array)
  dtype = np.dtype(dtype)
  del array
  if a.dtype != bool:
    _as_valid_media_type(a.dtype)  # Verify that 'a' has a valid dtype.
  if a.dtype == bool:
    result = a.astype(dtype)
    if np.issubdtype(dtype, np.unsignedinteger):
      result = result * dtype.type(np.iinfo(dtype).max)
  elif a.dtype == dtype:
    result = a
  elif np.issubdtype(dtype, np.unsignedinteger):
    if np.issubdtype(a.dtype, np.unsignedinteger):
      src_max: float = np.iinfo(a.dtype).max
    else:
      a = np.clip(a, 0.0, 1.0)
      src_max = 1.0
    dst_max = np.iinfo(dtype).max
    if dst_max <= np.iinfo(np.uint16).max:
      scale = np.array(dst_max / src_max, dtype=np.float32)
      result = (a * scale + 0.5).astype(dtype)
    elif dst_max <= np.iinfo(np.uint32).max:
      result = (a.astype(np.float64) * (dst_max / src_max) + 0.5).astype(dtype)
    else:
      # https://stackoverflow.com/a/66306123/
      a = a.astype(np.float64) * (dst_max / src_max) + 0.5
      dst = np.atleast_1d(a)
      values_too_large = dst >= np.float64(dst_max)
      with np.errstate(invalid='ignore'):
        dst = dst.astype(dtype)
      dst[values_too_large] = dst_max
      result = dst if a.ndim > 0 else dst[0]
  else:
    assert np.issubdtype(dtype, np.floating)
    result = a.astype(dtype)
    if np.issubdtype(a.dtype, np.unsignedinteger):
      result = result / dtype.type(np.iinfo(a.dtype).max)
  return result

Returns media array converted to specified type.

A "media array" is one in which the dtype is either a floating-point type (np.float32 or np.float64) or an unsigned integer type. The array values are assumed to lie in the range [0.0, 1.0] for floating-point values, and in the full range for unsigned integers, e.g. [0, 255] for np.uint8.

Conversion between integers and floats maps uint(0) to 0.0 and uint(MAX) to 1.0. The input array may also be of type bool, whereby True maps to uint(MAX) or 1.0. The values are scaled and clamped as appropriate during type conversions.

Arguments:
  • array: Input array-like object (floating-point, unsigned int, or bool).
  • dtype: Desired output type (floating-point or unsigned int).
Returns:

Array a if it is already of the specified dtype, else a converted array.
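
For a single value, the scaling and clamping rules reduce to a few lines of arithmetic. The scalar helpers below are hypothetical and only illustrate the mapping between [0.0, 1.0] and the full unsigned range; the library itself operates on whole arrays.

```python
def float_to_uint(value: float, bits: int = 8) -> int:
  """Hypothetical helper: maps [0.0, 1.0] to [0, 2**bits - 1] with rounding."""
  max_value = (1 << bits) - 1
  clamped = min(max(value, 0.0), 1.0)  # Out-of-range inputs are clamped.
  return int(clamped * max_value + 0.5)  # Adding 0.5 rounds to nearest.


def uint_to_float(value: int, bits: int = 8) -> float:
  """Hypothetical helper: maps [0, 2**bits - 1] to [0.0, 1.0]."""
  return value / ((1 << bits) - 1)


print(float_to_uint(0.5))       # Mid-gray in 8 bits.
print(float_to_uint(1.7))       # Clamped to the maximum.
print(float_to_uint(1.0, 16))   # Full scale in 16 bits.
print(uint_to_float(255))       # Maximum uint8 maps to 1.0.
```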

def to_float01(a: ArrayLike, dtype: DTypeLike = np.float32) -> np.ndarray:
def to_float01(a: _ArrayLike, dtype: _DTypeLike = np.float32) -> _NDArray:
  """If array has unsigned integers, rescales them to the range [0.0, 1.0].

  Scaling is such that uint(0) maps to 0.0 and uint(MAX) maps to 1.0.  See
  `to_type`.

  Args:
    a: Input array.
    dtype: Desired floating-point type if rescaling occurs.

  Returns:
    A new array of dtype values in the range [0.0, 1.0] if the input array `a`
    contains unsigned integers; otherwise, array `a` is returned unchanged.
  """
  a = np.asarray(a)
  dtype = np.dtype(dtype)
  if not np.issubdtype(dtype, np.floating):
    raise ValueError(f'Type {dtype} is not floating-point.')
  if np.issubdtype(a.dtype, np.floating):
    return a
  return to_type(a, dtype)

If array has unsigned integers, rescales them to the range [0.0, 1.0].

Scaling is such that uint(0) maps to 0.0 and uint(MAX) maps to 1.0. See to_type.

Arguments:
  • a: Input array.
  • dtype: Desired floating-point type if rescaling occurs.
Returns:

A new array of dtype values in the range [0.0, 1.0] if the input array a contains unsigned integers; otherwise, array a is returned unchanged.
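The pass-through behavior matters when an image may arrive as either uint8 or float, for example before darkening it with a multiplication. The following numpy-only sketch mimics the documented behavior for unsigned-integer and floating-point inputs only; `to_float01_sketch` is a hypothetical stand-in, not the library function, and it does not handle bool inputs.

```python
import numpy as np


def to_float01_sketch(a, dtype=np.float32):
  """Rescales unsigned integers to [0.0, 1.0]; returns floats unchanged (hypothetical)."""
  a = np.asarray(a)
  if np.issubdtype(a.dtype, np.floating):
    return a  # Already floating-point: no copy, no rescaling.
  dtype = np.dtype(dtype)
  return a.astype(dtype) / dtype.type(np.iinfo(a.dtype).max)


pixels_uint8 = np.array([0, 51, 255], dtype=np.uint8)
pixels_float = np.array([0.0, 0.2, 1.0], dtype=np.float64)

darkened = to_float01_sketch(pixels_uint8) * 0.5  # float32 values in [0.0, 0.5].
unchanged = to_float01_sketch(pixels_float)       # Same object, still float64.
```

Note that the `dtype` argument only takes effect when rescaling occurs; a float64 input stays float64 even though the default is np.float32.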

def to_uint8(a: ArrayLike) -> np.ndarray:

def to_uint8(a: _ArrayLike) -> _NDArray:
  """Returns array converted to uint8 values; see `to_type`."""
  return to_type(a, np.uint8)

Returns array converted to uint8 values; see to_type.

def set_output_height(num_pixels: int) -> None:

def set_output_height(num_pixels: int) -> None:
  """Overrides the height of the current output cell, if using Colab."""
  try:
    # We want to fail gracefully for non-Colab IPython notebooks.
    output = importlib.import_module('google.colab.output')
    s = f'google.colab.output.setIframeHeight("{num_pixels}px")'
    output.eval_js(s)
  except (ModuleNotFoundError, AttributeError):
    pass

Overrides the height of the current output cell, if using Colab.

def set_max_output_height(num_pixels: int) -> None:

def set_max_output_height(num_pixels: int) -> None:
  """Sets the maximum height of the current output cell, if using Colab."""
  try:
    # We want to fail gracefully for non-Colab IPython notebooks.
    output = importlib.import_module('google.colab.output')
    s = (
        'google.colab.output.setIframeHeight('
        f'0, true, {{maxHeight: {num_pixels}}})'
    )
    output.eval_js(s)
  except (ModuleNotFoundError, AttributeError):
    pass

Sets the maximum height of the current output cell, if using Colab.
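Both functions above do nothing outside Colab because the `google.colab.output` module is looked up at call time and a failed import is swallowed. A minimal stdlib-only sketch of that pattern follows; the helper name `import_optional` is hypothetical and is not part of mediapy.

```python
import importlib


def import_optional(module_name):
  """Returns the named module, or None if it is not installed (hypothetical helper)."""
  try:
    return importlib.import_module(module_name)
  except ModuleNotFoundError:
    return None


colab_output = import_optional('google.colab.output')
if colab_output is None:
  print('Not running in Colab; leaving the output cell height unchanged.')
```

Resolving the module by name at call time keeps the optional dependency out of the module's top-level imports, so the same code loads in plain Jupyter.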

def color_ramp(shape: tuple[int, int] = (64, 64), *, dtype: DTypeLike = np.float32) -> np.ndarray:

def color_ramp(
    shape: tuple[int, int] = (64, 64), *, dtype: _DTypeLike = np.float32
) -> _NDArray:
  """Returns an image of a red-green color gradient.

  This is useful for quick experimentation and testing.  See also
  `moving_circle` to generate a sample video.

  Args:
    shape: 2D spatial dimensions (height, width) of generated image.
    dtype: Type (uint or floating) of resulting pixel values.
  """
  _check_2d_shape(shape)
  dtype = _as_valid_media_type(dtype)
  yx = (np.moveaxis(np.indices(shape), 0, -1) + 0.5) / shape
  image = np.insert(yx, 2, 0.0, axis=-1)
  return to_type(image, dtype)

Returns an image of a red-green color gradient.

This is useful for quick experimentation and testing. See also moving_circle to generate a sample video.

Arguments:
  • shape: 2D spatial dimensions (height, width) of generated image.
  • dtype: Type (uint or floating) of resulting pixel values.
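
To make the gradient concrete, the sketch below repeats the two array operations from the source listing using numpy alone, omitting the shape check and the final type conversion. The name `color_ramp_sketch` is hypothetical.

```python
import numpy as np


def color_ramp_sketch(shape=(64, 64)):
  """Returns a float64 (height, width, 3) red-green gradient (hypothetical helper)."""
  # Sample at pixel centers: row i maps to (i + 0.5) / height, likewise for columns.
  yx = (np.moveaxis(np.indices(shape), 0, -1) + 0.5) / shape
  # Red follows y, green follows x, and a constant 0.0 blue channel is appended.
  return np.insert(yx, 2, 0.0, axis=-1)


image = color_ramp_sketch((2, 4))
```

For a 2x4 image, the top-left pixel is (0.25, 0.125, 0.0) and the bottom-right pixel is (0.75, 0.875, 0.0), so values never reach exactly 0.0 or 1.0 in the red and green channels.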

def moving_circle(shape: tuple[int, int] = (256, 256), num_images: int = 10, *, dtype: DTypeLike = np.float32) -> np.ndarray:

def moving_circle(
    shape: tuple[int, int] = (256, 256),
    num_images: int = 10,
    *,
    dtype: _DTypeLike = np.float32,
) -> _NDArray:
  """Returns a video of a circle moving in front of a color ramp.

  This is useful for quick experimentation and testing.  See also `color_ramp`
  to generate a sample image.

  >>> show_video(moving_circle((480, 640), 60), fps=60)

  Args:
    shape: 2D spatial dimensions (height, width) of generated video.
    num_images: Number of video frames.
    dtype: Type (uint or floating) of resulting pixel values.
  """
  _check_2d_shape(shape)
  dtype = np.dtype(dtype)

  def generate_image(image_index: int) -> _NDArray:
    """Returns a video frame image."""
    image = color_ramp(shape, dtype=dtype)
    yx = np.moveaxis(np.indices(shape), 0, -1)
    center = shape[0] * 0.6, shape[1] * (image_index + 0.5) / num_images
    radius_squared = (min(shape) * 0.1) ** 2
    inside = np.sum((yx - center) ** 2, axis=-1) < radius_squared
    white_circle_color = 1.0, 1.0, 1.0
    if np.issubdtype(dtype, np.unsignedinteger):
      white_circle_color = to_type([white_circle_color], dtype)[0]
    image[inside] = white_circle_color
    return image

  return np.array([generate_image(i) for i in range(num_images)])

Returns a video of a circle moving in front of a color ramp.

This is useful for quick experimentation and testing. See also color_ramp to generate a sample image.

>>> show_video(moving_circle((480, 640), 60), fps=60)

Arguments:
  • shape: 2D spatial dimensions (height, width) of generated video.
  • num_images: Number of video frames.
  • dtype: Type (uint or floating) of resulting pixel values.
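
The circle's motion follows directly from the per-frame center computed in the source listing. The numpy-only sketch below isolates that geometry as a boolean mask per frame; the name `circle_mask_sketch` is hypothetical, and the sketch leaves out the color ramp background and the dtype handling.

```python
import numpy as np


def circle_mask_sketch(shape, image_index, num_images):
  """Returns a boolean (height, width) mask of the circle in one frame (hypothetical)."""
  yx = np.moveaxis(np.indices(shape), 0, -1)
  # The center stays at 60% of the height and sweeps left to right across frames.
  center = shape[0] * 0.6, shape[1] * (image_index + 0.5) / num_images
  radius_squared = (min(shape) * 0.1) ** 2
  return np.sum((yx - center) ** 2, axis=-1) < radius_squared


masks = np.array([circle_mask_sketch((100, 100), i, 10) for i in range(10)])
```

With a 100x100 frame and 10 frames, the circle has radius 10 and its center moves from column 5 in the first frame to column 95 in the last, always on row 60.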

class set_show_save_dir:

class set_show_save_dir:  # pylint: disable=invalid-name
  """Save all titled output from `show_*()` calls into files.

  If the specified `directory` is not None, all titled images and videos
  displayed by `show_image`, `show_images`, `show_video`, and `show_videos` are
  also saved as files within the directory.

  It can be used either to set the state or as a context manager:

  >>> set_show_save_dir('/tmp')
  >>> show_image(color_ramp(), title='image1')  # Creates /tmp/image1.png.
  >>> show_video(moving_circle(), title='video2')  # Creates /tmp/video2.mp4.
  >>> set_show_save_dir(None)

  >>> with set_show_save_dir('/tmp'):
  ...   show_image(color_ramp(), title='image1')  # Creates /tmp/image1.png.
  ...   show_video(moving_circle(), title='video2')  # Creates /tmp/video2.mp4.
  """

  def __init__(self, directory: _Path | None):
    self._old_show_save_dir = _config.show_save_dir
    _config.show_save_dir = directory

  def __enter__(self) -> None:
    pass

  def __exit__(self, *_: Any) -> None:
    _config.show_save_dir = self._old_show_save_dir

Save all titled output from show_*() calls into files.

If the specified directory is not None, all titled images and videos displayed by show_image, show_images, show_video, and show_videos are also saved as files within the directory.

It can be used either to set the state or as a context manager:

>>> set_show_save_dir('/tmp')
>>> show_image(color_ramp(), title='image1')  # Creates /tmp/image1.png.
>>> show_video(moving_circle(), title='video2')  # Creates /tmp/video2.mp4.
>>> set_show_save_dir(None)

>>> with set_show_save_dir('/tmp'):
...   show_image(color_ramp(), title='image1')  # Creates /tmp/image1.png.
...   show_video(moving_circle(), title='video2')  # Creates /tmp/video2.mp4.

set_show_save_dir(directory: Union[str, os.PathLike[str], NoneType])

  def __init__(self, directory: _Path | None):
    self._old_show_save_dir = _config.show_save_dir
    _config.show_save_dir = directory

def set_ffmpeg(name_or_path: Union[str, os.PathLike[str]]) -> None:

def set_ffmpeg(name_or_path: _Path) -> None:
  """Specifies the name or path for the `ffmpeg` external program.

  The `ffmpeg` program is required for compressing and decompressing video.
  (It is used in `read_video`, `write_video`, `show_video`, `show_videos`,
  etc.)

  Args:
    name_or_path: Either a filename within a directory of `os.environ['PATH']`
      or a filepath.  The default setting is 'ffmpeg'.
  """
  _config.ffmpeg_name_or_path = name_or_path

Specifies the name or path for the ffmpeg external program.

The ffmpeg program is required for compressing and decompressing video. (It is used in read_video, write_video, show_video, show_videos, etc.)

Arguments:
  • name_or_path: Either a filename within a directory of os.environ['PATH'] or a filepath. The default setting is 'ffmpeg'.

def video_is_available() -> bool:

def video_is_available() -> bool:
  """Returns True if the program `ffmpeg` is found.

  See also `set_ffmpeg`.
  """
  return _search_for_ffmpeg_path() is not None

Returns True if the program ffmpeg is found.

See also set_ffmpeg.
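
The source shown here does not include `_search_for_ffmpeg_path`, so the following is only a sketch of one way such a check could work, assuming a lookup on `os.environ['PATH']` through the standard-library `shutil.which`. The helper name `program_is_available` is hypothetical.

```python
import shutil


def program_is_available(name_or_path='ffmpeg'):
  """Returns True if the program is found on PATH or at the given path (hypothetical)."""
  return shutil.which(name_or_path) is not None


if not program_is_available('ffmpeg'):
  print('ffmpeg not found; video reading and writing would be unavailable.')
```

With mediapy itself, the corresponding steps would be a call to `set_ffmpeg` with the desired name or path, followed by `video_is_available()` to confirm that the program can be located.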