Skip to content

owid-catalog: Data Structures and Processing

Enhanced pandas data structures with rich metadata support for OWID's data processing pipelines.

Quick Reference

from owid.catalog import Dataset, Table, Variable
from owid.catalog import processing as pr

# Create a table with metadata
tb = Table(df, metadata={"short_name": "population"})

Metadata Hierarchy

Dataset
├── metadata: DatasetMeta (sources, licenses, title)
└── Tables
    ├── metadata: TableMeta (table-level info)
    └── Variables (columns)
        └── metadata: VariableMeta (unit, description, sources)

Metadata Propagation

As the table is processed, metadata is preserved and propagated to resulting tables and variables.

# Slicing
tb_filtered = tb[tb["year"] > 2000]  # Keeps metadata
# Filtering
tb_loc = tb.loc[tb["country"] == "USA"]  # Keeps metadata
# Sorting
tb_sorted = tb.sort_values("gdp_per_capita")  # Keeps metadata
# Column operations
tb["gdp_per_capita_usd"] = tb["gdp_per_capita"] * 2

# Merging
tb_merged = pr.merge(tb1, tb2, on="country")  # Merges metadata
# Concatenating
tb_concat = pr.concat([tb1, tb2])  # Combines metadata
# Pivoting
tb_pivot = pr.pivot(tb, index="year", ...)  # Adjusts metadata
# Melting
tb_melted = pr.melt(tb, ...)

File Formats

Tables support multiple formats with automatic detection: feather, parquet, and CSV. Metadata is stored separately in .meta.json files.

Reference

Metadata-aware alternatives to pandas functions.

owid.catalog.core.processing

Common operations performed on tables and variables.

Functions:

  • ignore_warnings

    Ignore warnings. You can pass a list of specific warnings to ignore like MetadataWarning or StepWarning.

  • keep_metadata

    Decorator that turns a function that works on DataFrame or Series into a function that works on Table or Variable and preserves metadata.

  • multi_merge

    Merge multiple tables.

  • read

    Read a file based on extension, dispatching to the appropriate reader.

  • read_custom

    Read data using a custom reader function and return a Table with metadata.

  • read_df

    Create a Table (with metadata and an origin) from a DataFrame.

ignore_warnings

ignore_warnings(
    ignore_warnings: Iterable[type] = (Warning,),
)

Ignore warnings. You can pass a list of specific warnings to ignore like MetadataWarning or StepWarning.

Usage

with ignore_warnings(): ds_garden = create_dataset(...)

Source code in lib/catalog/owid/catalog/core/warnings.py
@contextlib.contextmanager
def ignore_warnings(ignore_warnings: Iterable[type] = (Warning,)):
    """Context manager that silences the given warning categories.

    An "ignore" filter is installed for every category in ``ignore_warnings``
    (all warnings by default). Pass specific categories such as MetadataWarning
    or StepWarning to silence only those. The filters are installed inside
    ``warnings.catch_warnings()``.

    Usage:
        with ignore_warnings():
            ds_garden = create_dataset(...)
    """
    with warnings.catch_warnings():
        for category in ignore_warnings:
            warnings.filterwarnings("ignore", category=category)  # ty: ignore
        yield

keep_metadata

keep_metadata(
    func: Callable[..., DataFrame | Series],
) -> Callable[..., Table | Indicator]

Decorator that turns a function that works on DataFrame or Series into a function that works on Table or Variable and preserves metadata. If the decorated function renames columns, their metadata won't be copied.

Example
import owid.catalog.processing as pr

@pr.keep_metadata
def my_df_func(df: pd.DataFrame) -> pd.DataFrame:
    return df + 1

tb = my_df_func(tb)


@pr.keep_metadata
def my_series_func(s: pd.Series) -> pd.Series:
    return s + 1

tb.a = my_series_func(tb.a)
Source code in lib/catalog/owid/catalog/core/tables.py
def keep_metadata(func: Callable[..., pd.DataFrame | pd.Series]) -> Callable[..., Table | indicators.Indicator]:
    """Decorator that turns a function that works on DataFrame or Series into a function that works
    on Table or Variable and preserves metadata.  If the decorated function renames columns, their
    metadata won't be copied.

    The metadata is taken from the first positional argument passed to the decorated function.
    The returned wrapper keeps the name and docstring of the decorated function.

    Args:
        func: Function returning a DataFrame or a Series. Its first positional argument must be
            the Table or Indicator whose metadata should be carried over.

    Returns:
        Wrapped function returning a Table (for a DataFrame result) or an Indicator (for a Series result).

    Raises:
        ValueError: Raised by the wrapper if `func` returns something other than a DataFrame or Series.

    Example:
        ```python
        import owid.catalog.processing as pr

        @pr.keep_metadata
        def my_df_func(df: pd.DataFrame) -> pd.DataFrame:
            return df + 1

        tb = my_df_func(tb)


        @pr.keep_metadata
        def my_series_func(s: pd.Series) -> pd.Series:
            return s + 1

        tb.a = my_series_func(tb.a)
        ```
    """
    # Local import: keeps this block self-contained regardless of the module's import list.
    import functools

    # functools.wraps preserves __name__, __doc__ and __wrapped__ of the decorated function,
    # so introspection and documentation tools still see the original function.
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Table | indicators.Indicator:
        # The first positional argument is the metadata donor.
        tb = args[0]
        df = func(*args, **kwargs)
        if isinstance(df, pd.Series):
            return indicators.Indicator(df, name=tb.name, metadata=tb.metadata)
        elif isinstance(df, pd.DataFrame):
            return Table(df).copy_metadata(tb)
        else:
            raise ValueError(f"Unexpected return type: {type(df)}")

    return wrapper

multi_merge

multi_merge(
    tables: list[Table], *args: Any, **kwargs: Any
) -> Table

Merge multiple tables.

This is a helper function when merging more than two tables on common columns.

Parameters:

  • tables (list[Table]) –

    Tables to merge.

Returns:

  • combined ( Table ) –

    Merged table.

Source code in lib/catalog/owid/catalog/core/tables.py
def multi_merge(tables: list[Table], *args: Any, **kwargs: Any) -> Table:
    """Merge several tables, left to right, into a single table.

    Convenience helper for merging more than two tables on common columns: a copy of
    the first table is merged with the second, that result with the third, and so on.

    Args:
        tables: Tables to merge.
        *args: Positional arguments forwarded to every `merge` call.
        **kwargs: Keyword arguments forwarded to every `merge` call.

    Returns:
        combined: Merged table.

    """
    first, remaining = tables[0], tables[1:]
    merged = first.copy()
    for other in remaining:
        merged = merged.merge(other, *args, **kwargs)

    return merged

read

read(
    filepath_or_buffer: str | Path | IO[AnyStr],
    *args: Any,
    file_extension: str | None = None,
    metadata: TableMeta | None = None,
    origin: Origin | None = None,
    underscore: bool = False,
    **kwargs: Any,
) -> Table

Read a file based on extension, dispatching to the appropriate reader.

Parameters:

  • filepath_or_buffer (str | Path | IO[AnyStr]) –

    Path to the file or file-like object to read.

  • *args (Any, default: () ) –

    Additional positional arguments passed to the format-specific reader.

  • file_extension (str | None, default: None ) –

    File extension (without dot). If None, inferred from filepath.

  • metadata (TableMeta | None, default: None ) –

    Table metadata.

  • origin (Origin | None, default: None ) –

    Origin of the table data.

  • underscore (bool, default: False ) –

    True to make all column names snake case.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments passed to the format-specific reader.

Returns:

  • Table

    Table with data and metadata.

Note

For reading ZIP files, use Snapshot.extracted() context manager instead. See etl/snapshot.py for the recommended approach to handling archives.

Source code in lib/catalog/owid/catalog/core/tables.py
def read(
    filepath_or_buffer: str | Path | IO[AnyStr],
    *args: Any,
    file_extension: str | None = None,
    metadata: TableMeta | None = None,
    origin: Origin | None = None,
    underscore: bool = False,
    **kwargs: Any,
) -> Table:
    """Read a file by dispatching to the reader registered for its extension.

    Args:
        filepath_or_buffer: Path to the file or file-like object to read.
        *args: Additional positional arguments passed to the format-specific reader.
        file_extension: File extension (without dot). If None, it is taken as the lower-cased
            text after the last dot of `str(filepath_or_buffer)`.
        metadata: Table metadata.
        origin: Origin of the table data.
        underscore: True to make all column names snake case.
        **kwargs: Additional keyword arguments passed to the format-specific reader.

    Returns:
        Table with data and metadata.

    Raises:
        ValueError: If no reader is registered for the extension.

    Note:
        For reading ZIP files, use Snapshot.extracted() context manager instead.
        See etl/snapshot.py for the recommended approach to handling archives.
    """
    if file_extension is None:
        # Everything after the last dot, lower-cased.
        file_extension = str(filepath_or_buffer).rsplit(".", 1)[-1].lower()

    reader = EXTENSION_TO_READER.get(file_extension)
    if reader is not None:
        return reader(filepath_or_buffer, *args, metadata=metadata, origin=origin, underscore=underscore, **kwargs)

    raise ValueError(f"Unknown extension: {file_extension}")

read_custom

read_custom(
    read_function: Callable,
    filepath_or_buffer: str | Path | IO[AnyStr],
    metadata: TableMeta,
    origin: Origin | None = None,
    underscore: bool = False,
    *args: Any,
    **kwargs: Any,
) -> Table

Read data using a custom reader function and return a Table with metadata.

This function allows using any custom data reading function while automatically attaching metadata and origin information to the resulting Table. Useful when standard read functions (read_csv, read_excel, etc.) don't meet specific needs.

Parameters:

  • read_function (Callable) –

    Custom function to read the data. Must accept filepath_or_buffer as first argument and return a DataFrame or Table.

  • filepath_or_buffer (str | Path | IO[AnyStr]) –

    Path to the file or file-like object to read.

  • metadata (TableMeta) –

    Table metadata.

  • origin (Origin | None, default: None ) –

    Origin of the table data.

  • underscore (bool, default: False ) –

    True to make all column names snake case.

  • *args (Any, default: () ) –

    Additional positional arguments to pass to read_function.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to read_function.

Returns:

  • Table ( Table ) –

    Data read by the custom function as a Table with attached metadata and origin.

Source code in lib/catalog/owid/catalog/core/tables.py
def read_custom(
    read_function: Callable,
    filepath_or_buffer: str | Path | IO[AnyStr],
    metadata: TableMeta,
    origin: Origin | None = None,
    underscore: bool = False,
    *args: Any,
    **kwargs: Any,
) -> Table:
    """Load data with a caller-supplied reader and wrap the result in a Table with metadata.

    Use this when the standard read functions (read_csv, read_excel, etc.) don't fit:
    the custom reader produces the data, and this function attaches the table metadata
    and origin to the resulting Table.

    Args:
        read_function: Custom function to read the data. Must accept filepath_or_buffer as first argument and return a DataFrame or Table.
        filepath_or_buffer: Path to the file or file-like object to read.
        metadata: Table metadata.
        origin: Origin of the table data.
        underscore: True to make all column names snake case.
        *args: Additional positional arguments to pass to read_function.
        **kwargs: Additional keyword arguments to pass to read_function.

    Returns:
        Table: Data read by the custom function as a Table with attached metadata and origin.

    """
    raw_data = read_function(filepath_or_buffer, *args, **kwargs)
    tb = Table(raw_data, underscore=underscore)
    return cast(
        Table,
        _add_table_and_variables_metadata_to_table(table=tb, metadata=metadata, origin=origin),
    )

read_df

read_df(
    df: DataFrame,
    metadata: TableMeta | None = None,
    origin: Origin | None = None,
    underscore: bool = False,
) -> Table

Create a Table (with metadata and an origin) from a DataFrame.

Parameters:

  • df (DataFrame) –

    Input DataFrame.

  • metadata (TableMeta | None, default: None ) –

    Table metadata (with a title and description).

  • origin (Origin | None, default: None ) –

    Origin of the table.

  • underscore (bool, default: False ) –

    True to ensure all column names are snake case.

Returns:

  • Table ( Table ) –

    Original data as a Table with metadata and an origin.

Source code in lib/catalog/owid/catalog/core/tables.py
def read_df(
    df: pd.DataFrame,
    metadata: TableMeta | None = None,
    origin: Origin | None = None,
    underscore: bool = False,
) -> Table:
    """Wrap a DataFrame in a Table and attach table metadata and an origin to it.

    Args:
        df: Input DataFrame.
        metadata: Table metadata (with a title and description).
        origin: Origin of the table.
        underscore: True to ensure all column names are snake case.

    Returns:
        Table: Original data as a Table with metadata and an origin.
    """
    tb = Table(df, underscore=underscore)
    return cast(
        Table,
        _add_table_and_variables_metadata_to_table(table=tb, metadata=metadata, origin=origin),
    )

Container for multiple tables with shared metadata.

owid.catalog.core.datasets

Classes:

  • Dataset

    A dataset is a folder containing data tables with metadata.

Functions:

  • checksum_file

    Calculate MD5 checksum of a single file.

Dataset dataclass

Dataset(path: str | Path)

A dataset is a folder containing data tables with metadata.

A Dataset represents a collection of related data tables stored in a directory. Each dataset has an index.json file containing metadata about the dataset and references to its tables.

Attributes:

  • path (str) –

    Path to the dataset directory.

  • metadata (DatasetMeta) –

    Dataset-level metadata (title, description, sources, etc).

Example

Load an existing dataset:

>>> ds = Dataset("data://garden/demography/2023-03-31/population")
>>> table = ds["population"]

Create a new dataset:

>>> ds = Dataset.create_empty("path/to/dataset")
>>> ds.add(table)
>>> ds.save()

Initialize a Dataset from a directory path.

Parameters:

  • path (str | Path) –

    Path to the dataset directory. Can be a string or Path object.

Methods:

  • add

    Add a table to this dataset.

  • checksum

    Calculate MD5 checksum of all data and metadata in the dataset.

  • index

    Generate an index DataFrame describing all tables in this dataset.

  • read

    Read a table from the dataset with performance options.

  • update_metadata

    Update dataset and table metadata from a YAML file.

Source code in lib/catalog/owid/catalog/core/datasets.py
def __init__(self, path: str | Path) -> None:
    """Initialize a Dataset from a directory path.

    Args:
        path: Path to the dataset directory. Can be a string or Path object.
    """
    # Path objects are accepted for convenience and stored in POSIX string form;
    # any other value is stored as given.
    self.path = path.as_posix() if isinstance(path, Path) else path

    # Dataset-level metadata is loaded from the dataset's index file.
    self.metadata = DatasetMeta.load(self._index_file)
m property

Metadata alias for shorter access (ds.m instead of ds.metadata).

add
add(
    table: Table,
    formats: list[FileFormat] = DEFAULT_FORMATS,
    repack: bool = True,
) -> None

Add a table to this dataset.

Saves the table to the dataset's directory in the specified format(s). By default, saves in multiple formats for compatibility.

Parameters:

  • table (Table) –

    The table to add to the dataset.

  • formats (list[FileFormat], default: DEFAULT_FORMATS ) –

    List of file formats to save (feather, parquet, csv). Defaults to DEFAULT_FORMATS (usually ["feather"]).

  • repack (bool, default: True ) –

    If True, optimize column dtypes to reduce file size (e.g. float64 -> float32). Set to False for very large dataframes if repacking fails or is too slow.

Raises:

  • PrimaryKeyMissing

    If table has no primary key and OWID_STRICT is set.

  • NonUniqueIndex

    If table index has duplicates and OWID_STRICT is set.

Example
>>> ds.add(table)  # Save in default format
>>> ds.add(table, formats=["csv"])  # Save only as CSV
>>> ds.add(table, repack=False)  # Skip optimization
Source code in lib/catalog/owid/catalog/core/datasets.py
def add(
    self,
    table: tables.Table,
    formats: list[FileFormat] = DEFAULT_FORMATS,
    repack: bool = True,
) -> None:
    """Add a table to this dataset.

    Validates the table and saves it to the dataset's directory, once per requested format.
    All requested formats are checked before anything is written, so an unsupported format
    never leaves the dataset with only part of the requested files.

    Args:
        table: The table to add to the dataset.
        formats: List of file formats to save (feather, parquet, csv).
            Defaults to DEFAULT_FORMATS (usually ["feather"]).
        repack: If True, optimize column dtypes to reduce file size
            (e.g. float64 -> float32). Set to False for very large dataframes
            if repacking fails or is too slow.

    Raises:
        PrimaryKeyMissing: If table has no primary key and OWID_STRICT is set.
        NonUniqueIndex: If table index has duplicates and OWID_STRICT is set.
        AssertionError: If a column with a nullable dtype contains np.nan instead of pd.NA.
        Exception: If any of the requested formats is not supported.

    Example:
        ```python
        >>> ds.add(table)  # Save in default format
        >>> ds.add(table, formats=["csv"])  # Save only as CSV
        >>> ds.add(table, repack=False)  # Skip optimization
        ```
    """

    utils.validate_underscore(table.metadata.short_name, "Table's short_name")
    for col in list(table.columns) + list(table.index.names):
        utils.validate_underscore(col, "Variable's name")

    if not table.primary_key:
        missing_key_message = (
            f"Table `{table.metadata.short_name}` does not have a primary_key -- please use t.set_index([col, ...], verify_integrity=True) to indicate dimensions before saving"
        )
        # Strict mode turns the missing primary key into an error; otherwise it is only a warning.
        if environ.get("OWID_STRICT"):
            raise PrimaryKeyMissing(missing_key_message)
        warnings.warn(missing_key_message)

    if not table.index.is_unique and environ.get("OWID_STRICT"):
        # Report the most frequent index key as an example of the duplication.
        [(k, dups)] = table.index.value_counts().head(1).to_dict().items()
        raise NonUniqueIndex(
            f"Table `{table.metadata.short_name}` has duplicate values in the index -- could you have made a mistake?\n\n"
            f"e.g. key {k} is repeated {dups} times in the index"
        )

    # check Float64 and Int64 columns for np.nan
    # see: https://github.com/owid/etl/issues/1334
    for col, dtype in table.dtypes.items():
        if dtype in NULLABLE_DTYPES:
            # pandas nullable types like Float64 have their own pd.NA instead of np.nan
            # make sure we don't use wrong nan, otherwise dropna and other methods won't work
            assert np.isnan(table[col]).sum() == 0, (
                f"Column `{col}` is using np.nan, but it should be using pd.NA because it has type {table[col].dtype}"
            )

    # copy dataset metadata to the table
    table.metadata.dataset = self.metadata

    # Validate every requested format before writing anything, so an unsupported entry
    # cannot leave the dataset with only some of the requested files on disk.
    requested_formats = list(formats)
    for fmt in requested_formats:
        if fmt not in SUPPORTED_FORMATS:
            raise Exception(f"Format '{fmt}' is not supported")

    for fmt in requested_formats:
        table_filename = join(self.path, table.metadata.checked_name + f".{fmt}")
        table.to(table_filename, repack=repack)
checksum
checksum() -> str

Calculate MD5 checksum of all data and metadata in the dataset.

Generates a checksum that includes the dataset's index file and all data files. Useful for detecting changes to the dataset.

Returns:

  • str

    MD5 checksum as a hexadecimal string.

Example
>>> checksum = ds.checksum()
>>> print(f"Dataset checksum: {checksum}")
Source code in lib/catalog/owid/catalog/core/datasets.py
def checksum(self) -> str:
    """Calculate MD5 checksum of all data and metadata in the dataset.

    The checksum covers the dataset's index file followed by, for each data file,
    the data file itself and its companion ``.meta.json`` file. Useful for detecting
    changes to the dataset.

    Returns:
        MD5 checksum as a hexadecimal string.

    Example:
        ```python
        >>> checksum = ds.checksum()
        >>> print(f"Dataset checksum: {checksum}")
        ```
    """
    digest = hashlib.md5()
    digest.update(checksum_file(self._index_file).digest())

    for data_path in self._data_files:
        # The metadata file sits next to the data file, with the suffix swapped for ".meta.json".
        meta_path = Path(data_path).with_suffix(".meta.json").as_posix()
        for path in (data_path, meta_path):
            digest.update(checksum_file(path).digest())

    return digest.hexdigest()
index
index(catalog_path: Path = Path('/')) -> DataFrame

Generate an index DataFrame describing all tables in this dataset.

Creates a summary DataFrame with one row per table, including metadata like namespace, version, checksum, dimensions, and file paths.

Parameters:

  • catalog_path (Path, default: Path('/') ) –

    Base path for calculating relative paths. Defaults to "/".

Returns:

  • DataFrame

    DataFrame with columns: namespace, dataset, version, table, checksum, is_public, title, description, dimensions, path, channel, and formats.

Example
>>> index = ds.index()
>>> print(index[["table", "dimensions", "checksum"]])
Source code in lib/catalog/owid/catalog/core/datasets.py
def index(self, catalog_path: Path = Path("/")) -> pd.DataFrame:
    """Generate an index DataFrame describing all tables in this dataset.

    Builds one record per table metadata file. Each record combines dataset-level
    fields (namespace, dataset, version, checksum, is_public) with table-level
    fields (table, title, description, dimensions, path, channel, formats).

    Args:
        catalog_path: Base path for calculating relative paths. Defaults to "/".

    Returns:
        DataFrame with columns: namespace, dataset, version, table, checksum, is_public,
        title, description, dimensions, path, channel, and formats.

    Example:
        ```python
        >>> index = ds.index()
        >>> print(index[["table", "dimensions", "checksum"]])
        ```
    """
    # Fields shared by every row; the dataset checksum is computed once.
    shared = {
        "namespace": self.metadata.namespace,
        "dataset": self.metadata.short_name,
        "version": self.metadata.version,
        "checksum": self.checksum(),
        "is_public": self.metadata.is_public,
    }
    records = []
    for metadata_file in self._metadata_files:
        with open(metadata_file) as istream:
            table_meta = TableMeta.from_dict(json.load(istream))

        assert table_meta.short_name

        # Content metadata (fallback to dataset-level if table-level not available)
        title = table_meta.title or self.metadata.title
        description = table_meta.description or self.metadata.description
        dimensions = json.dumps(table_meta.primary_key)

        table_path = Path(self.path) / table_meta.short_name
        relative_path = table_path.relative_to(catalog_path)

        # Only the formats whose file actually exists on disk are listed.
        available_formats = [f for f in SUPPORTED_FORMATS if table_path.with_suffix(f".{f}").exists()]  # ty: ignore

        records.append(
            {
                **shared,
                "table": table_meta.short_name,
                "title": title,
                "description": description,
                "dimensions": dimensions,
                "path": relative_path.as_posix(),
                "channel": relative_path.parts[0],
                "formats": available_formats,
            }
        )

    return pd.DataFrame.from_records(records)
read
read(
    name: str | None = None,
    reset_index: bool = True,
    safe_types: bool = True,
    reset_metadata: Literal[
        "keep", "keep_origins", "reset"
    ] = "keep",
    load_data: bool = True,
) -> Table

Read a table from the dataset with performance options.

This is an alternative to ds[table_name] with more control over loading behavior for performance optimization.

Parameters:

  • name (str | None, default: None ) –

    Name of the table to read. If None and dataset has only one table, reads that table automatically.

  • reset_index (bool, default: True ) –

    If True, don't set primary keys. This can make loading large multi-index datasets much faster. Default is True.

  • safe_types (bool, default: True ) –

    If True, convert numeric columns to nullable types (Float64, Int64) and categorical to string[pyarrow]. This increases memory usage but prevents type issues. Default is True.

  • reset_metadata (Literal['keep', 'keep_origins', 'reset'], default: 'keep' ) –

    Controls variable metadata reset behavior:

      - "keep": Leave metadata unchanged (default)
      - "keep_origins": Reset metadata but retain origins attribute
      - "reset": Reset all variable metadata

  • load_data (bool, default: True ) –

    If False, only load metadata without actual data. Useful when you only need to inspect metadata. Default is True.

Returns:

  • Table

    The loaded table with data and metadata.

Raises:

  • ValueError

    If name is None but dataset contains multiple tables.

  • KeyError

    If the specified table name doesn't exist.

Example

Read single table with safe defaults

table = ds.read()

Keep index

>>> table = ds.read("population", reset_index=False)

Faster, less memory

>>> table = ds.read("large_table", safe_types=False)

Only metadata

>>> meta_only = ds.read(load_data=False)

Source code in lib/catalog/owid/catalog/core/datasets.py
def read(
    self,
    name: str | None = None,
    reset_index: bool = True,
    safe_types: bool = True,
    reset_metadata: Literal["keep", "keep_origins", "reset"] = "keep",
    load_data: bool = True,
) -> tables.Table:
    """Read a table from the dataset with performance options.

    This is an alternative to `ds[table_name]` with more control over
    loading behavior for performance optimization. The first supported
    format for which a file exists is the one that gets read.

    Args:
        name: Name of the table to read. If None and dataset has only one
            table, reads that table automatically.
        reset_index: If True, don't set primary keys. This can make loading
            large multi-index datasets much faster. Default is True.
        safe_types: If True, convert numeric columns to nullable types
            (Float64, Int64) and categorical to string[pyarrow]. This increases
            memory usage but prevents type issues. Only applied when
            `load_data` is True. Default is True.
        reset_metadata: Controls variable metadata reset behavior:
            - "keep": Leave metadata unchanged (default)
            - "keep_origins": Reset metadata but retain origins attribute
            - "reset": Reset all variable metadata
            Both reset modes also replace the table-level metadata with an empty TableMeta.
        load_data: If False, only load metadata without actual data. Useful
            when you only need to inspect metadata. Default is True.

    Returns:
        The loaded table with data and metadata.

    Raises:
        ValueError: If name is None and the dataset contains no tables or
            more than one table.
        KeyError: If the specified table name doesn't exist.

    Example:
        Read single table with safe defaults
        ```python
        >>> table = ds.read()
        ```

        Keep index
        ```python
        >>> table = ds.read("population", reset_index=False)
        ```

        Faster, less memory
        ```python
        >>> table = ds.read("large_table", safe_types=False)
        ```

        Only metadata
        ```python
        >>> meta_only = ds.read(load_data=False)
        ```
    """
    if name is None:
        table_names = self.table_names
        if len(table_names) == 1:
            name = table_names[0]
        elif not table_names:
            # An empty dataset previously produced the misleading "Multiple tables exist" message.
            raise ValueError("Dataset contains no tables.")
        else:
            raise ValueError("Multiple tables exist. Please specify the table name.")
    stem = self.path / Path(name)

    for fmt in SUPPORTED_FORMATS:
        path = stem.with_suffix(f".{fmt}")
        if not path.exists():
            continue

        # An empty primary key leaves the index unset; None defers to the reader's default.
        t = tables.Table.read(path, primary_key=[] if reset_index else None, load_data=load_data)
        t.metadata.dataset = self.metadata
        if safe_types and load_data:
            t = cast(tables.Table, to_safe_types(t))
        if reset_metadata in ["keep_origins", "reset"]:  # Handles "keep_origins" and "reset"
            t.metadata = TableMeta()
            for col in t.columns:
                if reset_metadata == "keep_origins":  # Preserve 'origins' attribute
                    origins = t[col].metadata.origins if hasattr(t[col].metadata, "origins") else None
                    t[col].metadata = VariableMeta()
                    t[col].metadata.origins = origins  # Preserve 'origins' attribute
                if reset_metadata == "reset":  # Reset all metadata
                    t[col].metadata = VariableMeta()
        return t

    # Only the first ten table names are listed to keep the message short.
    raise KeyError(f"Table `{name}` not found, available tables: {', '.join(self.table_names[:10])}")
update_metadata
update_metadata(
    metadata_path: Path,
    yaml_params: dict[str, Any] | None = None,
    if_origins_exist: SOURCE_EXISTS_OPTIONS = "replace",
    errors: Literal["ignore", "warn", "raise"] = "raise",
    extra_variables: Literal["raise", "ignore"] = "raise",
) -> None

Update dataset and table metadata from a YAML file.

Loads metadata from a .meta.yml file and updates the dataset's metadata and all referenced tables. This is the primary way to add rich metadata to datasets in the ETL workflow.

Parameters:

  • metadata_path (Path) –

    Path to the .meta.yml file with metadata definitions. See existing metadata files for examples of the expected structure.

  • yaml_params (dict[str, Any] | None, default: None ) –

    Additional parameters to pass to the YAML loader.

  • if_origins_exist (SOURCE_EXISTS_OPTIONS, default: 'replace' ) –

    How to handle existing origins:

      - "replace" (default): Replace existing origin with new one
      - "append": Append new origin to existing origins
      - "fail": Raise exception if origin already exists

  • errors (Literal['ignore', 'warn', 'raise'], default: 'raise' ) –

    How to handle errors during update:

      - "raise" (default): Raise exception on errors
      - "warn": Issue warning but continue processing
      - "ignore": Silently ignore errors

  • extra_variables (Literal['raise', 'ignore'], default: 'raise' ) –

    How to handle variables in metadata not in dataset:

      - "raise" (default): Raise exception
      - "ignore": Skip extra variables

Example
>>> ds.update_metadata(Path("dataset.meta.yml"))
>>> ds.update_metadata(
...     Path("dataset.meta.yml"),
...     if_origins_exist="append",
...     errors="warn"
... )
Source code in lib/catalog/owid/catalog/core/datasets.py
def update_metadata(
    self,
    metadata_path: Path,
    yaml_params: dict[str, Any] | None = None,
    if_origins_exist: SOURCE_EXISTS_OPTIONS = "replace",
    errors: Literal["ignore", "warn", "raise"] = "raise",
    extra_variables: Literal["raise", "ignore"] = "raise",
) -> None:
    """Update dataset and table metadata from a YAML file.

    Loads metadata from a .meta.yml file and updates the dataset's metadata
    and all referenced tables. This is the primary way to add rich metadata
    to datasets in the ETL workflow. Each updated table has its metadata
    saved to `<table name>.meta.json` in the dataset directory.

    Args:
        metadata_path: Path to the .meta.yml file with metadata definitions.
            See existing metadata files for examples of the expected structure.
        yaml_params: Additional parameters to pass to the YAML loader.
        if_origins_exist: How to handle existing origins:
            - "replace" (default): Replace existing origin with new one
            - "append": Append new origin to existing origins
            - "fail": Raise exception if origin already exists
        errors: How to handle a table listed in the YAML file that is missing from the dataset:
            - "raise" (default): Raise exception on errors
            - "warn": Issue warning but continue processing
            - "ignore": Silently ignore errors
        extra_variables: How to handle variables in metadata not in dataset:
            - "raise" (default): Raise exception
            - "ignore": Skip extra variables

    Raises:
        KeyError: If a table listed in the YAML file is not in the dataset and
            `errors` is "raise".

    Example:
        ```python
        >>> ds.update_metadata(Path("dataset.meta.yml"))
        >>> ds.update_metadata(
        ...     Path("dataset.meta.yml"),
        ...     if_origins_exist="append",
        ...     errors="warn"
        ... )
        ```
    """
    self.metadata.update_from_yaml(metadata_path)

    # Parse the YAML and close the file right away; the per-table work below
    # does not need the handle and may itself re-open the same path.
    with open(metadata_path) as istream:
        metadata = yaml.safe_load(istream)

    # An empty file parses to None, and so does a bare `tables:` key; both mean "no tables".
    tables_section = (metadata or {}).get("tables") or {}

    for table_name in tables_section.keys():
        try:
            table = self[table_name]
        except KeyError as e:
            if errors == "raise":
                raise
            if errors == "warn":
                warnings.warn(str(e))
            # "warn" and "ignore" both skip the missing table.
            continue
        table.update_metadata_from_yaml(
            metadata_path,
            table_name,
            if_origins_exist=if_origins_exist,
            yaml_params=yaml_params,
            extra_variables=extra_variables,
        )
        table._save_metadata(join(self.path, table.metadata.checked_name + ".meta.json"))

checksum_file

checksum_file(filename: str) -> Any

Calculate MD5 checksum of a single file.

Reads the file in chunks to handle large files efficiently.

Parameters:

  • filename (str) –

    Path to the file to checksum.

Returns:

  • Any

    MD5 hash object (use .hexdigest() to get string representation).

Example
>>> checksum = checksum_file("data.csv")
>>> print(checksum.hexdigest())
Source code in lib/catalog/owid/catalog/core/datasets.py
def checksum_file(filename: str) -> Any:
    """Compute the MD5 digest of one file's contents.

    The file is opened in binary mode and consumed in fixed-size pieces
    instead of being read in a single call.

    Args:
        filename: Path of the file to hash.

    Returns:
        The MD5 hash object; call ``.hexdigest()`` on it for the hex string.

    Example:
        ```python
        >>> checksum = checksum_file("data.csv")
        >>> print(checksum.hexdigest())
        ```
    """
    block_bytes = 1 << 20  # 1MB per read
    digest = hashlib.md5()
    with open(filename, "rb") as stream:
        # iter() with a sentinel stops at the first empty read, i.e. end of file.
        for piece in iter(lambda: stream.read(block_bytes), b""):
            digest.update(piece)

    return digest

pandas DataFrame with column-level metadata.

owid.catalog.core.tables

Classes:

Functions:

  • align_categoricals

    Align categorical columns if possible. If not, return originals. This is necessary for

  • copy_metadata

    Copy metadata from a different table to self.

  • keep_metadata

    Decorator that turns a function that works on DataFrame or Series into a function that works

  • multi_merge

    Merge multiple tables.

  • read

    Read a file based on extension, dispatching to the appropriate reader.

  • read_custom

    Read data using a custom reader function and return a Table with metadata.

  • read_df

    Create a Table (with metadata and an origin) from a DataFrame.

  • update_variable_dimensions

    Update a variable's dimensions metadata.

Table

Table(
    *args: Any,
    metadata: TableMeta | None = None,
    short_name: str | None = None,
    underscore: bool = False,
    camel_to_snake: bool = False,
    like: Table | None = None,
    **kwargs: Any,
)

Bases: DataFrame

Enhanced pandas DataFrame with rich metadata support.

Table extends pandas DataFrame to include metadata at both the table level and individual column level. It's the primary data structure for ETL operations.

Attributes:

  • metadata (TableMeta) –

    Table-level metadata (title, description, sources, etc).

  • _fields (dict[str, VariableMeta]) –

    Dictionary mapping column names to their VariableMeta objects.

  • DEBUG

    Set to True to enable metadata validation debugging.

Example

Create a table from a DataFrame:

df = pd.DataFrame({"country": ["USA", "UK"], "gdp": [20, 3]})
table = Table(df, short_name="gdp")

Create with metadata:

meta = TableMeta(short_name="gdp", title="GDP by country")
table = Table(df, metadata=meta)

Copy metadata from another table:

new_table = Table(df, like=old_table)

Initialize a Table with data and metadata.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments passed to pandas.DataFrame.__init__.

  • metadata (TableMeta | None, default: None ) –

    TableMeta object with table-level metadata. Creates empty metadata if not provided.

  • short_name (str | None, default: None ) –

    Shortcut to set metadata.short_name. Alternative to passing metadata=TableMeta(short_name="my_name").

  • underscore (bool, default: False ) –

    If True, convert column and index names to snake_case.

  • camel_to_snake (bool, default: False ) –

    If True, convert camelCase column names to snake_case. Only applies when underscore=True.

  • like (Table | None, default: None ) –

    Copy metadata from this Table (including column metadata). Alternative to manually copying metadata for all columns.

  • **kwargs (Any, default: {} ) –

    Keyword arguments passed to pandas.DataFrame.__init__.

Example
table = Table(df, short_name="population")
table = Table(df, metadata=meta, underscore=True)
table = Table(df, like=existing_table)

Methods:

  • astype

    Cast table columns to specified dtype(s).

  • check_metadata

    Check that all variables in the table have origins.

  • copy

    Create a copy of the table with all metadata.

  • copy_metadata

    Copy metadata from another table to this table.

  • drop

    Drop specified labels from rows or columns.

  • equals_table

    Check if two tables are equal including metadata.

  • fillna

    Usual fillna, but, if the object given to fill values with is a table, transfer its metadata to the filled table.

  • filter

    Subset rows or columns based on their labels.

  • format

    Format the table according to OWID standards.

  • from_records

    Calling Table.from_records returns a Table, but does not call __init__ and misses metadata.

  • get_column_or_index

    Get a variable by name from either columns or index.

  • groupby

    Groupby that preserves metadata. It uses observed=True by default.

  • join

    Join tables while preserving metadata.

  • melt

    Unpivot table from wide to long format.

  • merge

    Merge with another DataFrame or Table.

  • pivot

    Reshape table from long to wide format.

  • prune_metadata

    Remove metadata for columns no longer in the table.

  • read

    Read a table from disk in any supported format.

  • read_csv

    Read table from CSV file with accompanying metadata.

  • read_feather

    Read table from Feather file with accompanying metadata.

  • read_json

    Read the table from a JSON file plus accompanying JSON sidecar.

  • read_parquet

    Read table from Parquet file with accompanying metadata.

  • reindex

    Conform table to new index with optional filling logic.

  • rename

    Rename columns while preserving their metadata.

  • rename_index_names

    Rename index values names.

  • reset_index

    Reset the index to default integer index.

  • rolling

    Rolling operation that preserves metadata.

  • set_index

    Set the DataFrame index using specified columns.

  • to

    Save this table to disk in a supported format.

  • to_csv

    Save table as CSV with accompanying metadata file.

  • to_excel

    Save table to Excel file with optional metadata codebook.

  • to_feather

    Save table as Feather file with accompanying metadata.

  • to_json

    Save this table as a JSON file plus accompanying JSON metadata file.

  • to_parquet

    Save table as Parquet file with metadata sidecar.

  • underscore

    Convert column and index names to underscore format.

  • update_metadata

    Update table-level metadata fields.

  • update_metadata_from_yaml

    Update table and variable metadata from a YAML file.

Source code in lib/catalog/owid/catalog/core/tables.py
def __init__(
    self,
    *args: Any,
    metadata: TableMeta | None = None,
    short_name: str | None = None,
    underscore: bool = False,
    camel_to_snake: bool = False,
    like: Table | None = None,
    **kwargs: Any,
) -> None:
    """Initialize a Table with data and metadata.

    Args:
        *args: Positional arguments passed to pandas.DataFrame.__init__.
        metadata: TableMeta object with table-level metadata. A new empty
            TableMeta is used when this is None (or otherwise falsy).
        short_name: Shortcut to set metadata.short_name. Alternative to
            passing `metadata=TableMeta(short_name="my_name")`. If `metadata`
            already carries a short_name, the two must be equal. Note that
            the value is written onto the supplied `metadata` object itself.
        underscore: If True, convert column and index names to snake_case.
        camel_to_snake: If True, convert camelCase column names to snake_case.
            Only applies when underscore=True.
        like: Copy metadata from this Table (including column metadata).
            Alternative to manually copying metadata for all columns. When
            given, the table metadata set from `metadata`/`short_name` is
            replaced by the result of `copy_metadata(like)`.
        **kwargs: Keyword arguments passed to pandas.DataFrame.__init__.

    Raises:
        AssertionError: If `short_name` differs from a short_name already
            present in `metadata`, or if the instance already has a
            `_fields` attribute before initialization.

    Example:
        ```python
        table = Table(df, short_name="population")
        table = Table(df, metadata=meta, underscore=True)
        table = Table(df, like=existing_table)
        ```
    """

    # Let the parent class build the underlying data first.
    super().__init__(*args, **kwargs)

    # empty table metadata by default
    self.metadata = metadata or TableMeta()

    # use supplied short_name
    if short_name:
        # Refuse to silently override a different short_name that came with `metadata`.
        assert self.metadata.short_name is None or (self.metadata.short_name == short_name), (
            "short_name is different from the one in metadata"
        )
        self.metadata.short_name = short_name

    # all columns have empty metadata by default
    # (defaultdict: looking up a column with no entry yields a fresh VariableMeta)
    assert not hasattr(self, "_fields")
    self._fields = defaultdict(VariableMeta)

    # underscore column names
    if underscore:
        self.underscore(inplace=True, camel_to_snake=camel_to_snake)

    # reuse metadata from a different table
    if like is not None:
        # copy_metadata hands back a table; adopt its column and table metadata here.
        copy = self.copy_metadata(like)
        self._fields = copy._fields
        self.metadata = copy.metadata
all_columns property
all_columns: list[str]

Get names of all columns including index levels.

Returns both regular columns and index names in a single list, useful for iterating over all variables in the table.

Returns:

  • list[str]

    List of all column names and index level names.

Example
table = table.set_index(["country", "year"])
print(table.all_columns)  # ["country", "year", "gdp", "population"]
codebook property
codebook: DataFrame

Generate a human-readable codebook for this table.

Creates a DataFrame summarizing all variables in the table with their titles, descriptions, units, and source attributions.

Returns:

  • DataFrame

    DataFrame with columns:

    • column: Column name (including index columns)
    • title: Title from metadata (title_public > display.name > title)
    • description: Short description of the indicator
    • unit: Unit of measurement with short unit in parentheses
    • source: Formatted source attribution with URLs
Example
codebook = table.codebook
print(codebook.to_markdown())
m property

Metadata alias for shorter access (table.m instead of table.metadata).

primary_key property
primary_key: list[str]

Get the table's primary key column names.

Returns the names of index levels, which serve as the table's primary key for identifying unique rows.

Returns:

  • list[str]

    List of index level names (excluding None values).

Example
table = table.set_index(["country", "year"])
print(table.primary_key)  # ["country", "year"]
astype
astype(*args: Any, **kwargs: Any) -> Table

Cast table columns to specified dtype(s).

Convert one or more columns to a specified data type. Wrapper around pandas astype that returns a Table.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments passed to pandas.DataFrame.astype.

  • **kwargs (Any, default: {} ) –

    Keyword arguments passed to pandas.DataFrame.astype.

Returns:

  • Table

    Table with columns cast to specified types.

Example

Cast single column:

table = table.astype({"population": int})

Cast multiple columns:

table = table.astype({"year": int, "gdp": float})

Cast all columns:

table = table.astype(str)

Source code in lib/catalog/owid/catalog/core/tables.py
def astype(self, *args: Any, **kwargs: Any) -> Table:
    """Cast the table's columns to the requested dtype(s).

    All arguments are forwarded unchanged to the parent class's ``astype``;
    this override exists to give the result a ``Table`` return annotation.

    Args:
        *args: Positional arguments forwarded to pandas.DataFrame.astype.
        **kwargs: Keyword arguments forwarded to pandas.DataFrame.astype.

    Returns:
        The result of the parent ``astype`` call, annotated as a Table.

    Example:
        One column:
        ```python
        table = table.astype({"population": int})
        ```

        Several columns:
        ```python
        table = table.astype({"year": int, "gdp": float})
        ```

        Every column:
        ```python
        table = table.astype(str)
        ```
    """
    converted = super().astype(*args, **kwargs)  # ty: ignore
    return converted
check_metadata
check_metadata(
    ignore_columns: list[str] | None = None,
) -> None

Check that all variables in the table have origins.

Source code in lib/catalog/owid/catalog/core/tables.py
def check_metadata(self, ignore_columns: list[str] | None = None) -> None:
    """Check that all variables in the table have origins."""
    if ignore_columns is None:
        if self.primary_key:
            ignore_columns = self.primary_key
        else:
            ignore_columns = ["year", "country"]

    for column in [column for column in self.columns if column not in ignore_columns]:
        if not self[column].metadata.origins:
            warnings.warn(f"Variable {column} has no origins.", warnings.NoOriginsWarning)
copy
copy(deep: bool = True) -> Table

Create a copy of the table with all metadata.

Parameters:

  • deep (bool, default: True ) –

    If True (default), make a deep copy of the data and metadata. If False, creates a shallow copy.

Returns:

  • Table

    A new Table with copied data and metadata.

Example
table_copy = table.copy()  # Deep copy
table_copy = table.copy(deep=False)  # Shallow copy
Source code in lib/catalog/owid/catalog/core/tables.py
def copy(self, deep: bool = True) -> Table:
    """Return a copy of this table with its metadata re-attached.

    Args:
        deep: Forwarded to the parent class's ``copy``. The metadata is then
            re-attached through ``copy_metadata(self)``, which is called with
            its default arguments.

    Returns:
        The table returned by ``copy_metadata`` on the fresh copy.

    Example:
        ```python
        table_copy = table.copy()  # Deep copy
        table_copy = table.copy(deep=False)  # Shallow copy
        ```
    """
    # This could be causing this warning:
    #   Passing a BlockManager to Table is deprecated and will raise in a future version. Use public APIs instead.
    # but I'm not sure how to fix it
    return super().copy(deep=deep).copy_metadata(self)
copy_metadata
copy_metadata(
    from_table: Table, deep: bool = False
) -> Table

Copy metadata from another table to this table.

Copies both table-level metadata and variable-level metadata for all matching columns. Useful for preserving metadata after transformations.

Parameters:

  • from_table (Table) –

    Source table to copy metadata from.

  • deep (bool, default: False ) –

    If True, make a deep copy of the metadata. Default is False.

Returns:

  • Table

    Self, for method chaining.

Example
new_table = Table(transformed_df)
new_table.copy_metadata(original_table)
Source code in lib/catalog/owid/catalog/core/tables.py
def copy_metadata(self, from_table: Table, deep: bool = False) -> Table:
    """Copy metadata from another table onto this one.

    Thin method wrapper around the module-level ``copy_metadata`` function,
    with this table as the destination.

    Args:
        from_table: Table whose metadata is used as the source.
        deep: Forwarded unchanged to the module-level function.

    Returns:
        Whatever the module-level ``copy_metadata`` returns.
        NOTE(review): ``__init__`` reads ``_fields`` and ``metadata`` off the
        returned object, so it may be a new table rather than ``self`` —
        confirm before relying on in-place behaviour.

    Example:
        ```python
        new_table = Table(transformed_df)
        new_table = new_table.copy_metadata(original_table)
        ```
    """
    # Inside a method body the bare name resolves to the module-level function,
    # not to this method.
    result = copy_metadata(to_table=self, from_table=from_table, deep=deep)
    return result
drop
drop(*args: Any, **kwargs: Any) -> Table

Drop specified labels from rows or columns.

Remove rows or columns by specifying label names and axis. Wrapper around pandas drop that returns a Table.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments passed to pandas.DataFrame.drop.

  • **kwargs (Any, default: {} ) –

    Keyword arguments passed to pandas.DataFrame.drop.

Returns:

  • Table

    Table with specified labels dropped.

Example

Drop columns:

table = table.drop(columns=["column1", "column2"])

Drop rows by index:

table = table.drop(index=["row1", "row2"])

Drop columns with axis parameter:

table = table.drop(["column1"], axis=1)

Source code in lib/catalog/owid/catalog/core/tables.py
def drop(self, *args: Any, **kwargs: Any) -> Table:
    """Remove rows or columns identified by label.

    All arguments are forwarded unchanged to the parent class's ``drop``;
    the result is only re-typed as a Table for static checkers.

    Args:
        *args: Positional arguments forwarded to pandas.DataFrame.drop.
        **kwargs: Keyword arguments forwarded to pandas.DataFrame.drop.

    Returns:
        The result of the parent ``drop`` call, typed as a Table.

    Example:
        By column name:
        ```python
        table = table.drop(columns=["column1", "column2"])
        ```

        By row label:
        ```python
        table = table.drop(index=["row1", "row2"])
        ```

        Using the axis parameter:
        ```python
        table = table.drop(["column1"], axis=1)
        ```
    """
    remaining = super().drop(*args, **kwargs)
    return cast(Table, remaining)
equals_table
equals_table(table: Table) -> bool

Check if two tables are equal including metadata.

Compares both data and metadata for equality. This is more comprehensive than pandas equals() which only checks data.

Parameters:

  • table (Table) –

    Table to compare with.

Returns:

  • bool

    True if tables have identical data, metadata, and variable metadata. False otherwise.

Note

NaN values are handled specially to ensure consistent comparison even when NaN values are present.

Example
if table1.equals_table(table2):
    print("Tables are identical")
Source code in lib/catalog/owid/catalog/core/tables.py
def equals_table(self, table: Table) -> bool:
    """Check if two tables are equal including metadata.

    Compares table metadata, data values, and per-column metadata. This is
    more comprehensive than pandas equals(), which only checks data.

    Args:
        table: Table to compare with.

    Returns:
        True if ``table`` is a Table with identical table metadata, data and
        variable metadata. False otherwise.

    Note:
        NaN never compares equal to itself, so the data is compared after
        filling missing values with a sentinel. The positions of missing
        values are compared as well, so that a missing cell in one table is
        not mistaken for a cell that genuinely holds the sentinel value in
        the other.

    Example:
        ```python
        if table1.equals_table(table2):
            print("Tables are identical")
        ```
    """
    return (
        isinstance(table, Table)
        and self.metadata == table.metadata
        # Missing values must sit in the same cells; otherwise the sentinel fill below could make a NaN
        # look equal to a real 123456.
        and self.isna().to_dict() == table.isna().to_dict()
        # By simply doing self.to_dict() == table.to_dict() may return False if the dictionaries are identical but
        # contain nans.
        and self.fillna(123456).to_dict() == table.fillna(123456).to_dict()
        and self._fields == table._fields
    )
fillna
fillna(value: Any = None, **kwargs: Any) -> Table

Usual fillna, but, if the object given to fill values with is a table, transfer its metadata to the filled table.

Source code in lib/catalog/owid/catalog/core/tables.py
def fillna(self, value: Any = None, **kwargs: Any) -> Table:
    """Fill missing values, merging metadata when the filler is itself a table.

    Behaves like the parent class's ``fillna``. In addition, when ``value``
    has exactly the same type as this table, every column of the result that
    also exists in ``value`` gets metadata combined from both sources via
    ``indicators.combine_indicators_metadata`` (operation ``"fillna"``).

    Args:
        value: Value(s) used to fill holes. When None, the parent ``fillna``
            is called with ``**kwargs`` only.
        **kwargs: Extra keyword arguments forwarded to the parent ``fillna``.

    Returns:
        The filled table, typed as a Table.
    """
    if value is None:
        return cast(Table, super().fillna(**kwargs))

    filled = super().fillna(value, **kwargs)

    # Exact type match (not isinstance): only then is metadata combined.
    if type(value) is type(self):
        for column in filled.columns:
            if column not in value.columns:
                continue
            filled._fields[column] = indicators.combine_indicators_metadata(
                variables=[filled[column], value[column]], operation="fillna", name=column
            )

    return cast(Table, filled)
filter
filter(*args: Any, **kwargs: Any) -> Table

Subset rows or columns based on their labels.

Filter the table to include only specified rows or columns by name. Wrapper around pandas filter that returns a Table.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments passed to pandas.DataFrame.filter.

  • **kwargs (Any, default: {} ) –

    Keyword arguments passed to pandas.DataFrame.filter. Common kwargs include:

    - items: List of axis labels to select
    - like: Keep labels matching this string pattern
    - regex: Keep labels matching this regex pattern
    - axis: Axis to filter on (0 for rows, 1 for columns)

Returns:

  • Table

    Filtered Table with only selected labels.

Example

Filter columns by exact names:

table = table.filter(items=["country", "year", "gdp"])

Filter columns containing pattern:

table = table.filter(like="population")

Filter columns with regex:

table = table.filter(regex="^gdp_.*")

Source code in lib/catalog/owid/catalog/core/tables.py
def filter(self, *args: Any, **kwargs: Any) -> Table:
    """Select rows or columns by label.

    All arguments are forwarded unchanged to the parent class's ``filter``;
    this override exists to give the result a ``Table`` return annotation.

    Args:
        *args: Positional arguments forwarded to pandas.DataFrame.filter.
        **kwargs: Keyword arguments forwarded to pandas.DataFrame.filter.
            Commonly used ones:
            - items: List of axis labels to select
            - like: Keep labels matching this string pattern
            - regex: Keep labels matching this regex pattern
            - axis: Axis to filter on (0 for rows, 1 for columns)

    Returns:
        The result of the parent ``filter`` call, annotated as a Table.

    Example:
        Exact column names:
        ```python
        table = table.filter(items=["country", "year", "gdp"])
        ```

        Columns containing a substring:
        ```python
        table = table.filter(like="population")
        ```

        Columns matching a regular expression:
        ```python
        table = table.filter(regex="^gdp_.*")
        ```
    """
    selected = super().filter(*args, **kwargs)  # ty: ignore
    return selected
format
format(
    keys: str | list[str] | None = None,
    verify_integrity: bool = True,
    underscore: bool = True,
    sort_rows: bool = True,
    sort_columns: bool = False,
    short_name: str | None = None,
    **kwargs: Any,
) -> Table

Format the table according to OWID standards.

Applies standard OWID formatting: underscores column names, sets index, verifies uniqueness, and sorts data. This is a convenience method that chains multiple operations commonly used in ETL workflows.

Note

Underscoring happens first, so use underscored key names in the keys parameter (e.g., use 'country' if original had 'Country').

Parameters:

  • keys (str | list[str] | None, default: None ) –

    Index column name(s). If None, uses ["country", "year"].

  • verify_integrity (bool, default: True ) –

    If True (default), raise error if index has duplicate entries.

  • underscore (bool, default: True ) –

    If True (default), convert column names to snake_case format. Disable if names are already properly formatted.

  • sort_rows (bool, default: True ) –

    If True (default), sort rows by index in ascending order.

  • sort_columns (bool, default: False ) –

    If True, sort columns alphabetically. Default is False.

  • short_name (str | None, default: None ) –

    Optional short name to assign to table metadata.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to the underscore() method.

Returns:

  • Table

    Formatted Table with standardized structure and metadata.

Raises:

  • KeyError

    If specified keys are not found in table columns.

  • ValueError

    If verify_integrity=True and index has duplicates.

Example

Basic formatting with default country/year index:

table = table.format()

Equivalent to:

table = table.underscore().set_index(
    ["country", "year"], verify_integrity=True
).sort_index()

Custom index columns:

table = table.format(["country", "year", "sex"])

Skip underscoring if already formatted:

table = table.format(underscore=False, keys=["country", "year"])

Format with custom table name:

table = table.format(short_name="population_density")

Source code in lib/catalog/owid/catalog/core/tables.py
def format(
    self,
    keys: str | list[str] | None = None,
    verify_integrity: bool = True,
    underscore: bool = True,
    sort_rows: bool = True,
    sort_columns: bool = False,
    short_name: str | None = None,
    **kwargs: Any,
) -> Table:
    """Format the table according to OWID standards.

    Applies standard OWID formatting: underscores column names, sets index,
    verifies uniqueness, and sorts data. This is a convenience method that
    chains multiple operations commonly used in ETL workflows.

    Note:
        Underscoring happens first, so use underscored key names in the
        keys parameter (e.g., use 'country' if original had 'Country').
        Explicitly passed keys are themselves run through
        ``utils.underscore`` before the index is set, and this happens even
        when ``underscore=False``.

    Args:
        keys: Index column name(s). If None, uses ["country", "year"].
        verify_integrity: If True (default), raise error if index has
            duplicate entries.
        underscore: If True (default), convert column names to snake_case
            format. Disable if names are already properly formatted.
        sort_rows: If True (default), sort rows by index in ascending order.
        sort_columns: If True, sort columns alphabetically. Default is False.
        short_name: Optional short name to assign to table metadata.
        **kwargs: Additional arguments passed to the underscore() method.
            Only used when ``underscore`` is True.

    Returns:
        Formatted Table with standardized structure and metadata.

    Raises:
        KeyError: If specified keys are not found in table columns. When
            ``underscore`` is True the error carries a hint listing the
            available (underscored) columns and is chained to the original
            pandas error.
        ValueError: If verify_integrity=True and index has duplicates.

    Example:
        Basic formatting with default country/year index:
        ```python
        table = table.format()
        ```

        Equivalent to:
        ```python
        table = table.underscore().set_index(
            ["country", "year"], verify_integrity=True
        ).sort_index()
        ```

        Custom index columns:
        ```python
        table = table.format(["country", "year", "sex"])
        ```

        Skip underscoring if already formatted:
        ```python
        table = table.format(underscore=False, keys=["country", "year"])
        ```

        Format with custom table name:
        ```python
        table = table.format(short_name="population_density")
        ```
    """
    t = self
    # Underscore column names
    if underscore:
        t = t.underscore(**kwargs)

    # Resolve the index keys; explicitly passed keys are underscored so they
    # match underscored column names.
    if keys is None:
        keys = ["country", "year"]
    elif isinstance(keys, str):
        keys = utils.underscore(keys)
    else:
        keys = [utils.underscore(k) for k in keys]

    # Set index
    try:
        t = t.set_index(keys, verify_integrity=verify_integrity)
    except KeyError as e:
        if underscore:
            # Add a hint about underscoring, keeping the pandas error as the explicit cause.
            raise KeyError(
                f"Make sure that you are using valid column names! Note that the column names have been underscored! Available column names are: {t.columns}. You used {keys}."
            ) from e
        # Nothing to add: re-raise the original error with its traceback intact.
        raise

    # Sort columns
    if sort_columns:
        t = t.sort_index(axis=1)
    # Sort rows
    if sort_rows:
        t = t.sort_index(axis=0)
    # Rename table.
    if short_name:
        t.metadata.short_name = short_name

    return t
from_records classmethod
from_records(*args: Any, **kwargs: Any) -> Table

Calling Table.from_records returns a Table, but does not call __init__ and misses metadata.

Source code in lib/catalog/owid/catalog/core/tables.py
@classmethod
def from_records(cls, *args: Any, **kwargs: Any) -> Table:
    """Build a Table from records, making sure ``Table.__init__`` runs.

    The parent ``from_records`` result does not go through ``__init__`` and
    therefore lacks metadata, so it is wrapped in a fresh ``Table``.

    Args:
        *args: Positional arguments forwarded to the parent ``from_records``.
        **kwargs: Keyword arguments forwarded to the parent ``from_records``.

    Returns:
        A Table constructed from the parent ``from_records`` result.
    """
    return Table(super().from_records(*args, **kwargs))
get_column_or_index
get_column_or_index(name: str) -> Indicator

Get a variable by name from either columns or index.

Retrieves a Variable from the table, checking both regular columns and index levels. This is useful when you don't know whether a variable is stored as a column or index.

Parameters:

  • name (str) –

    Name of the variable to retrieve.

Returns:

  • Indicator

    Variable object with data and metadata.

Raises:

  • ValueError

    If name is not found in either columns or index.

Example
var = table.get_column_or_index("country")  # Works for column or index
print(var.metadata.title)
Source code in lib/catalog/owid/catalog/core/tables.py
def get_column_or_index(self, name: str) -> indicators.Indicator:
    """Fetch a variable by name, looking in the columns first and then the index.

    Handy when the caller does not know whether a variable currently lives
    in a regular column or in an index level.

    Args:
        name: Name of the variable to look up.

    Returns:
        For a column, the column itself. For an index level, a new Indicator
        built from that level's values and the metadata stored for ``name``.

    Raises:
        ValueError: If ``name`` is neither a column nor an index level name.

    Example:
        ```python
        var = table.get_column_or_index("country")  # Works for column or index
        print(var.metadata.title)
        ```
    """
    # Columns take precedence over index levels of the same name.
    if name in self.columns:
        return cast(indicators.Indicator, self[name])

    if name in self.index.names:
        level_values = self.index.get_level_values(name)
        return indicators.Indicator(level_values, name=name, metadata=self._fields[name])

    raise ValueError(f"'{name}' not found in columns or index")
groupby
groupby(
    *args: Any, observed: bool = True, **kwargs: Any
) -> TableGroupBy

Groupby that preserves metadata. It uses observed=True by default.

Source code in lib/catalog/owid/catalog/core/tables.py
def groupby(self, *args: Any, observed: bool = True, **kwargs: Any) -> TableGroupBy:
    """Group the table while carrying its metadata along.

    The pandas groupby is run on a shallow copy of this table and wrapped in
    a ``TableGroupBy`` together with the table metadata and column metadata.

    Args:
        *args: Positional arguments forwarded to pandas.DataFrame.groupby.
        observed: Forwarded to pandas; defaults to True here.
        **kwargs: Keyword arguments forwarded to pandas.DataFrame.groupby.

    Returns:
        A TableGroupBy wrapping the pandas groupby object.
    """
    shallow = self.copy(deep=False)
    grouped = pd.DataFrame.groupby(shallow, *args, observed=observed, **kwargs)
    return TableGroupBy(grouped, self.metadata, self._fields)
join
join(
    other: DataFrame | Table, *args: Any, **kwargs: Any
) -> Table

Join tables while preserving metadata.

Extends pandas join with proper type signature for Table. Metadata from both tables is preserved in the result.

Parameters:

  • other (DataFrame | Table) –

    Table or DataFrame to join with.

  • *args (Any, default: () ) –

    Positional arguments passed to pandas.DataFrame.join.

  • **kwargs (Any, default: {} ) –

    Keyword arguments passed to pandas.DataFrame.join. Supports all pandas join parameters.

Returns:

  • Table

    Joined table with combined metadata.

Example
joined = table1.join(table2, on="country")
joined = table1.join(table2, how="outer")
Source code in lib/catalog/owid/catalog/core/tables.py
def join(self, other: pd.DataFrame | Table, *args: Any, **kwargs: Any) -> Table:
    """Join tables while preserving metadata.

    Extends pandas join with proper type signature for Table. Table-level
    metadata is taken from this table; column metadata is taken from
    whichever table each resulting column came from.

    pandas only renames columns that exist in both inputs: the left one gets
    ``lsuffix`` appended and the right one gets ``rsuffix``. Column metadata
    is attached to the renamed columns accordingly.

    Args:
        other: Table or DataFrame to join with. Column metadata is only
            copied from ``other`` when it is a Table.
        *args: Positional arguments passed to pandas.DataFrame.join
            (in pandas order: on, how, lsuffix, rsuffix, ...).
        **kwargs: Keyword arguments passed to pandas.DataFrame.join.
            Supports all pandas join parameters.

    Returns:
        Joined table with combined metadata.

    Example:
        ```python
        joined = table1.join(table2, on="country")
        joined = table1.join(table2, how="outer")
        ```
    """
    # Suffixes may be given by keyword or positionally (3rd / 4th argument after `other`).
    lsuffix = kwargs.get("lsuffix", args[2] if len(args) > 2 else "") or ""
    rsuffix = kwargs.get("rsuffix", args[3] if len(args) > 3 else "") or ""

    t = super().join(other, *args, **kwargs)

    t = t.copy_metadata(self)

    # Only column names present on both sides are renamed with the suffixes.
    if isinstance(other, pd.DataFrame):
        other_columns = list(other.columns)
    elif isinstance(other, pd.Series):
        other_columns = [other.name]
    else:
        other_columns = []
    overlapping = set(self.columns).intersection(other_columns)

    # When lsuffix is used, columns from self that were renamed get their metadata copied.
    if lsuffix:
        for k, v in self._fields.items():
            if k not in overlapping:
                continue
            suffixed = f"{k}{lsuffix}"
            if suffixed in t.columns:
                t._fields[suffixed] = v.copy()

    # copy variables metadata from other table, accounting for rsuffix renaming
    if isinstance(other, Table):
        for k, v in other._fields.items():
            # An overlapping right-hand column ends up as k + rsuffix; the plain name `k` (if still
            # present) then belongs to the left table and must keep the left table's metadata.
            target = f"{k}{rsuffix}" if k in overlapping else k
            if target in t.columns:
                t._fields[target] = v.copy()
    return t  # ty: ignore
melt
melt(
    id_vars: tuple[str] | list[str] | str | None = None,
    value_vars: tuple[str] | list[str] | str | None = None,
    var_name: str = "variable",
    value_name: str = "value",
    short_name: str | None = None,
    *args: Any,
    **kwargs: Any,
) -> Table

Unpivot table from wide to long format.

Converts columns into rows, transforming wide-format data into long-format. Wrapper around pandas melt that preserves metadata. See owid.catalog.tables.melt() for full documentation.

Parameters:

  • id_vars (tuple[str] | list[str] | str | None, default: None ) –

    Column(s) to use as identifier variables (not melted).

  • value_vars (tuple[str] | list[str] | str | None, default: None ) –

    Column(s) to unpivot. If None, uses all columns except id_vars.

  • var_name (str, default: 'variable' ) –

    Name for the variable column. Default is "variable".

  • value_name (str, default: 'value' ) –

    Name for the value column. Default is "value".

  • short_name (str | None, default: None ) –

    Optional short name for resulting table metadata.

  • *args (Any, default: () ) –

    Additional positional arguments passed to melt().

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments passed to melt().

Returns:

  • Table

    Melted Table in long format with preserved metadata.

Example

Melt all columns except country and year:

>>> long_table = table.melt(id_vars=["country", "year"])

>>> # Melt specific columns:
>>> long_table = table.melt(
...     id_vars=["country", "year"],
...     value_vars=["gdp", "population"]
... )

>>> # Custom column names:
>>> long_table = table.melt(
...     id_vars="country",
...     var_name="indicator",
...     value_name="measurement"
... )

Source code in lib/catalog/owid/catalog/core/tables.py
def melt(
    self,
    id_vars: tuple[str] | list[str] | str | None = None,
    value_vars: tuple[str] | list[str] | str | None = None,
    var_name: str = "variable",
    value_name: str = "value",
    short_name: str | None = None,
    *args: Any,
    **kwargs: Any,
) -> Table:
    """Reshape this table from wide to long format.

    Method form of the module-level ``melt()`` function: this table is
    forwarded as ``frame`` together with every option given here.
    See owid.catalog.tables.melt() for full documentation.

    Args:
        id_vars: Column(s) kept as identifier variables (not melted).
        value_vars: Column(s) to unpivot. If None, uses all columns
            except id_vars.
        var_name: Name of the resulting variable column. Default is "variable".
        value_name: Name of the resulting value column. Default is "value".
        short_name: Optional short name for the resulting table metadata.
        *args: Additional positional arguments forwarded to melt().
        **kwargs: Additional keyword arguments forwarded to melt().

    Returns:
        Melted Table in long format with preserved metadata.

    Example:
        Melt all columns except country and year:
        ```python
        >>> long_table = table.melt(id_vars=["country", "year"])

        >>> # Melt specific columns:
        >>> long_table = table.melt(
        ...     id_vars=["country", "year"],
        ...     value_vars=["gdp", "population"]
        ... )

        >>> # Custom column names:
        >>> long_table = table.melt(
        ...     id_vars="country",
        ...     var_name="indicator",
        ...     value_name="measurement"
        ... )
        ```
    """
    # Collect the named options once so the forwarding call stays short.
    named_options = {
        "id_vars": id_vars,
        "value_vars": value_vars,
        "var_name": var_name,
        "value_name": value_name,
        "short_name": short_name,
    }
    # NOTE(review): positional extras are bound before the keywords, so they
    # presumably collide with `frame` in melt() -- confirm whether *args is
    # ever expected to be non-empty here.
    return melt(*args, frame=self, **named_options, **kwargs)
merge
merge(right: Any, *args: Any, **kwargs: Any) -> Table

Merge with another DataFrame or Table.

Wrapper around pandas merge that preserves Table metadata. See owid.catalog.tables.merge() for full documentation.

Parameters:

  • right (Any) –

    DataFrame or Table to merge with.

  • *args (Any, default: () ) –

    Positional arguments passed to merge().

  • **kwargs (Any, default: {} ) –

    Keyword arguments passed to merge().

Returns:

  • Table

    Merged Table with combined metadata.

Example
result = table1.merge(table2, on="country")
result = table1.merge(table2, left_on="code", right_on="country_code")
Source code in lib/catalog/owid/catalog/core/tables.py
def merge(self, right: Any, *args: Any, **kwargs: Any) -> Table:
    """Merge with another DataFrame or Table.

    Wrapper around pandas merge that preserves Table metadata.
    See owid.catalog.tables.merge() for full documentation.

    Args:
        right: DataFrame or Table to merge with.
        *args: Positional arguments passed to merge() after the two tables.
        **kwargs: Keyword arguments passed to merge().

    Returns:
        Merged Table with combined metadata.

    Example:
        ```python
        result = table1.merge(table2, on="country")
        result = table1.merge(table2, left_on="code", right_on="country_code")
        ```
    """
    # Forward both tables positionally. When they were passed as `left=` /
    # `right=` keywords, any extra positional argument was bound to the first
    # parameter of merge() and raised "got multiple values for argument".
    # Assumes merge() takes (left, right, ...) as its leading parameters, as
    # pandas.merge does.
    return merge(self, right, *args, **kwargs)
pivot
pivot(
    *,
    index: str | list[str] | None = None,
    columns: str | list[str] | None = None,
    values: str | list[str] | None = None,
    join_column_levels_with: str | None = None,
    short_name: str | None = None,
    fill_dimensions: bool = True,
    **kwargs: Any,
) -> Table

Reshape table from long to wide format.

Converts rows into columns, transforming long-format data into wide-format. Wrapper around pandas pivot that preserves metadata. See owid.catalog.tables.pivot() for full documentation.

Parameters:

  • index (str | list[str] | None, default: None ) –

    Column(s) to use for the new index. If None, uses existing index.

  • columns (str | list[str] | None, default: None ) –

    Column(s) whose unique values become new columns.

  • values (str | list[str] | None, default: None ) –

    Column(s) to aggregate. If None, uses all remaining columns.

  • join_column_levels_with (str | None, default: None ) –

    If pivoting creates multi-level columns, join them with this separator (e.g., "_").

  • short_name (str | None, default: None ) –

    Optional short name for resulting table metadata.

  • fill_dimensions (bool, default: True ) –

    If True, fill missing dimension values. Default is True.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to pivot().

Returns:

  • Table

    Pivoted Table in wide format with preserved metadata.

Example
>>> # Basic pivot:
>>> wide = table.pivot(
...     index="country",
...     columns="year",
...     values="gdp"
... )

>>> # Flatten multi-level columns:
>>> wide = table.pivot(
...     index="country",
...     columns=["year", "sex"],
...     values="population",
...     join_column_levels_with="_"
... )
Source code in lib/catalog/owid/catalog/core/tables.py
def pivot(
    self,
    *,
    index: str | list[str] | None = None,
    columns: str | list[str] | None = None,
    values: str | list[str] | None = None,
    join_column_levels_with: str | None = None,
    short_name: str | None = None,
    fill_dimensions: bool = True,
    **kwargs: Any,
) -> Table:
    """Reshape this table from long to wide format.

    Method form of the module-level ``pivot()`` function: this table is
    forwarded as ``data`` together with every option given here.
    See owid.catalog.tables.pivot() for full documentation.

    Args:
        index: Column(s) to use for the new index. If None, uses
            existing index.
        columns: Column(s) whose unique values become new columns.
        values: Column(s) to aggregate. If None, uses all remaining
            columns.
        join_column_levels_with: If pivoting creates multi-level columns,
            join them with this separator (e.g., "_").
        short_name: Optional short name for the resulting table metadata.
        fill_dimensions: If True, fill missing dimension values.
            Default is True.
        **kwargs: Additional arguments forwarded to pivot().

    Returns:
        Pivoted Table in wide format with preserved metadata.

    Example:
        ```python
        >>> # Basic pivot:
        >>> wide = table.pivot(
        ...     index="country",
        ...     columns="year",
        ...     values="gdp"
        ... )

        >>> # Flatten multi-level columns:
        >>> wide = table.pivot(
        ...     index="country",
        ...     columns=["year", "sex"],
        ...     values="population",
        ...     join_column_levels_with="_"
        ... )
        ```
    """
    # Collect the named options once so the forwarding call stays short.
    named_options = {
        "index": index,
        "columns": columns,
        "values": values,
        "join_column_levels_with": join_column_levels_with,
        "short_name": short_name,
        "fill_dimensions": fill_dimensions,
    }
    return pivot(data=self, **named_options, **kwargs)
prune_metadata
prune_metadata() -> Table

Remove metadata for columns no longer in the table.

Cleans up the internal metadata dictionary to remove entries for columns that have been dropped. Useful after column filtering or selection operations.

Returns:

  • Table

    Self, for method chaining.

Example
subset = table[["country", "gdp"]]  # Only 2 columns
subset.prune_metadata()  # Remove metadata for dropped columns
Source code in lib/catalog/owid/catalog/core/tables.py
def prune_metadata(self) -> Table:
    """Drop variable metadata for columns that are no longer in the table.

    Rebuilds the internal ``_fields`` mapping so that it holds exactly one
    entry per name in ``all_columns``. Useful after column filtering or
    selection operations.

    Returns:
        Self, for method chaining.

    Example:
        ```python
        subset = table[["country", "gdp"]]  # Only 2 columns
        subset.prune_metadata()  # Remove metadata for dropped columns
        ```
    """
    surviving = {}
    for name in self.all_columns:
        surviving[name] = self._fields[name]

    # Swap in a fresh mapping; entries for dropped columns are simply not carried over.
    self._fields = defaultdict(VariableMeta, surviving)
    return self
read classmethod
read(path: str | Path, **kwargs: Any) -> Table

Read a table from disk in any supported format.

Automatically detects the format from the file extension and loads the table with its metadata. Supports .csv, .feather, .parquet, and .json.

Parameters:

  • path (str | Path) –

    Path to the file to read. Extension determines format.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to format-specific reader.

Returns:

  • Table

    Loaded Table with data and metadata.

Raises:

  • ValueError

    If file extension is not recognized.

Example
table = Table.read("data.feather")
table = Table.read("data.csv")
table = Table.read("data.parquet")
Source code in lib/catalog/owid/catalog/core/tables.py
@classmethod
def read(cls, path: str | Path, **kwargs: Any) -> Table:
    """Load a table from disk, picking the reader from the file extension.

    The extension of ``path`` selects one of ``read_csv``, ``read_feather``,
    ``read_parquet`` or ``read_json``. After loading, a ``dimensions`` entry
    found in a column's ``additional_info`` is applied to that column with
    ``update_variable_dimensions``. When ``cls.DEBUG`` is truthy,
    ``check_metadata()`` is run on the loaded table.

    Args:
        path: Path to the file to read. Extension determines format.
        **kwargs: Additional arguments passed to the format-specific reader.

    Returns:
        Loaded Table with data and metadata.

    Raises:
        ValueError: If file extension is not recognized.

    Example:
        ```python
        table = Table.read("data.feather")
        table = Table.read("data.csv")
        table = Table.read("data.parquet")
        ```
    """
    if isinstance(path, Path):
        path = path.as_posix()

    # Extension -> name of the classmethod that reads it. The reader is looked
    # up by name only once its extension has matched.
    reader_names = {
        ".csv": "read_csv",
        ".feather": "read_feather",
        ".parquet": "read_parquet",
        ".json": "read_json",
    }
    for suffix, reader_name in reader_names.items():
        if path.endswith(suffix):
            table = getattr(cls, reader_name)(path, **kwargs)
            break
    else:
        raise ValueError(f"could not detect a suitable format to read from: {path}")

    # Fill dimensions from additional_info for compatibility
    for column in table.columns:
        extra_info = table[column].m.additional_info or {}
        dimension_info = extra_info.get("dimensions")
        if dimension_info:
            update_variable_dimensions(table[column], dimension_info)

    if cls.DEBUG:
        table.check_metadata()

    return table
read_csv classmethod
read_csv(path: str | Path, **kwargs: Any) -> Table

Read table from CSV file with accompanying metadata.

Loads a table from a CSV file and its associated .meta.json metadata file. For example, reads both "data.csv" and "data.meta.json".

Parameters:

  • path (str | Path) –

    Path to the CSV file (must end with .csv).

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to the internal metadata loader.

Returns:

  • Table

    Table with data and metadata loaded.

Raises:

  • ValueError

    If path doesn't end with .csv.

Example
table = Table.read_csv("data.csv")
table = Table.read_csv(Path("data.csv"))
Source code in lib/catalog/owid/catalog/core/tables.py
@classmethod
def read_csv(cls, path: str | Path, **kwargs: Any) -> Table:
    """Load a table from a CSV file together with its metadata.

    Reads the CSV data and then attaches metadata through ``_add_metadata``.
    For example, reads both "data.csv" and "data.meta.json".

    Args:
        path: Path to the CSV file (must end with .csv).
        **kwargs: Additional arguments passed to the internal metadata loader.

    Returns:
        Table with data and metadata loaded.

    Raises:
        ValueError: If path doesn't end with .csv.

    Example:
        ```python
        table = Table.read_csv("data.csv")
        table = Table.read_csv(Path("data.csv"))
        ```
    """
    csv_path = path.as_posix() if isinstance(path, Path) else path

    if not csv_path.endswith(".csv"):
        raise ValueError(f'filename must end in ".csv": {csv_path}')

    # Only empty strings count as missing values; pandas' default NA markers
    # are switched off.
    frame = pd.read_csv(csv_path, index_col=False, na_values=[""], keep_default_na=False)
    tb = Table(frame)
    cls._add_metadata(tb, csv_path, **kwargs)
    return tb
read_feather classmethod
read_feather(
    path: str | Path, load_data: bool = True, **kwargs: Any
) -> Table

Read table from Feather file with accompanying metadata.

Loads a table from a Feather file and its associated .meta.json metadata file. Supports both local file paths and URLs.

Parameters:

  • path (str | Path) –

    Path or URL to the Feather file (must end with .feather).

  • load_data (bool, default: True ) –

    If True, load the actual data. If False, only load metadata and column structure (useful for inspecting large files).

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to the internal metadata loader.

Returns:

  • Table

    Table with data and metadata loaded.

Raises:

  • ValueError

    If path doesn't end with .feather.

Example
table = Table.read_feather("data.feather")
table = Table.read_feather("https://example.com/data.feather")
metadata_only = Table.read_feather("data.feather", load_data=False)
Source code in lib/catalog/owid/catalog/core/tables.py
@classmethod
def read_feather(cls, path: str | Path, load_data: bool = True, **kwargs: Any) -> Table:
    """Load a table from a Feather file together with its metadata.

    Loads a table from a Feather file and its associated .meta.json metadata file.
    Supports both local file paths and URLs.

    Args:
        path: Path or URL to the Feather file (must end with .feather).
        load_data: If True, load the actual data. If False, skip reading the
            Feather file and return an empty table whose columns are the
            field names listed in the metadata (useful for inspecting
            large files).
        **kwargs: Additional arguments passed to the internal metadata loader.

    Returns:
        Table with data and metadata loaded.

    Raises:
        ValueError: If path doesn't end with .feather.

    Example:
        ```python
        table = Table.read_feather("data.feather")
        table = Table.read_feather("https://example.com/data.feather")
        metadata_only = Table.read_feather("data.feather", load_data=False)
        ```
    """
    if isinstance(path, Path):
        path = path.as_posix()

    if not path.endswith(".feather"):
        raise ValueError(f'filename must end in ".feather": {path}')

    if load_data:
        tb = Table(pd.read_feather(path))
    else:
        # Metadata-only mode: the column names come from the "fields" section
        # of the metadata, and no rows are loaded.
        field_names = list(cls._read_metadata(path)["fields"].keys())
        tb = Table(pd.DataFrame(columns=field_names))

    cls._add_metadata(tb, path, **kwargs)
    return tb
read_json classmethod
read_json(path: str | Path, **kwargs: Any) -> Table

Read the table from a JSON file plus accompanying JSON sidecar.

The path may be a local file path or a URL.

Source code in lib/catalog/owid/catalog/core/tables.py
@classmethod
def read_json(cls, path: str | Path, **kwargs: Any) -> Table:
    """Read the table from a JSON file plus accompanying JSON sidecar.

    The path may be a local file path or a URL. The file is parsed with
    ``orient="records"`` first, then ``orient="table"``, and finally with
    pandas' own default if both attempts fail.

    Args:
        path: Path or URL to the JSON file (must end with .json).
        **kwargs: Additional arguments passed to the internal metadata loader.

    Returns:
        Table with data and metadata loaded.

    Raises:
        ValueError: If path doesn't end with .json.
    """
    if isinstance(path, Path):
        path = path.as_posix()

    if not path.endswith(".json"):
        raise ValueError(f'filename must end in ".json": {path}')

    # "records" is our default format; "table" is kept for backwards
    # compatibility. A ValueError/KeyError moves on to the next attempt.
    tb = None
    for orient in ("records", "table"):
        try:
            tb = Table(pd.read_json(path, orient=orient))
        except (ValueError, KeyError):
            continue
        break

    if tb is None:
        # Neither explicit orient worked: let pandas auto-detect.
        tb = Table(pd.read_json(path))

    cls._add_metadata(tb, path, **kwargs)
    return tb
read_parquet classmethod
read_parquet(path: str | Path, **kwargs: Any) -> Table

Read table from Parquet file with accompanying metadata.

Loads a table from a Parquet file and its associated .meta.json metadata file. Supports both local file paths and URLs.

Parameters:

  • path (str | Path) –

    Path or URL to the Parquet file (must end with .parquet).

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to the internal metadata loader.

Returns:

  • Table

    Table with data and metadata loaded.

Raises:

  • ValueError

    If path doesn't end with .parquet.

Example
table = Table.read_parquet("data.parquet")
table = Table.read_parquet("https://example.com/data.parquet")
Source code in lib/catalog/owid/catalog/core/tables.py
@classmethod
def read_parquet(cls, path: str | Path, **kwargs: Any) -> Table:
    """Load a table from a Parquet file together with its metadata.

    Loads a table from a Parquet file and its associated .meta.json metadata file.
    Supports both local file paths and URLs.

    Args:
        path: Path or URL to the Parquet file (must end with .parquet).
        **kwargs: Additional arguments passed to the internal metadata loader.

    Returns:
        Table with data and metadata loaded.

    Raises:
        ValueError: If path doesn't end with .parquet.

    Example:
        ```python
        table = Table.read_parquet("data.parquet")
        table = Table.read_parquet("https://example.com/data.parquet")
        ```
    """
    parquet_path = path.as_posix() if isinstance(path, Path) else path

    if not parquet_path.endswith(".parquet"):
        raise ValueError(f'filename must end in ".parquet": {parquet_path}')

    # Read the data first, then attach the metadata to the new table.
    tb = Table(pd.read_parquet(parquet_path))
    cls._add_metadata(tb, parquet_path, **kwargs)
    return tb
reindex
reindex(*args: Any, **kwargs: Any) -> Table

Conform table to new index with optional filling logic.

Create a new Table with changed index. Missing values are filled according to the specified method. Wrapper around pandas reindex.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments passed to pandas.DataFrame.reindex.

  • **kwargs (Any, default: {} ) –

    Keyword arguments passed to pandas.DataFrame.reindex.

Returns:

  • Table

    Table conformed to new index.

Example

Reindex with new labels:

table = table.reindex(["A", "B", "C", "D"])

Fill missing values:

table = table.reindex(new_index, fill_value=0)

Forward fill:

table = table.reindex(new_index, method="ffill")

Source code in lib/catalog/owid/catalog/core/tables.py
def reindex(self, *args: Any, **kwargs: Any) -> Table:
    """Conform the table to a new index, optionally filling gaps.

    Delegates to the parent class ``reindex`` with all arguments unchanged;
    the result is only re-typed as ``Table`` for static type checkers.

    Args:
        *args: Positional arguments passed to pandas.DataFrame.reindex.
        **kwargs: Keyword arguments passed to pandas.DataFrame.reindex.

    Returns:
        Table conformed to new index.

    Example:
        Reindex with new labels:
        ```python
        table = table.reindex(["A", "B", "C", "D"])
        ```

        Fill missing values:
        ```python
        table = table.reindex(new_index, fill_value=0)
        ```

        Forward fill:
        ```python
        table = table.reindex(new_index, method="ffill")
        ```
    """
    # cast() has no runtime effect; it only informs the type checker.
    return cast(Table, super().reindex(*args, **kwargs))
rename
rename(
    mapper: Any = None,
    *,
    inplace: Literal[True],
    **kwargs: Any,
) -> None
rename(
    mapper: Any = None,
    *,
    inplace: Literal[False],
    **kwargs: Any,
) -> Table
rename(*args: Any, **kwargs: Any) -> Table
rename(*args: Any, **kwargs: Any) -> Table | None

Rename columns while preserving their metadata.

Extends pandas rename to maintain variable metadata when renaming columns or index levels. Metadata follows the renamed columns automatically.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments passed to pandas.DataFrame.rename.

  • **kwargs (Any, default: {} ) –

    Keyword arguments passed to pandas.DataFrame.rename. Supports all pandas rename parameters including mapper, index, columns, and inplace.

Returns:

  • Table | None

    Renamed table if inplace=False (default), None if inplace=True.

Example
new_table = table.rename(columns={"old_name": "new_name"})
table.rename(columns={"gdp": "gdp_usd"}, inplace=True)
Source code in lib/catalog/owid/catalog/core/tables.py
def rename(self, *args: Any, **kwargs: Any) -> Table | None:
    """Rename columns or index levels, carrying their metadata along.

    Extends pandas rename so that variable metadata follows the renamed
    columns. For a non-inplace rename, the metadata of each old name is
    copied onto the name found at the same position in the renamed table.

    Args:
        *args: Positional arguments passed to pandas.DataFrame.rename.
        **kwargs: Keyword arguments passed to pandas.DataFrame.rename.
            Supports all pandas rename parameters including mapper, index,
            columns, and inplace.

    Returns:
        Renamed table if inplace=False (default), None if inplace=True.

    Example:
        ```python
        new_table = table.rename(columns={"old_name": "new_name"})
        table.rename(columns={"gdp": "gdp_usd"}, inplace=True)
        ```
    """
    in_place = kwargs.get("inplace")
    # Names are captured before renaming so they can be paired with the new ones.
    previous_names = self.all_columns
    renamed = super().rename(*args, **kwargs)

    if in_place:
        # __setattr__ on columns has already done its job of renaming
        return None

    assert renamed is not None
    # Pair old and new names by position and copy each variable's metadata.
    renamed._fields = defaultdict(
        VariableMeta,
        {
            new_name: self._fields[old_name].copy()
            for old_name, new_name in zip(previous_names, renamed.all_columns)
        },
    )
    return cast(Table, renamed)
rename_index_names
rename_index_names(renames: dict[str, str]) -> Table

Rename index level names.

Source code in lib/catalog/owid/catalog/core/tables.py
def rename_index_names(self, renames: dict[str, str]) -> Table:
    """Rename index levels according to a mapping.

    The index is reset, ``renames`` is applied with ``rename(columns=...)``,
    and the (renamed) index columns are set as the index again. Because the
    mapping is applied to every column after the reset, a regular column
    whose name appears in ``renames`` is renamed as well.

    Args:
        renames: Mapping from current name to new name. Index levels not
            present in the mapping keep their name.

    Returns:
        A new table indexed by the renamed levels.
    """
    new_level_names = [renames.get(level, level) for level in list(self.index.names)]
    flat = self.reset_index()
    flat = flat.rename(columns=renames)
    return flat.set_index(new_level_names)
reset_index
reset_index(
    level: Any = None,
    *,
    inplace: Literal[True],
    **kwargs: Any,
) -> None
reset_index(
    level: Any = None,
    *,
    inplace: Literal[False],
    **kwargs: Any,
) -> Table
reset_index(
    level: Any = None,
    *,
    inplace: bool = False,
    **kwargs: Any,
) -> Table
reset_index(
    level: Any = None,
    *,
    inplace: bool = False,
    **kwargs: Any,
) -> Table | None

Reset the index to default integer index.

Extends pandas.reset_index with proper type signature for Table. Converts index levels to regular columns.

Parameters:

  • level (Any, default: None ) –

    Index level(s) to reset. If None, resets all levels.

  • inplace (bool, default: False ) –

    If True, modify the table in place. Default is False.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to pandas.DataFrame.reset_index.

Returns:

  • Table | None

    Table with reset index if inplace=False, None if inplace=True.

Example
new_table = table.reset_index()  # Reset all index levels
new_table = table.reset_index(level="country")  # Reset one level
table.reset_index(inplace=True)  # Modify in place
Source code in lib/catalog/owid/catalog/core/tables.py
def reset_index(self, level: Any = None, *, inplace: bool = False, **kwargs: Any) -> Table | None:  # ty: ignore
    """Reset the index, turning index levels into regular columns.

    Extends `pandas.reset_index` with proper type signature for Table.
    In both modes the table metadata ``dimensions`` is cleared. For a
    non-inplace reset, the returned table is given this table's ``_fields``
    mapping (the same object, not a copy).

    Args:
        level: Index level(s) to reset. If None, resets all levels.
        inplace: If True, modify the table in place. Default is False.
        **kwargs: Additional arguments passed to pandas.DataFrame.reset_index.

    Returns:
        Table with reset index if inplace=False, None if inplace=True.

    Example:
        ```python
        new_table = table.reset_index()  # Reset all index levels
        new_table = table.reset_index(level="country")  # Reset one level
        table.reset_index(inplace=True)  # Modify in place
        ```
    """
    result = super().reset_index(level=level, inplace=inplace, **kwargs)  # ty: ignore

    if not inplace:
        # preserve metadata in _fields, calling reset_index() on a table drops it
        result._fields = self._fields  # ty: ignore
        # drop dimensions
        result.metadata.dimensions = None
        return result  # ty: ignore

    # TODO: make this work for reset_index with subset of levels
    # drop dimensions
    self.metadata.dimensions = None
    return None
rolling
rolling(*args: Any, **kwargs: Any) -> TableRolling

Rolling operation that preserves metadata.

Source code in lib/catalog/owid/catalog/core/tables.py
def rolling(self, *args: Any, **kwargs: Any) -> TableRolling:
    """Create a rolling-window object that carries this table's metadata.

    Args:
        *args: Positional arguments passed to the parent class ``rolling``.
        **kwargs: Keyword arguments passed to the parent class ``rolling``.

    Returns:
        A ``TableRolling`` built from the parent rolling object, this
        table's ``metadata`` and its ``_fields``.
    """
    parent_rolling = super().rolling(*args, **kwargs)
    return TableRolling(parent_rolling, self.metadata, self._fields)  # ty: ignore
set_index
set_index(
    keys: str | list[str],
    *,
    inplace: Literal[True],
    **kwargs: Any,
) -> None
set_index(
    keys: str | list[str],
    *,
    inplace: Literal[False],
    **kwargs: Any,
) -> Table
set_index(keys: str | list[str], **kwargs: Any) -> Table
set_index(
    keys: str | list[str], **kwargs: Any
) -> Table | None

Set the DataFrame index using specified columns.

Extends pandas set_index to update table metadata with primary key and dimension information. The index columns become the table's identifying dimensions.

Parameters:

  • keys (str | list[str]) –

    Column name or list of column names to set as index.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to pandas.DataFrame.set_index.

Returns:

  • Table | None

    Table with new index if inplace=False, None if inplace=True.

Example
table = table.set_index("country")
table = table.set_index(["country", "year"])
table.set_index("country", inplace=True)
Source code in lib/catalog/owid/catalog/core/tables.py
def set_index(
    self,
    keys: str | list[str],
    **kwargs: Any,
) -> Table | None:
    """Set the DataFrame index using specified columns.

    Extends pandas set_index to update table metadata with primary key
    and dimension information. The index columns become the table's
    identifying dimensions: each key yields one dimension whose ``slug`` is
    the key and whose ``name`` is that column's title (or the key itself
    when the title is empty).

    Args:
        keys: Column name or list of column names to set as index.
        **kwargs: Additional arguments passed to pandas.DataFrame.set_index.

    Returns:
        Table with new index if inplace=False, None if inplace=True.

    Example:
        ```python
        table = table.set_index("country")
        table = table.set_index(["country", "year"])
        table.set_index("country", inplace=True)
        ```
    """
    if isinstance(keys, str):
        keys = [keys]

    # create metadata dimensions
    # TODO: make this work with append=True
    # Built before the index is changed, while the keys are still columns.
    # Each dimension takes the title of its own key; previously a redundant
    # outer loop made every dimension use the title of the last key.
    dimensions = [{"name": self[key].title or key, "slug": key} for key in keys]

    if kwargs.get("inplace"):
        super().set_index(keys, **kwargs)
        t = self
        to_return = None
    else:
        t = super().set_index(keys, **kwargs)
        to_return = cast(Table, t)

    t.metadata.primary_key = keys
    t.metadata.dimensions = dimensions  # ty: ignore
    return to_return
to
to(path: str | Path, repack: bool = True) -> None

Save this table to disk in a supported format.

The format is automatically detected from the file extension (.csv, .feather, .parquet, or .json).

Parameters:

  • path (str | Path) –

    Output file path. Extension determines format.

  • repack (bool, default: True ) –

    If True, optimize column dtypes to reduce file size. Set to False for very large tables if optimization fails.

Example
table.to("data.feather")  # Save as Feather with optimization
table.to("data.csv")  # Save as CSV
table.to("data.parquet", repack=False)  # Skip optimization
Source code in lib/catalog/owid/catalog/core/tables.py
def to(self, path: str | Path, repack: bool = True) -> None:
    """Write this table to disk, picking the writer from the file extension.

    Dispatches to ``to_csv``, ``to_feather``, ``to_parquet`` or ``to_json``
    according to the extension (.csv, .feather, .parquet or .json).

    Args:
        path: Output file path. Extension determines format.
        repack: If True, optimize column dtypes to reduce file size.
            Set to False for very large tables if optimization fails.
            Only forwarded to the Feather and Parquet writers.

    Raises:
        ValueError: If the extension is not one of the supported formats.

    Example:
        ```python
        table.to("data.feather")  # Save as Feather with optimization
        table.to("data.csv")  # Save as CSV
        table.to("data.parquet", repack=False)  # Skip optimization
        ```
    """
    target = path.as_posix() if isinstance(path, Path) else path

    if target.endswith(".csv"):
        # ignore repacking
        return self.to_csv(target)

    if target.endswith(".feather"):
        return self.to_feather(target, repack=repack)

    if target.endswith(".parquet"):
        return self.to_parquet(target, repack=repack)

    if target.endswith(".json"):
        # ignore repacking
        return self.to_json(target)

    raise ValueError(f"could not detect a suitable format to save to: {target}")
to_csv
to_csv(path: None = None, **kwargs: Any) -> str
to_csv(path: Any, **kwargs: Any) -> None
to_csv(
    path: Any | None = None, **kwargs: Any
) -> None | str

Save table as CSV with accompanying metadata file.

Saves both the data as CSV and metadata as a separate JSON file. For example, "mytable.csv" will have metadata at "mytable.meta.json".

Parameters:

  • path (Any | None, default: None ) –

    Output CSV path. If None, returns CSV as string.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to pandas.DataFrame.to_csv. By default, includes index only if table has a primary key.

Returns:

  • None | str

    CSV string if path is None, otherwise None.

Example
table.to_csv("data.csv")  # Saves data.csv and data.meta.json
csv_str = table.to_csv()  # Returns CSV as string
Source code in lib/catalog/owid/catalog/core/tables.py
def to_csv(self, path: Any | None = None, **kwargs: Any) -> None | str:
    """Write the table as CSV, with its metadata in a sidecar JSON file.

    Saves both the data as CSV and metadata as a separate JSON file.
    For example, "mytable.csv" will have metadata at "mytable.meta.json".
    When ``path`` is None nothing is written: the parent class ``to_csv``
    result is returned instead.

    Args:
        path: Output CSV path. If None, returns CSV as string.
        **kwargs: Additional arguments passed to pandas.DataFrame.to_csv.
            By default, includes index only if table has a primary key.

    Returns:
        CSV string if path is None, otherwise None.

    Raises:
        ValueError: If path is given but doesn't end with .csv.

    Example:
        ```python
        table.to_csv("data.csv")  # Saves data.csv and data.meta.json
        csv_str = table.to_csv()  # Returns CSV as string
        ```
    """
    if path is None:
        # return string
        return super().to_csv(**kwargs)

    if not str(path).endswith(".csv"):
        raise ValueError(f'filename must end in ".csv": {path}')

    plain_frame = pd.DataFrame(self)
    if "index" not in kwargs:
        # if the dataframe uses the default index then we don't want to store it (would be a column of row numbers)
        # NOTE: By default pandas does store the index, and users often explicitly add "index=False".
        has_primary_key = self.primary_key != []
        kwargs["index"] = has_primary_key
    plain_frame.to_csv(path, **kwargs)

    # The metadata sidecar shares the CSV's base name.
    base_name = splitext(path)[0]
    self._save_metadata(base_name + ".meta.json")
to_excel
to_excel(
    excel_writer: Any,
    with_metadata: bool = True,
    sheet_name: str = "data",
    metadata_sheet_name: str = "metadata",
    **kwargs: Any,
) -> None

Save table to Excel file with optional metadata codebook.

Exports the table data to an Excel file, optionally including a separate sheet with the codebook metadata.

Parameters:

  • excel_writer (Any) –

    File path or ExcelWriter object to save to.

  • with_metadata (bool, default: True ) –

    If True, include a metadata codebook sheet. Default is True.

  • sheet_name (str, default: 'data' ) –

    Name for the data sheet. Default is "data".

  • metadata_sheet_name (str, default: 'metadata' ) –

    Name for the metadata sheet. Default is "metadata".

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to pandas.DataFrame.to_excel.

Example
table.to_excel("output.xlsx")  # With metadata
table.to_excel("output.xlsx", with_metadata=False)  # Data only
Source code in lib/catalog/owid/catalog/core/tables.py
def to_excel(
    self,
    excel_writer: Any,
    with_metadata: bool = True,
    sheet_name: str = "data",
    metadata_sheet_name: str = "metadata",
    **kwargs: Any,
) -> None:
    """Write the table to an Excel workbook, optionally with a codebook sheet.

    The data is written to one sheet. When requested, the table's codebook
    (``self.codebook``) is written to a second sheet of the same workbook,
    without its index.

    Args:
        excel_writer: Either an existing ``pd.ExcelWriter`` or anything
            ``pd.ExcelWriter`` accepts as a target (e.g. a file path).
        with_metadata: If True, also write the codebook sheet. Default is True.
        sheet_name: Name of the data sheet. Default is "data".
        metadata_sheet_name: Name of the codebook sheet. Default is "metadata".
        **kwargs: Forwarded to the parent ``to_excel`` call that writes the
            data sheet (not to the codebook sheet).

    Example:
        ```python
        table.to_excel("output.xlsx")  # With metadata
        table.to_excel("output.xlsx", with_metadata=False)  # Data only
        ```
    """
    if not isinstance(excel_writer, pd.ExcelWriter):
        # We were handed a target rather than a writer: open our own writer
        # and let the context manager close it once both sheets are written.
        with pd.ExcelWriter(excel_writer) as writer:  # ty: ignore
            super().to_excel(writer, sheet_name=sheet_name, **kwargs)
            if with_metadata:
                self.codebook.to_excel(writer, sheet_name=metadata_sheet_name, index=False)
        return

    # The caller already owns an ExcelWriter: write into it directly to avoid
    # nested writer contexts. It is not closed here.
    super().to_excel(excel_writer, sheet_name=sheet_name, **kwargs)
    if with_metadata:
        self.codebook.to_excel(excel_writer, sheet_name=metadata_sheet_name, index=False)
to_feather
to_feather(
    path: Any,
    repack: bool = True,
    compression: Literal[
        "zstd", "lz4", "uncompressed"
    ] = "zstd",
    **kwargs: Any,
) -> None

Save table as Feather file with accompanying metadata.

Saves the table in Apache Arrow Feather format with a separate JSON metadata file. For example, "mytable.feather" will have metadata at "mytable.meta.json".

Note

Feather format cannot store indexes, so the index is reset before saving and restored when reading.

Parameters:

  • path (Any) –

    Output file path (must end with .feather).

  • repack (bool, default: True ) –

    If True, optimize column dtypes to reduce file size. Set to False for very large tables if repacking is slow.

  • compression (Literal['zstd', 'lz4', 'uncompressed'], default: 'zstd' ) –

    Compression algorithm to use. Options are: - "zstd" (default): High compression ratio - "lz4": Faster compression - "uncompressed": No compression

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to pandas.DataFrame.to_feather.

Raises:

  • ValueError

    If path doesn't end with .feather or if index names overlap with column names.

Example
table.to_feather("data.feather")  # With compression
table.to_feather("data.feather", repack=False)  # Skip optimization
table.to_feather("data.feather", compression="lz4")  # Fast compression
Source code in lib/catalog/owid/catalog/core/tables.py
def to_feather(
    self,
    path: Any,
    repack: bool = True,
    compression: Literal["zstd", "lz4", "uncompressed"] = "zstd",
    **kwargs: Any,
) -> None:
    """Save table as Feather file with accompanying metadata.

    Saves the table in Apache Arrow Feather format with a separate JSON
    metadata file. For example, "mytable.feather" will have metadata at
    "mytable.meta.json".

    Note:
        Feather format cannot store indexes, so when the table has a primary
        key the index is reset into regular columns before saving.

    Args:
        path: Output file path (must end with .feather).
        repack: If True, optimize column dtypes to reduce file size.
            Set to False for very large tables if repacking is slow.
            A warning is logged when repacking takes longer than 5 seconds.
        compression: Compression algorithm to use. Options are:
            - "zstd" (default): High compression ratio
            - "lz4": Faster compression
            - "uncompressed": No compression
        **kwargs: Additional arguments passed to pandas.DataFrame.to_feather.

    Raises:
        ValueError: If path doesn't end with .feather or if index names
            overlap with column names.

    Example:
        ```python
        table.to_feather("data.feather")  # With compression
        table.to_feather("data.feather", repack=False)  # Skip optimization
        table.to_feather("data.feather", compression="lz4")  # Fast compression
        ```
    """
    if not str(path).endswith(".feather"):
        raise ValueError(f'filename must end in ".feather": {path}')

    # feather can't store the index
    df = pd.DataFrame(self)
    if self.primary_key:
        # reset_index would clash if an index level shares a name with a column
        overlapping_names = set(self.index.names) & set(self.columns)
        if overlapping_names:
            raise ValueError(f"index names are overlapping with column names: {overlapping_names}")
        df = df.reset_index()

    if repack:
        # use smaller data types wherever possible
        # NOTE: this can be slow for large dataframes
        # A monotonic clock is used so that system clock adjustments cannot
        # distort the measured duration; it is measured once and reused.
        started = time.monotonic()
        df = repack_frame(df)
        elapsed = time.monotonic() - started
        if elapsed > 5:
            log.warning(
                "repacking took a long time, consider adding create_dataset(..., repack=False)",
                path=path,
                time=elapsed,
            )

    df.to_feather(path, compression=compression, **kwargs)

    self._save_metadata(self.metadata_filename(path))
to_json
to_json(path: None = None, **kwargs: Any) -> str
to_json(path: Any, **kwargs: Any) -> None
to_json(
    path: Any | None = None, **kwargs: Any
) -> None | str

Save this table as a JSON file plus accompanying JSON metadata file. If the table is stored at "mytable.json", the metadata will be at "mytable.meta.json".

By default, uses orient="records" which outputs a simple array of objects without schema information. The index is reset and included as regular columns.

Source code in lib/catalog/owid/catalog/core/tables.py
def to_json(self, path: Any | None = None, **kwargs: Any) -> None | str:
    """
    Serialise this table to JSON, either as a returned string or as a file.

    With ``path=None`` the call is delegated to the parent class's ``to_json``
    with ``kwargs`` unchanged, and its result is returned. No default orient
    is injected and the index is not reset on this path.

    With a path, the table is written to that file and its metadata to a
    sidecar file: "mytable.json" gets "mytable.meta.json". In this mode
    orient="records" is used unless the caller passes ``orient``, and the
    index is reset first: index levels become regular columns when the table
    has a primary key, otherwise the index is dropped.

    Raises:
        ValueError: If a path is given that does not end in ".json".
    """
    if path is None:
        # No target given: hand back whatever the parent class produces.
        return super().to_json(**kwargs)

    if not str(path).endswith(".json"):
        raise ValueError(f'filename must end in ".json": {path}')

    # 'records' is a plain array of objects with no schema; only applied when
    # the caller did not choose an orient themselves.
    kwargs.setdefault("orient", "records")

    # Without a primary key the index is dropped, so it does not turn into an
    # unwanted "index" column; with one, the index levels become columns.
    drop_index = not self.primary_key
    df = pd.DataFrame(self.reset_index(drop=drop_index))
    df.to_json(path, **kwargs)

    self._save_metadata(splitext(path)[0] + ".meta.json")
to_parquet
to_parquet(path: Any, repack: bool = True) -> None

Save table as Parquet file with metadata sidecar.

Saves the table in Apache Parquet format with a separate JSON metadata file. Parquet provides efficient columnar storage and compression.

Note

Metadata is stored in a separate .meta.json file rather than embedded in the Parquet schema to enable efficient partial reading of large files.

Parameters:

  • path (Any) –

    Output file path (must end with .parquet).

  • repack (bool, default: True ) –

    If True, optimize column dtypes to reduce file size. Set to False for very large tables if repacking is slow.

Raises:

  • ValueError

    If path doesn't end with .parquet.

Example
table.to_parquet("data.parquet")  # With optimization
table.to_parquet("data.parquet", repack=False)  # Skip optimization
Source code in lib/catalog/owid/catalog/core/tables.py
def to_parquet(self, path: Any, repack: bool = True) -> None:  # ty: ignore
    """Write the table to a Parquet file plus a JSON metadata sidecar.

    The data is stored in Apache Parquet format; the table's metadata is
    saved separately under the name given by ``self.metadata_filename(path)``.

    Note:
        Metadata is stored in a separate .meta.json file rather than embedded
        in the Parquet schema to enable efficient partial reading of large files.

    Args:
        path: Output file path (must end with .parquet).
        repack: If True, optimize column dtypes to reduce file size.
            Set to False for very large tables if repacking is slow.

    Raises:
        ValueError: If path doesn't end with .parquet.

    Example:
        ```python
        table.to_parquet("data.parquet")  # With optimization
        table.to_parquet("data.parquet", repack=False)  # Skip optimization
        ```
    """
    if not str(path).endswith(".parquet"):
        raise ValueError(f'filename must end in ".parquet": {path}')

    # Parquet could store the index, but repacking is wasted on index columns,
    # so a primary-key index is turned into regular columns first.
    frame = pd.DataFrame(self)
    if self.primary_key:
        frame = frame.reset_index()

    if repack:
        # Use smaller data types wherever possible.
        # NOTE: this can be slow for large dataframes
        frame = repack_frame(frame)

    # from_pandas auto-generates some schema metadata that helps pandas
    # deserialise the file; that is kept untouched.
    # The table metadata, field metadata and primary key are deliberately NOT
    # added to the schema: doing so would make reading partial content
    # inefficient, see https://github.com/owid/etl/issues/783
    arrow_table = pyarrow.Table.from_pandas(frame)

    pq.write_table(arrow_table, path)

    self._save_metadata(self.metadata_filename(path))
underscore
underscore(
    collision: Literal[
        "raise", "rename", "ignore"
    ] = "raise",
    inplace: bool = False,
    camel_to_snake: bool = False,
) -> Table

Convert column and index names to underscore format.

Converts all column names and index names to snake_case format. In rare cases where two columns map to the same underscored name, the collision parameter controls the behavior.

Parameters:

  • collision (Literal['raise', 'rename', 'ignore'], default: 'raise' ) –

    How to handle naming collisions: - "raise" (default): Raise ValueError if collision occurs - "rename": Append numbered suffix to duplicates - "ignore": Keep first occurrence

  • inplace (bool, default: False ) –

    If True, modify the table in place. Default is False.

  • camel_to_snake (bool, default: False ) –

    If True, convert camelCase to snake_case. Default is False (only converts spaces and special chars).

Returns:

  • Table

    Table with underscored names. With inplace=True, the table itself is modified in place and returned.

Example

Basic underscoring

table = table.underscore()

Convert camelCase

table = table.underscore(camel_to_snake=True)

Handle collisions

table = table.underscore(collision="rename")

Modify in place

table.underscore(inplace=True)

Source code in lib/catalog/owid/catalog/core/tables.py
def underscore(
    self,
    collision: Literal["raise", "rename", "ignore"] = "raise",
    inplace: bool = False,
    camel_to_snake: bool = False,
) -> Table:
    """Convert column and index names to underscore format.

    Converts all column names and index names to snake_case format, and
    underscores the table's ``short_name`` as well. In rare cases where two
    columns map to the same underscored name, the collision parameter
    controls the behavior.

    For every column whose name changed and that has no title yet, the
    original name is stored as the column's metadata title.

    Args:
        collision: How to handle naming collisions:
            - "raise" (default): Raise ValueError if collision occurs
            - "rename": Append numbered suffix to duplicates
            - "ignore": Keep first occurrence
        inplace: If True, modify the table in place. Default is False.
        camel_to_snake: If True, convert camelCase to snake_case.
            Default is False (only converts spaces and special chars).

    Returns:
        Table with underscored names. With inplace=True this is the table
        itself, modified in place; otherwise it is the table returned by
        ``rename``.

    Example:
        Basic underscoring
        ```python
        table = table.underscore()
        ```

        Convert camelCase
        ```python
        table = table.underscore(camel_to_snake=True)
        ```

        Handle collisions
        ```python
        table = table.underscore(collision="rename")
        ```

        Modify in place
        ```python
        table.underscore(inplace=True)
        ```
    """
    t = self
    orig_cols = t.columns

    # underscore columns and resolve collisions
    new_cols = pd.Index([utils.underscore(c, camel_to_snake=camel_to_snake) for c in orig_cols])
    new_cols = _resolve_collisions(orig_cols, new_cols, collision)

    columns_map = dict(zip(orig_cols, new_cols))
    if inplace:
        t.rename(columns=columns_map, inplace=True)
    else:
        # from here on `t` is the renamed table, not `self`
        t = t.rename(columns=columns_map)

    t.index.names = [utils.underscore(e, camel_to_snake=camel_to_snake) for e in t.index.names]
    # keep the stored primary key in sync with the (possibly renamed) index
    t.metadata.primary_key = t.primary_key
    t.metadata.short_name = utils.underscore(t.metadata.short_name, camel_to_snake=camel_to_snake)

    # put original names as titles into metadata by default
    for c_old, c_new in columns_map.items():
        # if underscoring didn't change anything, don't add title
        if t[c_new].metadata.title is None and c_old != c_new:
            t[c_new].metadata.title = c_old

    return t
update_metadata
update_metadata(**kwargs: Any) -> Table

Update table-level metadata fields.

Convenience method to update multiple metadata fields at once.

Parameters:

  • **kwargs (Any, default: {} ) –

    Metadata field names and values to update. Must be valid TableMeta attributes.

Returns:

  • Table

    Self, for method chaining.

Raises:

  • AssertionError

    If any field name is not a valid TableMeta attribute.

Example
table.update_metadata(title="GDP Data", description="GDP by country")
table.update_metadata(short_name="gdp_data")
Source code in lib/catalog/owid/catalog/core/tables.py
def update_metadata(self, **kwargs: Any) -> Table:
    """Update table-level metadata fields.

    Convenience method to update multiple metadata fields at once. Fields are
    validated and applied one by one in the order given, so fields listed
    before an unknown one have already been set when the error is raised.

    Args:
        **kwargs: Metadata field names and values to update.
            Each name must already be an attribute of ``self.metadata``.

    Returns:
        Self, for method chaining.

    Raises:
        AssertionError: If any field name is not an attribute of the table
            metadata.

    Example:
        ```python
        table.update_metadata(title="GDP Data", description="GDP by country")
        table.update_metadata(short_name="gdp_data")
        ```
    """
    for field, value in kwargs.items():
        if not hasattr(self.metadata, field):
            # Raised explicitly instead of via `assert`, so the check is not
            # stripped when Python runs with -O. Exception type and message
            # are unchanged for existing callers.
            raise AssertionError(f"unknown metadata field {field} in TableMeta")
        setattr(self.metadata, field, value)
    return self
update_metadata_from_yaml
update_metadata_from_yaml(
    path: Path | str,
    table_name: str,
    yaml_params: dict[str, Any] | None = None,
    extra_variables: Literal["raise", "ignore"] = "raise",
    if_origins_exist: SOURCE_EXISTS_OPTIONS = "replace",
) -> None

Update table and variable metadata from a YAML file.

Loads metadata definitions from a .meta.yml file and updates both table-level and variable-level metadata. This is the primary way to add rich metadata in the ETL workflow.

Parameters:

  • path (Path | str) –

    Path to the .meta.yml file with metadata definitions.

  • table_name (str) –

    Name of the table in the YAML file to load metadata from. Also updates the table's short_name to this value.

  • yaml_params (dict[str, Any] | None, default: None ) –

    Additional parameters to pass to the YAML loader.

  • extra_variables (Literal['raise', 'ignore'], default: 'raise' ) –

    How to handle variables in YAML not in table: - "raise" (default): Raise exception - "ignore": Skip extra variables

  • if_origins_exist (SOURCE_EXISTS_OPTIONS, default: 'replace' ) –

    How to handle existing origins: - "replace" (default): Replace existing origin with new one - "append": Append new origin to existing origins - "fail": Raise exception if origin already exists

Example
>>> table.update_metadata_from_yaml("dataset.meta.yml", "population")
>>> table.update_metadata_from_yaml(
...     Path("dataset.meta.yml"),
...     "gdp_data",
...     extra_variables="ignore"
... )
Source code in lib/catalog/owid/catalog/core/tables.py
def update_metadata_from_yaml(
    self,
    path: Path | str,
    table_name: str,
    yaml_params: dict[str, Any] | None = None,
    extra_variables: Literal["raise", "ignore"] = "raise",
    if_origins_exist: SOURCE_EXISTS_OPTIONS = "replace",
) -> None:
    """Apply table and variable metadata defined in a YAML file to this table.

    Thin wrapper: all arguments are forwarded, together with this table, to
    ``owid.catalog.core.yaml_metadata.update_metadata_from_yaml``, which does
    the actual loading and updating. This is the primary way to add rich
    metadata in the ETL workflow.

    Args:
        path: Path to the .meta.yml file with metadata definitions.
        table_name: Name of the table in the YAML file to load metadata from.
            Also updates the table's short_name to this value.
        yaml_params: Additional parameters to pass to the YAML loader.
        extra_variables: How to handle variables in YAML not in table:
            - "raise" (default): Raise exception
            - "ignore": Skip extra variables
        if_origins_exist: How to handle existing origins:
            - "replace" (default): Replace existing origin with new one
            - "append": Append new origin to existing origins
            - "fail": Raise exception if origin already exists

    Example:
        ```python
        >>> table.update_metadata_from_yaml("dataset.meta.yml", "population")
        >>> table.update_metadata_from_yaml(
        ...     Path("dataset.meta.yml"),
        ...     "gdp_data",
        ...     extra_variables="ignore"
        ... )
        ```
    """
    # Imported at call time rather than at module level.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from owid.catalog.core.yaml_metadata import update_metadata_from_yaml as _apply_yaml_metadata

    forwarded = {
        "tb": self,
        "path": path,
        "table_name": table_name,
        "extra_variables": extra_variables,
        "yaml_params": yaml_params,
        "if_origins_exist": if_origins_exist,
    }
    return _apply_yaml_metadata(**forwarded)

VariableGroupBy

VariableGroupBy(
    groupby: SeriesGroupBy,
    name: str,
    metadata: VariableMeta,
    table_metadata: TableMeta,
)

Methods:

  • rolling

    Apply rolling window function and return a new VariableGroupBy with proper metadata.

Source code in lib/catalog/owid/catalog/core/tables.py
def __init__(
    self,
    groupby: pd.core.groupby.SeriesGroupBy,
    name: str,
    metadata: VariableMeta,
    table_metadata: TableMeta,
):
    """Store the wrapped groupby together with the variable's identity.

    Args:
        groupby: The pandas groupby object being wrapped.
        name: Name of the variable.
        metadata: Metadata of the variable.
        table_metadata: Metadata of the table the variable belongs to.
    """
    # All arguments are kept by reference; nothing is copied here.
    self.groupby, self.name = groupby, name
    self.metadata, self.table_metadata = metadata, table_metadata
rolling
rolling(*args: Any, **kwargs: Any) -> VariableGroupBy

Apply rolling window function and return a new VariableGroupBy with proper metadata.

Source code in lib/catalog/owid/catalog/core/tables.py
def rolling(self, *args: Any, **kwargs: Any) -> VariableGroupBy:
    """Return a new VariableGroupBy wrapping the rolling view of this groupby.

    All arguments are forwarded to the wrapped groupby's ``rolling``. The
    name, metadata and table metadata are passed on to the new wrapper as-is.
    """
    return VariableGroupBy(
        self.groupby.rolling(*args, **kwargs),
        self.name,
        self.metadata,
        self.table_metadata,
    )

align_categoricals

align_categoricals(
    left: SeriesOrVariable, right: SeriesOrVariable
) -> tuple[SeriesOrVariable, SeriesOrVariable]

Align categorical columns if possible. If not, return originals. This is necessary for efficient merging.

Source code in lib/catalog/owid/catalog/core/tables.py
def align_categoricals(left: SeriesOrVariable, right: SeriesOrVariable) -> tuple[SeriesOrVariable, SeriesOrVariable]:
    """Give two categorical columns the same set of categories.

    When both inputs have dtype "category", each is returned with its
    categories set to the union of both category sets. In every other case
    the inputs are returned unchanged. This is necessary for efficient merging.

    Args:
        left: First column.
        right: Second column.

    Returns:
        The (left, right) pair, re-categorised when both are categorical.
    """
    if left.dtype.name != "category" or right.dtype.name != "category":
        # Nothing to align unless both sides are categorical.
        return left, right

    common_categories = left.cat.categories.union(right.cat.categories)

    def _with_common_categories(column):
        # Indicators get their categories through their own set_categories
        # method; other inputs go through the .cat accessor.
        if isinstance(column, indicators.Indicator):
            return column.set_categories(common_categories)
        return column.cat.set_categories(common_categories)

    return _with_common_categories(left), _with_common_categories(right)

copy_metadata

copy_metadata(
    from_table: Table, to_table: Table, deep: bool = False
) -> Table

Return a copy of to_table that carries the metadata of from_table.

Source code in lib/catalog/owid/catalog/core/tables.py
def copy_metadata(from_table: Table, to_table: Table, deep: bool = False) -> Table:
    """Return a copy of ``to_table`` that carries the metadata of ``from_table``.

    The result is a new Table built from a copy of ``to_table``'s data, with
    a copy of ``from_table``'s table-level metadata. Variable metadata is set
    only for columns present in both tables (per ``all_columns``).

    Args:
        from_table: Table whose metadata is taken.
        to_table: Table whose data is taken.
        deep: Passed to ``DataFrame.copy`` when copying ``to_table``'s data.

    Returns:
        The new table; neither input is reassigned.
    """
    result = Table(pd.DataFrame.copy(to_table, deep=deep), metadata=from_table.metadata.copy())

    fields = defaultdict(VariableMeta)
    for column in set(to_table.all_columns) & set(from_table.all_columns):
        if column in from_table._fields:
            # The source table has metadata for this column: take a copy of it.
            fields[column] = from_table._fields[column].copy()
        elif column in to_table._fields:
            # Otherwise keep the target's existing metadata (same object, not copied).
            fields[column] = to_table._fields[column]

    result._fields = fields
    return result

keep_metadata

keep_metadata(
    func: Callable[..., DataFrame | Series],
) -> Callable[..., Table | Indicator]

Decorator that turns a function that works on DataFrame or Series into a function that works on Table or Variable and preserves metadata. If the decorated function renames columns, their metadata won't be copied.

Example
import owid.catalog.processing as pr

@pr.keep_metadata
def my_df_func(df: pd.DataFrame) -> pd.DataFrame:
    return df + 1

tb = my_df_func(tb)


@pr.keep_metadata
def my_series_func(s: pd.Series) -> pd.Series:
    return s + 1

tb.a = my_series_func(tb.a)
Source code in lib/catalog/owid/catalog/core/tables.py
def keep_metadata(func: Callable[..., pd.DataFrame | pd.Series]) -> Callable[..., Table | indicators.Indicator]:
    """Decorator that turns a function that works on DataFrame or Series into a function that works
    on Table or Variable and preserves metadata.  If the decorated function renames columns, their
    metadata won't be copied.

    The metadata is taken from the first positional argument of the call, so
    the decorated function must be called with the table or variable as its
    first positional argument. The returned wrapper keeps the decorated
    function's name and docstring.

    Args:
        func: Function returning a DataFrame or a Series.

    Returns:
        Wrapper that returns a Table (for DataFrame results) or an Indicator
        (for Series results).

    Raises:
        ValueError: Raised by the wrapper when ``func`` returns something
            that is neither a Series nor a DataFrame.

    Example:
        ```python
        import owid.catalog.processing as pr

        @pr.keep_metadata
        def my_df_func(df: pd.DataFrame) -> pd.DataFrame:
            return df + 1

        tb = my_df_func(tb)


        @pr.keep_metadata
        def my_series_func(s: pd.Series) -> pd.Series:
            return s + 1

        tb.a = my_series_func(tb.a)
        ```
    """
    # Local import keeps this block self-contained.
    import functools

    # functools.wraps carries over __name__, __doc__ and __wrapped__ so the
    # decorated function stays recognisable in tracebacks and introspection.
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Table | indicators.Indicator:
        # the object whose metadata is carried over to the result
        tb = args[0]
        df = func(*args, **kwargs)
        if isinstance(df, pd.Series):
            return indicators.Indicator(df, name=tb.name, metadata=tb.metadata)
        elif isinstance(df, pd.DataFrame):
            return Table(df).copy_metadata(tb)
        else:
            raise ValueError(f"Unexpected return type: {type(df)}")

    return wrapper

multi_merge

multi_merge(
    tables: list[Table], *args: Any, **kwargs: Any
) -> Table

Merge multiple tables.

This is a helper function when merging more than two tables on common columns.

Parameters:

  • tables (list[Table]) –

    Tables to merge.

Returns:

  • combined ( Table ) –

    Merged table.

Source code in lib/catalog/owid/catalog/core/tables.py
def multi_merge(tables: list[Table], *args: Any, **kwargs: Any) -> Table:
    """Merge several tables into one, left to right.

    Starts from a copy of the first table and merges each following table
    into the running result, in list order. This is a helper function when
    merging more than two tables on common columns.

    Args:
        tables: Tables to merge. Must contain at least one table.
        *args: Positional arguments forwarded to every ``merge`` call.
        **kwargs: Keyword arguments forwarded to every ``merge`` call.

    Returns:
        combined: Merged table.

    """
    first, remaining = tables[0], tables[1:]

    # Start from a copy so the first input table is never the returned object.
    merged = first.copy()
    for other in remaining:
        merged = merged.merge(other, *args, **kwargs)

    return merged

read

read(
    filepath_or_buffer: str | Path | IO[AnyStr],
    *args: Any,
    file_extension: str | None = None,
    metadata: TableMeta | None = None,
    origin: Origin | None = None,
    underscore: bool = False,
    **kwargs: Any,
) -> Table

Read a file based on extension, dispatching to the appropriate reader.

Parameters:

  • filepath_or_buffer (str | Path | IO[AnyStr]) –

    Path to the file or file-like object to read.

  • *args (Any, default: () ) –

    Additional positional arguments passed to the format-specific reader.

  • file_extension (str | None, default: None ) –

    File extension (without dot). If None, inferred from filepath.

  • metadata (TableMeta | None, default: None ) –

    Table metadata.

  • origin (Origin | None, default: None ) –

    Origin of the table data.

  • underscore (bool, default: False ) –

    True to make all column names snake case.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments passed to the format-specific reader.

Returns:

  • Table

    Table with data and metadata.

Note

For reading ZIP files, use Snapshot.extracted() context manager instead. See etl/snapshot.py for the recommended approach to handling archives.

Source code in lib/catalog/owid/catalog/core/tables.py
def read(
    filepath_or_buffer: str | Path | IO[AnyStr],
    *args: Any,
    file_extension: str | None = None,
    metadata: TableMeta | None = None,
    origin: Origin | None = None,
    underscore: bool = False,
    **kwargs: Any,
) -> Table:
    """Read a file into a Table, picking the reader from the file extension.

    The reader is looked up in ``EXTENSION_TO_READER``. When no extension is
    given, it is taken as the text after the last "." in
    ``str(filepath_or_buffer)``, lower-cased.

    Args:
        filepath_or_buffer: Path to the file or file-like object to read.
        *args: Additional positional arguments passed to the format-specific reader.
        file_extension: File extension (without dot). If None, inferred from filepath.
        metadata: Table metadata.
        origin: Origin of the table data.
        underscore: True to make all column names snake case.
        **kwargs: Additional keyword arguments passed to the format-specific reader.

    Returns:
        Table with data and metadata.

    Raises:
        ValueError: If no reader is registered for the extension.

    Note:
        For reading ZIP files, use Snapshot.extracted() context manager instead.
        See etl/snapshot.py for the recommended approach to handling archives.
    """
    if file_extension is None:
        # Everything after the last dot; the whole string if there is no dot.
        file_extension = str(filepath_or_buffer).rsplit(".", 1)[-1].lower()

    if (reader := EXTENSION_TO_READER.get(file_extension)) is None:
        raise ValueError(f"Unknown extension: {file_extension}")

    return reader(
        filepath_or_buffer,
        *args,
        metadata=metadata,
        origin=origin,
        underscore=underscore,
        **kwargs,
    )

read_custom

read_custom(
    read_function: Callable,
    filepath_or_buffer: str | Path | IO[AnyStr],
    metadata: TableMeta,
    origin: Origin | None = None,
    underscore: bool = False,
    *args: Any,
    **kwargs: Any,
) -> Table

Read data using a custom reader function and return a Table with metadata.

This function allows using any custom data reading function while automatically attaching metadata and origin information to the resulting Table. Useful when standard read functions (read_csv, read_excel, etc.) don't meet specific needs.

Parameters:

  • read_function (Callable) –

    Custom function to read the data. Must accept filepath_or_buffer as first argument and return a DataFrame or Table.

  • filepath_or_buffer (str | Path | IO[AnyStr]) –

    Path to the file or file-like object to read.

  • metadata (TableMeta) –

    Table metadata.

  • origin (Origin | None, default: None ) –

    Origin of the table data.

  • underscore (bool, default: False ) –

    True to make all column names snake case.

  • *args (Any, default: () ) –

    Additional positional arguments to pass to read_function.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to read_function.

Returns:

  • Table ( Table ) –

    Data read by the custom function as a Table with attached metadata and origin.

Source code in lib/catalog/owid/catalog/core/tables.py
def read_custom(
    read_function: Callable,
    filepath_or_buffer: str | Path | IO[AnyStr],
    metadata: TableMeta,
    origin: Origin | None = None,
    underscore: bool = False,
    *args: Any,
    **kwargs: Any,
) -> Table:
    """Load data with a caller-supplied reader and wrap it in a Table with metadata.

    Lets any custom reading function be used while the table metadata and
    origin are still attached to the result. Useful when the standard read
    functions (read_csv, read_excel, etc.) don't meet specific needs.

    Args:
        read_function: Custom function to read the data. It is called with filepath_or_buffer as first argument, followed by *args and **kwargs; its result is passed to the Table constructor.
        filepath_or_buffer: Path to the file or file-like object to read.
        metadata: Table metadata.
        origin: Origin of the table data.
        underscore: True to make all column names snake case.
        *args: Additional positional arguments to pass to read_function. Because they follow origin and underscore in the signature, those two must also be given positionally to use them.
        **kwargs: Additional keyword arguments to pass to read_function.

    Returns:
        Table: Data read by the custom function as a Table with attached metadata and origin.

    """
    raw_data = read_function(filepath_or_buffer, *args, **kwargs)
    table = Table(raw_data, underscore=underscore)
    with_metadata = _add_table_and_variables_metadata_to_table(table=table, metadata=metadata, origin=origin)
    # cast only informs the type checker; it does not convert anything at runtime
    return cast(Table, with_metadata)

read_df

read_df(
    df: DataFrame,
    metadata: TableMeta | None = None,
    origin: Origin | None = None,
    underscore: bool = False,
) -> Table

Create a Table (with metadata and an origin) from a DataFrame.

Parameters:

  • df (DataFrame) –

    Input DataFrame.

  • metadata (TableMeta | None, default: None ) –

    Table metadata (with a title and description).

  • origin (Origin | None, default: None ) –

    Origin of the table.

  • underscore (bool, default: False ) –

    True to ensure all column names are snake case.

Returns:

  • Table ( Table ) –

    Original data as a Table with metadata and an origin.

Source code in lib/catalog/owid/catalog/core/tables.py
def read_df(
    df: pd.DataFrame,
    metadata: TableMeta | None = None,
    origin: Origin | None = None,
    underscore: bool = False,
) -> Table:
    """Wrap a DataFrame in a Table and attach table metadata and an origin.

    Args:
        df: Input DataFrame.
        metadata: Table metadata (with a title and description).
        origin: Origin of the table.
        underscore: True to ensure all column names are snake case.

    Returns:
        Table: Original data as a Table with metadata and an origin.
    """
    with_metadata = _add_table_and_variables_metadata_to_table(
        table=Table(df, underscore=underscore),
        metadata=metadata,
        origin=origin,
    )
    # cast only informs the type checker; it does not convert anything at runtime
    return cast(Table, with_metadata)

update_variable_dimensions

update_variable_dimensions(
    variable: Indicator, dimensions_data: dict[str, Any]
) -> None

Update a variable's dimensions metadata.

Parameters:

  • variable (Indicator) –

    The variable to update with dimension information

  • dimensions_data (dict[str, Any]) –

    Dictionary containing dimension information

Source code in lib/catalog/owid/catalog/core/tables.py
def update_variable_dimensions(variable: indicators.Indicator, dimensions_data: dict[str, Any]) -> None:
    """
    Write dimension information into a variable's metadata.

    Nothing is changed when `dimensions_data` is empty (or otherwise falsy).

    Args:
        variable: The variable whose metadata (reached through `variable.m`) is updated.
        dimensions_data: Dictionary with the dimension information. The keys read are
            "originalShortName", "originalName" and "filters" (a list of entries
            that each provide "name" and "value").
    """
    if not dimensions_data:
        return

    variable.m.original_short_name = dimensions_data.get("originalShortName")
    variable.m.original_title = dimensions_data.get("originalName")

    # Turn the list of filters into a {dimension name: dimension value} mapping.
    dimension_values = {}
    for entry in dimensions_data.get("filters", []):
        dimension_name = entry["name"]
        dimension_values[dimension_name] = entry["value"]
    variable.m.dimensions = dimension_values

pandas Series with metadata.

owid.catalog.core.indicators

Classes:

  • Indicator

    Enhanced pandas Series with indicator-level metadata support.

  • IndicatorRolling

    Wrapper for pandas rolling window operations that preserves Indicator metadata.

Functions:

Indicator

Indicator(
    data: Any = None,
    index: Any = None,
    name: str | None = None,
    _fields: dict[str, VariableMeta] | None = None,
    metadata: VariableMeta | None = None,
    **kwargs: Any,
)

Bases: Series

Enhanced pandas Series with indicator-level metadata support.

Indicator is a pandas Series subclass that stores rich metadata about individual indicators. It serves as the column type in Table objects and automatically propagates metadata through operations.

Note

This class was formerly called Variable. The old name is still available as an alias for backwards compatibility.

Key features:

  • Automatic metadata propagation through arithmetic operations
  • Processing log tracking for data provenance
  • Integration with OWID catalog metadata system
  • Support for rich metadata including sources, origins, licenses

Attributes:

  • _name (str | None) –

    Internal name storage for metadata mapping.

  • _fields (dict[str, VariableMeta]) –

    Dictionary mapping indicator names to their VariableMeta objects.

  • metadata (VariableMeta) –

    Indicator-level metadata accessible via .metadata or .m property.

Example

Create an indicator with metadata:

from owid.catalog import Indicator, VariableMeta

ind = Indicator(
    [1, 2, 3],
    name="gdp",
    metadata=VariableMeta(
        title="GDP",
        unit="trillion USD",
        description="Gross Domestic Product"
    )
)

Access metadata using shortcuts:

print(ind.metadata.title)  # Full property access
print(ind.m.title)         # Shorthand alias
print(ind.title)           # Direct property access

Metadata propagates through operations:

gdp_per_capita = ind / population
# Result combines metadata from both indicators

Initialize an Indicator with data and metadata.

Parameters:

  • data (Any, default: None ) –

    Array-like data for the indicator (list, numpy array, pandas Series, etc.).

  • index (Any, default: None ) –

    Index labels for the data. If None, uses default integer index.

  • name (str | None, default: None ) –

    Name of the indicator. Required if metadata is provided.

  • _fields (dict[str, VariableMeta] | None, default: None ) –

    Internal metadata dictionary. Don't use directly - use metadata parameter instead.

  • metadata (VariableMeta | None, default: None ) –

    VariableMeta object with indicator-level metadata (title, unit, sources, etc.).

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to pandas.Series.__init__.

Raises:

  • AssertionError

    If both metadata and _fields are provided, or if metadata is provided without a name.

Example

Create a simple indicator:

ind = Indicator([1, 2, 3], name="population")

Create with metadata:

meta = VariableMeta(
    title="Population",
    unit="people",
    description="Total population"
)
ind = Indicator([1e6, 2e6, 3e6], name="population", metadata=meta)

Methods:

  • copy_metadata

    Copy metadata from another indicator.

  • rolling

    Create a rolling window operation that preserves metadata.

  • to_frame

    Convert Indicator to a Table (single-column table).

Source code in lib/catalog/owid/catalog/core/indicators.py
def __init__(
    self,
    data: Any = None,
    index: Any = None,
    name: str | None = None,
    _fields: dict[str, VariableMeta] | None = None,
    metadata: VariableMeta | None = None,
    **kwargs: Any,
) -> None:
    """Initialize an Indicator with data and metadata.

    Args:
        data: Array-like data for the indicator (list, numpy array, pandas Series, etc.).
        index: Index labels for the data. If None, uses default integer index.
        name: Name of the indicator. Required if metadata is provided.
        _fields: Internal metadata dictionary. Don't use directly - use `metadata` parameter instead.
        metadata: VariableMeta object with indicator-level metadata (title, unit, sources, etc.).
        **kwargs: Additional arguments passed to `pandas.Series.__init__`.

    Raises:
        AssertionError: If both `metadata` and `_fields` are provided, or if `metadata`
            is provided without a `name`.

    Example:
        Create a simple indicator:

        ```python
        ind = Indicator([1, 2, 3], name="population")
        ```

        Create with metadata:

        ```python
        meta = VariableMeta(
            title="Population",
            unit="people",
            description="Total population"
        )
        ind = Indicator([1e6, 2e6, 3e6], name="population", metadata=meta)
        ```
    """
    if metadata:
        # `metadata` and `_fields` are two ways of supplying the same information,
        # so at most one of them may be given.
        # NOTE(review): these checks are `assert` statements, so they are skipped
        # entirely when Python runs with -O.
        assert not _fields, "cannot pass both metadata and _fields"
        assert name or self.name, "cannot pass metadata without a name"
        # Wrap the single VariableMeta in a mapping keyed by the indicator's name.
        _fields = {(name or self.name): metadata}  # ty: ignore

    # When no usable `_fields` is available (None or an empty mapping), fall back to a
    # defaultdict that creates a blank VariableMeta the first time a name is looked up.
    self._fields = _fields or defaultdict(VariableMeta)

    # silence warning
    # NOTE(review): presumably the warning pandas emits when an empty Series is
    # created without an explicit dtype — confirm.
    if data is None and not kwargs.get("dtype"):
        kwargs["dtype"] = "object"

    # DeprecationWarning: Passing a SingleBlockManager to Indicator is deprecated and will raise in a future version. Use public APIs instead.
    with warnings.ignore_warnings([DeprecationWarning]):
        super().__init__(data=data, index=index, name=name, **kwargs)  # ty: ignore[unknown-argument]
m property

Metadata alias for shorter access.

Provides convenient shorthand access to indicator metadata.

Returns:

Example
# These are equivalent:
ind.metadata.title
ind.m.title
ind.title  # Direct property access
copy_metadata
copy_metadata(
    from_variable: Indicator, inplace: bool = False
) -> Indicator | None

Copy metadata from another indicator.

Parameters:

  • from_variable (Indicator) –

    Source indicator to copy metadata from.

  • inplace (bool, default: False ) –

    If True, modifies the current indicator. If False, returns a new indicator.

Returns:

  • Indicator | None

    New indicator with copied metadata if inplace=False, otherwise None.

Example

Create new indicator with copied metadata

new_ind = ind1.copy_metadata(from_variable=ind2)

Copy metadata in-place

ind1.copy_metadata(from_variable=ind2, inplace=True)

Source code in lib/catalog/owid/catalog/core/indicators.py
def copy_metadata(self, from_variable: Indicator, inplace: bool = False) -> Indicator | None:
    """Take over the metadata of another indicator.

    Thin wrapper around the module-level `copy_metadata` function, with this
    indicator acting as the target (`to_variable`).

    Args:
        from_variable: Indicator whose metadata is copied.
        inplace: If True, this indicator is modified. If False, a new indicator is returned.

    Returns:
        New indicator with copied metadata if `inplace=False`, otherwise None.

    Example:
        Create new indicator with copied metadata
        ```python
        new_ind = ind1.copy_metadata(from_variable=ind2)
        ```

        Copy metadata in-place
        ```python
        ind1.copy_metadata(from_variable=ind2, inplace=True)
        ```
    """
    # Inside the method body this name resolves to the module-level function, not to this method.
    return copy_metadata(from_variable=from_variable, to_variable=self, inplace=inplace)  # ty: ignore
rolling
rolling(*args: Any, **kwargs: Any) -> IndicatorRolling

Create a rolling window operation that preserves metadata.

This method wraps pandas rolling operations while maintaining the indicator's metadata.

Parameters:

  • *args (Any, default: () ) –

    Arguments passed to pandas.Series.rolling.

  • **kwargs (Any, default: {} ) –

    Keyword arguments passed to pandas.Series.rolling.

Returns:

  • IndicatorRolling

    IndicatorRolling object that applies operations while preserving metadata.

Example

Calculate 7-day rolling average

rolling_avg = ind.rolling(window=7).mean()

The result retains the original indicator's metadata

assert rolling_avg.metadata.title == ind.metadata.title

Source code in lib/catalog/owid/catalog/core/indicators.py
def rolling(self, *args: Any, **kwargs: Any) -> IndicatorRolling:
    """Start a rolling-window operation that carries this indicator's metadata along.

    The pandas rolling object is built by the parent class and then wrapped in an
    `IndicatorRolling`, together with a copy of the metadata and the indicator's name.

    Args:
        *args: Arguments passed to `pandas.Series.rolling`.
        **kwargs: Keyword arguments passed to `pandas.Series.rolling`.

    Returns:
        IndicatorRolling object that applies operations while preserving metadata.

    Example:
        Calculate 7-day rolling average
        ```python
        rolling_avg = ind.rolling(window=7).mean()
        ```

        The result retains the original indicator's metadata
        ```python
        assert rolling_avg.metadata.title == ind.metadata.title
        ```
    """
    pandas_window = super().rolling(*args, **kwargs)
    # The wrapper gets its own copy, so it does not share the metadata object with this indicator.
    metadata_snapshot = self.metadata.copy()
    return IndicatorRolling(pandas_window, metadata_snapshot, self.name)  # ty: ignore
to_frame
to_frame(name: str | None = None) -> Table

Convert Indicator to a Table (single-column table).

When a new name is given, the indicator's metadata is copied to the renamed column so that origins are not lost.

Source code in lib/catalog/owid/catalog/core/indicators.py
def to_frame(self, name: str | None = None) -> Table:
    """Turn this Indicator into a single-column Table.

    If a new column name is supplied, a copy of the indicator's metadata is
    assigned to the renamed column so that origins are not lost.
    """
    # The parent to_frame() already returns a Table via _constructor_expanddim.
    if name is None:
        # Call without arguments: passing name=None explicitly would make pandas
        # use None as the column name.
        return super().to_frame()  # ty: ignore[invalid-return-type]

    renamed = super().to_frame(name=name)  # ty: ignore[invalid-return-type]
    if self.name is not None:
        renamed[name].metadata = self.metadata.copy()
    return renamed  # ty: ignore

IndicatorRolling

IndicatorRolling(
    rolling: Rolling,
    metadata: VariableMeta,
    name: str | None = None,
)

Wrapper for pandas rolling window operations that preserves Indicator metadata.

This class intercepts rolling window operations (mean, sum, std, etc.) and ensures that the resulting Indicator retains the original metadata.

Note

This class was formerly called VariableRolling.

Attributes:

  • rolling

    The underlying pandas Rolling object.

  • metadata

    Indicator metadata to preserve through operations.

  • name

    Indicator name to preserve through operations.

Example

Create a rolling average

rolling_avg = ind.rolling(window=7).mean()

Metadata is preserved

assert rolling_avg.metadata == ind.metadata
assert rolling_avg.name == ind.name

Note

You typically don't instantiate this class directly. Use Indicator.rolling() instead.

Initialize an IndicatorRolling wrapper.

Parameters:

  • rolling (Rolling) –

    The pandas Rolling object to wrap.

  • metadata (VariableMeta) –

    Metadata to preserve through operations.

  • name (str | None, default: None ) –

    Indicator name to preserve through operations.

Source code in lib/catalog/owid/catalog/core/indicators.py
def __init__(self, rolling: pd.core.window.rolling.Rolling, metadata: VariableMeta, name: str | None = None):
    """Store the rolling object together with the metadata and name to carry along.

    Args:
        rolling: The pandas Rolling object to wrap.
        metadata: Metadata to preserve through operations.
        name: Indicator name to preserve through operations.
    """
    # Plain attribute storage; the three arguments are kept as given.
    self.rolling, self.metadata, self.name = rolling, metadata, name

combine_indicators_metadata

combine_indicators_metadata(
    indicators: list[Any] | None = None,
    operation: OPERATION | None = None,
    name: str = UNNAMED_INDICATOR,
    *,
    variables: list[Any] | None = None,
) -> VariableMeta

Combine metadata from multiple indicators based on an operation.

This function intelligently merges metadata from multiple indicators when they are combined through operations like addition, division, etc. The logic varies by field:

  • If all indicators have identical values for a field, that value is preserved
  • For lists (sources, origins, licenses), all unique values are combined
  • For some operations (e.g., division), only the first indicator's metadata is kept
  • Processing logs are merged and a new entry is added for the operation

Parameters:

  • indicators (list[Any] | None, default: None ) –

    List of indicators (or other objects) to combine metadata from. Non-Indicator objects are automatically filtered out.

  • operation (OPERATION | None, default: None ) –

    Type of operation being performed ("+", "-", "*", "/", etc.). Affects how metadata fields are combined.

  • name (str, default: UNNAMED_INDICATOR ) –

    Name for the resulting indicator. Defaults to UNNAMED_INDICATOR.

  • variables (list[Any] | None, default: None ) –

    Deprecated alias for indicators parameter (for backwards compatibility).

Returns:

  • VariableMeta

    Combined VariableMeta object with merged metadata from all indicators.

Example

Metadata from addition

result_meta = combine_indicators_metadata(
    indicators=[ind1, ind2],
    operation="+",
    name="sum"
)

Metadata from division (keeps first indicator's metadata)

ratio_meta = combine_indicators_metadata(
    indicators=[numerator, denominator],
    operation="/",
    name="ratio"
)

Note

This function is typically called automatically by Indicator arithmetic operations. You rarely need to call it directly.

Source code in lib/catalog/owid/catalog/core/indicators.py
def combine_indicators_metadata(
    indicators: list[Any] | None = None,
    operation: OPERATION | None = None,
    name: str = UNNAMED_INDICATOR,
    *,
    variables: list[Any] | None = None,
) -> VariableMeta:
    """Build a single VariableMeta out of the metadata of several indicators.

    Each field of the result is filled in by a dedicated helper:

    - title, descriptions, unit, short_unit, type, license and dimensions go through
      the "use the value if all indicators agree" helper, which also receives `operation`
    - description_key, origins and licenses are collected as unique values
    - display, presentation, processing_level and sort have their own combine helpers

    Args:
        indicators: List of indicators (or other objects) to combine metadata from.
            Objects without a `name`/`metadata` attribute, and indicators with an
            empty name, are skipped.
        operation: Type of operation being performed ("+", "-", "*", "/", etc.).
            Passed on to the helpers that take the operation into account.
        name: Name for the resulting indicator. Defaults to UNNAMED_INDICATOR.
            Not read by the body of this function.
        variables: Deprecated alias for `indicators` (for backwards compatibility).
            Only consulted when `indicators` is None.

    Returns:
        Combined VariableMeta object with merged metadata from all indicators.

    Example:
        Metadata from addition
        ```python
        result_meta = combine_indicators_metadata(
            indicators=[ind1, ind2],
            operation="+",
            name="sum"
        )
        ```

        Metadata from division
        ```python
        ratio_meta = combine_indicators_metadata(
            indicators=[numerator, denominator],
            operation="/",
            name="ratio"
        )
        ```

    Note:
        This function is typically called automatically by Indicator arithmetic operations.
        You rarely need to call it directly.
    """
    # Support both parameter names for backwards compatibility.
    if indicators is None:
        indicators = [] if variables is None else variables

    # Start from an empty metadata object.
    combined = VariableMeta()

    # Leave out objects that may not contain metadata (e.g. a scalar) and
    # unnamed indicators, which cannot have metadata.
    named = [obj for obj in indicators if hasattr(obj, "name") and obj.name and hasattr(obj, "metadata")]

    def _if_identical(field: str, **options: Any) -> Any:
        """Resolve `field` with the all-identical helper for the current operation."""
        return _get_metadata_value_from_indicators_if_all_identical(
            indicators=named, field=field, operation=operation, **options
        )

    # Fill in every field; the helpers are called in a fixed order.
    combined.title = _if_identical("title")
    combined.description = _if_identical("description")
    combined.description_short = _if_identical("description_short")
    combined.description_key = get_unique_description_key_points_from_indicators(indicators=named)
    # TODO: Combine description_processing: If not identical, append one after another.
    combined.description_from_producer = _if_identical("description_from_producer")
    combined.unit = _if_identical("unit", warn_if_different=True)
    combined.short_unit = _if_identical("short_unit", warn_if_different=True)
    combined.origins = get_unique_origins_from_indicators(indicators=named)
    combined.licenses = get_unique_licenses_from_indicators(indicators=named)
    combined.display = combine_indicators_display(indicators=named, operation=operation)
    combined.presentation = combine_indicators_presentation(indicators=named, operation=operation)
    combined.processing_level = combine_indicators_processing_level(indicators=named)

    combined.type = _if_identical("type", warn_if_different=True)
    combined.sort = combine_indicators_sort(indicators=named)
    combined.license = _if_identical("license", warn_if_different=True)
    combined.dimensions = _if_identical("dimensions", warn_if_different=True)

    return combined

copy_metadata

copy_metadata(
    from_variable: Indicator,
    to_variable: Indicator,
    inplace: Literal[False] = False,
) -> Indicator
copy_metadata(
    from_variable: Indicator,
    to_variable: Indicator,
    inplace: Literal[True] = True,
) -> None
copy_metadata(
    from_variable: Indicator,
    to_variable: Indicator,
    inplace: bool = False,
) -> Indicator | None

Copy metadata from one indicator to another.

Parameters:

  • from_variable (Indicator) –

    Source indicator to copy metadata from.

  • to_variable (Indicator) –

    Target indicator to copy metadata to.

  • inplace (bool, default: False ) –

    If True, modifies to_variable in place. If False, returns a new indicator.

Returns:

  • Indicator | None

    New indicator with copied metadata if inplace=False, otherwise None.

Example

Create new indicator with copied metadata

new_ind = copy_metadata(from_variable=source, to_variable=target)

Copy metadata in-place

copy_metadata(from_variable=source, to_variable=target, inplace=True)

Source code in lib/catalog/owid/catalog/core/indicators.py
def copy_metadata(from_variable: Indicator, to_variable: Indicator, inplace: bool = False) -> Indicator | None:
    """Give one indicator a copy of another indicator's metadata.

    Args:
        from_variable: Indicator whose metadata is copied.
        to_variable: Indicator that receives the metadata.
        inplace: If True, `to_variable` itself is modified. If False, a copy of
            `to_variable` is modified and returned, leaving `to_variable` untouched.

    Returns:
        New indicator with copied metadata if `inplace=False`, otherwise None.

    Example:
        Create new indicator with copied metadata
        ```python
        new_ind = copy_metadata(from_variable=source, to_variable=target)
        ```

        Copy metadata in-place
        ```python
        copy_metadata(from_variable=source, to_variable=target, inplace=True)
        ```
    """
    # Pick the object that receives the metadata: the target itself, or a fresh copy of it.
    receiver = to_variable if inplace else to_variable.copy()
    receiver.metadata = from_variable.metadata.copy()
    return None if inplace else receiver

get_unique_description_key_points_from_indicators

get_unique_description_key_points_from_indicators(
    indicators: list[Indicator],
) -> list[str]

Get unique description key points from a list of indicators.

Collects all unique key points from the description_key field of multiple indicators, preserving order of first occurrence.

Parameters:

  • indicators (list[Indicator]) –

    List of Indicator objects to extract description key points from.

Returns:

  • list[str]

    List of unique description key points in order of first appearance.

Example
key_points = get_unique_description_key_points_from_indicators([ind1, ind2])
for point in key_points:
    print(f"- {point}")
Source code in lib/catalog/owid/catalog/core/indicators.py
def get_unique_description_key_points_from_indicators(indicators: list[Indicator]) -> list[str]:
    """Get unique description key points from a list of indicators.

    Collects all unique key points from the description_key field of multiple indicators,
    preserving order of first occurrence. A key point that is repeated, whether across
    indicators or within the description_key of a single indicator, appears only once.

    Args:
        indicators: List of Indicator objects to extract description key points from.

    Returns:
        List of unique description key points in order of first appearance.

    Example:
        ```python
        key_points = get_unique_description_key_points_from_indicators([ind1, ind2])
        for point in key_points:
            print(f"- {point}")
        ```
    """
    unique_key_points: list[str] = []
    for indicator in indicators:
        for key_point in indicator.metadata.description_key:
            # Check against the running result for every single point, so that
            # repeats inside one indicator are dropped as well.
            if key_point not in unique_key_points:
                unique_key_points.append(key_point)
    return unique_key_points

get_unique_licenses_from_indicators

get_unique_licenses_from_indicators(
    indicators: list[Indicator],
) -> list[License]

Get unique licenses from a list of indicators.

Collects all unique License objects from the metadata of multiple indicators, preserving order of first occurrence.

Parameters:

  • indicators (list[Indicator]) –

    List of Indicator objects to extract licenses from.

Returns:

  • list[License]

    List of unique License objects in order of first appearance.

Example
licenses = get_unique_licenses_from_indicators([ind1, ind2, ind3])
print(f"Data uses {len(licenses)} different licenses")
Source code in lib/catalog/owid/catalog/core/indicators.py
def get_unique_licenses_from_indicators(indicators: list[Indicator]) -> list[License]:
    """Get unique licenses from a list of indicators.

    Collects all unique License objects from the metadata of multiple indicators,
    preserving order of first occurrence. Licenses are compared by equality, and a
    license that is repeated, whether across indicators or within the licenses of a
    single indicator, appears only once.

    Args:
        indicators: List of Indicator objects to extract licenses from.

    Returns:
        List of unique License objects in order of first appearance.

    Example:
        ```python
        licenses = get_unique_licenses_from_indicators([ind1, ind2, ind3])
        print(f"Data uses {len(licenses)} different licenses")
        ```
    """
    unique_licenses: list[License] = []
    for indicator in indicators:
        for license in indicator.metadata.licenses:
            # Membership on the running list relies only on equality (no hashing
            # needed) and also drops repeats inside one indicator.
            if license not in unique_licenses:
                unique_licenses.append(license)
    return unique_licenses

get_unique_origins_from_indicators

get_unique_origins_from_indicators(
    indicators: list[Indicator],
) -> list[Origin]

Get unique origins from a list of indicators.

Collects all unique Origin objects from the metadata of multiple indicators, preserving order of first occurrence.

Parameters:

  • indicators (list[Indicator]) –

    List of Indicator objects to extract origins from.

Returns:

  • list[Origin]

    List of unique Origin objects in order of first appearance.

Example
origins = get_unique_origins_from_indicators([ind1, ind2, ind3])
for origin in origins:
    print(f"Producer: {origin.producer}")
Source code in lib/catalog/owid/catalog/core/indicators.py
def get_unique_origins_from_indicators(indicators: list[Indicator]) -> list[Origin]:
    """Get unique origins from a list of indicators.

    Collects all unique Origin objects from the metadata of multiple indicators,
    preserving order of first occurrence. Origins are compared by equality, and an
    origin that is repeated, whether across indicators or within the origins of a
    single indicator, appears only once.

    Args:
        indicators: List of Indicator objects to extract origins from.

    Returns:
        List of unique Origin objects in order of first appearance.

    Example:
        ```python
        origins = get_unique_origins_from_indicators([ind1, ind2, ind3])
        for origin in origins:
            print(f"Producer: {origin.producer}")
        ```
    """
    unique_origins: list[Origin] = []
    for indicator in indicators:
        for origin in indicator.metadata.origins:
            # Membership on the running list relies only on equality (no hashing
            # needed) and also drops repeats inside one indicator.
            if origin not in unique_origins:
                unique_origins.append(origin)
    return unique_origins

is_nullable_series

is_nullable_series(s: Any) -> bool

Check if a series has a nullable pandas dtype.

Determines whether a pandas Series uses one of the nullable integer, float, or boolean dtypes (as opposed to traditional numpy dtypes).

Parameters:

  • s (Any) –

    Any object to check. Typically a pandas Series.

Returns:

  • bool

    True if the object has a nullable pandas dtype, False otherwise.

Example
import pandas as pd

# Nullable integer dtype
s1 = pd.Series([1, 2, None], dtype="Int64")
assert is_nullable_series(s1) == True

# Traditional numpy dtype
s2 = pd.Series([1, 2, 3], dtype="int64")
assert is_nullable_series(s2) == False

# Nullable boolean dtype
s3 = pd.Series([True, False, None], dtype="boolean")
assert is_nullable_series(s3) == True
Note

Nullable dtypes (capitalized like Int64) differ from numpy dtypes (int64) in that they can represent missing values using pd.NA instead of np.nan.

Source code in lib/catalog/owid/catalog/core/indicators.py
def is_nullable_series(s: Any) -> bool:
    """Tell whether an object's dtype is one of the pandas nullable dtypes.

    The check is done on the dtype's string name. Recognised names are the nullable
    signed and unsigned integers (`Int8`..`Int64`, `UInt8`..`UInt64`), the nullable
    floats (`Float32`, `Float64`) and `boolean`.

    Args:
        s: Any object to check. Typically a pandas Series. Objects without a
            `dtype` attribute are never considered nullable.

    Returns:
        True if the object has a nullable pandas dtype, False otherwise.

    Example:
        ```python
        import pandas as pd

        # Nullable integer dtype
        s1 = pd.Series([1, 2, None], dtype="Int64")
        assert is_nullable_series(s1)

        # Traditional numpy dtype
        s2 = pd.Series([1, 2, 3], dtype="int64")
        assert not is_nullable_series(s2)

        # Nullable boolean dtype
        s3 = pd.Series([True, False, None], dtype="boolean")
        assert is_nullable_series(s3)
        ```

    Note:
        Nullable dtypes (capitalized like `Int64`) differ from numpy dtypes (`int64`)
        in that they can represent missing values using `pd.NA` instead of `np.nan`.
    """
    # Int8..Int64 and UInt8..UInt64, generated instead of spelled out one by one.
    integer_names = {f"{sign}Int{bits}" for sign in ("", "U") for bits in (8, 16, 32, 64)}
    nullable_names = integer_names | {"Float32", "Float64", "boolean"}

    if hasattr(s, "dtype"):
        return str(s.dtype) in nullable_names
    return False

owid.catalog.core.meta

Classes:

  • DatasetMeta

    The metadata for this entire dataset kept in JSON (e.g. mydataset/index.json).

  • License

    License information for data products.

  • MetaBase

    Base class for all metadata objects in the catalog.

  • Origin

    Comprehensive metadata about the origin of a data product.

  • TableMeta
  • VariableMeta

    Allowed fields for display attribute used for grapher:

Functions:

  • is_year_or_date

    Matches dates in "yyyy-mm-dd" format or years in "yyyy" format.

  • update_variable_metadata

    Post-process variable metadata and fix issues before rendering or exporting to grapher.

DatasetMeta dataclass

DatasetMeta(
    channel: str | None = None,
    namespace: str | None = None,
    short_name: str | None = None,
    title: str | None = None,
    description: str | None = None,
    licenses: list[License] = list(),
    is_public: bool = True,
    additional_info: dict[str, Any] | None = None,
    version: str | None = None,
    update_period_days: int | None = None,
    non_redistributable: bool = False,
    source_checksum: str | None = None,
)

Bases: MetaBase

The metadata for this entire dataset kept in JSON (e.g. mydataset/index.json).

The number of fields is limited, but should handle everything that we get from Snapshot. There is a lot more opportunity to store more metadata at the table and the variable level.

Methods:

  • copy

    Create a copy of the metadata object.

  • from_dict

    Create metadata object from dictionary.

  • load

    Load metadata from a JSON file.

  • save

    Save metadata to a JSON file.

  • to_dict

    Convert metadata object to dictionary.

  • update

    Update metadata fields with new values.

  • update_from_yaml

    The main reason for wanting to do this is to manually override what goes into Grapher before an export.

Attributes:

  • uri (str) –

    Return the unique URI for this dataset.

uri property
uri: str

Return the unique URI for this dataset.

copy
copy(deep: bool = True) -> Self

Create a copy of the metadata object.

Parameters:

  • deep (bool, default: True ) –

    If True, creates a deep copy (copies nested objects). If False, creates a shallow copy.

Returns:

  • Self

    Copy of the metadata object.

Example
original = DatasetMeta(title="GDP")
copy = original.copy(deep=True)
copy.title = "Population"  # Doesn't affect original
Source code in lib/catalog/owid/catalog/core/meta.py
def copy(self, deep: bool = True) -> Self:
    """Return a copy of this metadata object.

    Args:
        deep: If True, creates a deep copy (copies nested objects).
            If False, creates a shallow copy.

    Returns:
        Copy of the metadata object.

    Example:
        ```python
        original = DatasetMeta(title="GDP")
        duplicate = original.copy(deep=True)
        duplicate.title = "Population"  # Doesn't affect original
        ```
    """
    if deep:
        return _deepcopy_dataclass(self)
    # Shallow copy: `dataclasses.replace` without overrides builds a new instance
    # from the current field values.
    return dataclasses.replace(self)  # ty: ignore
from_dict classmethod
from_dict(d: dict[str, Any]) -> T

Create metadata object from dictionary.

Parameters:

  • d (dict[str, Any]) –

    Dictionary with metadata fields.

Returns:

  • T

    New metadata object of the appropriate type.

Example
d = {"title": "GDP", "short_name": "gdp"}
meta = DatasetMeta.from_dict(d)
Note

This uses a custom implementation that's significantly faster than the default dataclasses_json method.

Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> T:  # ty: ignore
    """Build a metadata object of this class from a plain dictionary.

    Args:
        d: Dictionary with metadata fields.

    Returns:
        New metadata object of the appropriate type.

    Example:
        ```python
        d = {"title": "GDP", "short_name": "gdp"}
        meta = DatasetMeta.from_dict(d)
        ```

    Note:
        The conversion is done by `dataclass_from_dict`, a custom implementation
        that's significantly faster than the default dataclasses_json method.
    """
    # NOTE: this is much faster than using dataclasses_json
    instance = dataclass_from_dict(cls, d)  # ty: ignore
    return instance
load classmethod
load(filename: str) -> Self

Load metadata from a JSON file.

Parameters:

  • filename (str) –

    Path to the JSON file containing metadata.

Returns:

  • Self

    Metadata object loaded from the file.

Example
meta = DatasetMeta.load("dataset_meta.json")
print(meta.title)
Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def load(cls, filename: str) -> Self:
    """Load metadata from a JSON file.

    The file is always decoded as UTF-8, so the result does not depend on
    the platform's default text encoding.

    Args:
        filename: Path to the JSON file containing metadata.

    Returns:
        Metadata object built by ``cls.from_dict`` from the parsed JSON.

    Example:
        ```python
        meta = DatasetMeta.load("dataset_meta.json")
        print(meta.title)
        ```
    """
    # Without an explicit encoding, `open` falls back to the locale's
    # preferred encoding, which can mis-decode non-ASCII metadata.
    with open(filename, encoding="utf-8") as istream:
        return cls.from_dict(json.load(istream))
save
save(filename: str | Path) -> None

Save metadata to a JSON file.

Parameters:

  • filename (str | Path) –

    Path where the metadata should be saved.

Example
meta = DatasetMeta(title="GDP")
meta.save("dataset_meta.json")
Source code in lib/catalog/owid/catalog/core/meta.py
def save(self, filename: str | Path) -> None:
    """Write this metadata object to a JSON file.

    The dictionary returned by ``to_dict()`` is serialized with two-space
    indentation; values the JSON encoder cannot handle natively are
    converted with ``str``.

    Args:
        filename: Destination path for the JSON file.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.save("dataset_meta.json")
        ```
    """
    destination = Path(filename).as_posix()
    with open(destination, "w") as out_file:
        json.dump(self.to_dict(), out_file, indent=2, default=str)
to_dict
to_dict(encode_json: bool = False) -> dict[str, Any]

Convert metadata object to dictionary.

Parameters:

  • encode_json (bool, default: False ) –

    If True, encodes values for JSON serialization.

Returns:

  • dict[str, Any]

    Dictionary representation of the metadata.

Example
meta = DatasetMeta(title="GDP", short_name="gdp")
d = meta.to_dict()
print(d["title"])  # "GDP"
Source code in lib/catalog/owid/catalog/core/meta.py
def to_dict(self, encode_json: bool = False) -> dict[str, Any]:  # ty: ignore
    """Return this metadata object as a dictionary.

    Args:
        encode_json: If True, encodes values for JSON serialization.

    Returns:
        Dictionary produced by the parent class's ``to_dict``.

    Example:
        ```python
        meta = DatasetMeta(title="GDP", short_name="gdp")
        d = meta.to_dict()
        print(d["title"])  # "GDP"
        ```
    """
    # Pure delegation: the flag is forwarded unchanged to the parent class.
    as_mapping = super().to_dict(encode_json=encode_json)
    return as_mapping
update
update(**kwargs: dict[str, Any]) -> None

Update metadata fields with new values.

Parameters:

  • **kwargs (dict[str, Any], default: {} ) –

    Field names and their new values. None values are ignored.

Example
meta = DatasetMeta(title="GDP")
meta.update(title="GDP Data", description="Annual GDP figures")
Source code in lib/catalog/owid/catalog/core/meta.py
def update(self, **kwargs: dict[str, Any]) -> None:
    """Assign new values to metadata fields in place.

    Args:
        **kwargs: Field names mapped to their new values. Entries whose
            value is None are skipped, leaving the current value untouched.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.update(title="GDP Data", description="Annual GDP figures")
        ```
    """
    # NOTE(review): `**kwargs: dict[str, Any]` types every value as a dict;
    # `Any` is presumably what was meant - confirm before changing.
    for field_name, new_value in kwargs.items():
        if new_value is None:
            continue
        setattr(self, field_name, new_value)
update_from_yaml
update_from_yaml(path: Path | str) -> None

Update metadata fields from the dataset section of a YAML file. The main reason for wanting to do this is to manually override what goes into Grapher before an export.

Source code in lib/catalog/owid/catalog/core/meta.py
def update_from_yaml(self, path: Path | str) -> None:
    """Override metadata fields with the ``dataset`` section of a YAML file.

    The main reason for wanting to do this is to manually override what goes
    into Grapher before an export.

    Args:
        path: Location of the YAML file to read.
    """
    from owid.catalog.core import utils

    # NOTE(review): `self._params_yaml()` presumably supplies values the YAML
    # can reference while it is loaded - confirm against `dynamic_yaml_load`.
    annot = utils.dynamic_yaml_load(path, self._params_yaml())

    # An empty `dataset:` key parses to None rather than a mapping; treat it
    # like a missing section instead of failing on `.items()`.
    for k, v in (annot.get("dataset") or {}).items():
        setattr(self, k, v)

License dataclass

License(name: str | None = None, url: str | None = None)

Bases: MetaBase

License information for data products.

Stores licensing details for datasets and variables, including the license name and URL to the full license text.

Attributes:

  • name (str | None) –

    License name (e.g., "CC BY 4.0", "MIT", "Public Domain").

  • url (str | None) –

    URL to the full license text or information page.

Example
from owid.catalog import License

# Creative Commons license
license = License(
    name="CC BY 4.0",
    url="https://creativecommons.org/licenses/by/4.0/"
)

# Check if license is defined
if license:
    print(f"Licensed under: {license.name}")

Methods:

  • copy

    Create a copy of the metadata object.

  • from_dict

    Create metadata object from dictionary.

  • load

    Load metadata from a JSON file.

  • save

    Save metadata to a JSON file.

  • to_dict

    Convert metadata object to dictionary.

  • update

    Update metadata fields with new values.

copy
copy(deep: bool = True) -> Self

Create a copy of the metadata object.

Parameters:

  • deep (bool, default: True ) –

    If True, creates a deep copy (copies nested objects). If False, creates a shallow copy.

Returns:

  • Self

    Copy of the metadata object.

Example
original = DatasetMeta(title="GDP")
copy = original.copy(deep=True)
copy.title = "Population"  # Doesn't affect original
Source code in lib/catalog/owid/catalog/core/meta.py
def copy(self, deep: bool = True) -> Self:
    """Return a duplicate of this metadata object.

    Args:
        deep: When True (the default), nested objects are duplicated as
            well. When False, only the top-level object is recreated and
            its field values are passed through as they are.

    Returns:
        The newly created copy.

    Example:
        ```python
        original = DatasetMeta(title="GDP")
        duplicate = original.copy(deep=True)
        duplicate.title = "Population"  # original.title is still "GDP"
        ```
    """
    if deep:
        return _deepcopy_dataclass(self)
    # Shallow path: rebuild the dataclass from its current field values.
    return dataclasses.replace(self)  # ty: ignore
from_dict classmethod
from_dict(d: dict[str, Any]) -> T

Create metadata object from dictionary.

Parameters:

  • d (dict[str, Any]) –

    Dictionary with metadata fields.

Returns:

  • T

    New metadata object of the appropriate type.

Example
d = {"title": "GDP", "short_name": "gdp"}
meta = DatasetMeta.from_dict(d)
Note

This uses a custom implementation that's significantly faster than the default dataclasses_json method.

Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> T:  # ty: ignore
    """Build a metadata object of this class from a plain dictionary.

    Args:
        d: Mapping of metadata field names to their values.

    Returns:
        A new instance of the class the method was called on.

    Example:
        ```python
        d = {"title": "GDP", "short_name": "gdp"}
        meta = DatasetMeta.from_dict(d)
        ```

    Note:
        Construction is delegated to ``dataclass_from_dict``, a custom
        implementation that is significantly faster than the default
        dataclasses_json method.
    """
    # NOTE: this is much faster than using dataclasses_json
    instance = dataclass_from_dict(cls, d)  # ty: ignore
    return instance
load classmethod
load(filename: str) -> Self

Load metadata from a JSON file.

Parameters:

  • filename (str) –

    Path to the JSON file containing metadata.

Returns:

  • Self

    Metadata object loaded from the file.

Example
meta = DatasetMeta.load("dataset_meta.json")
print(meta.title)
Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def load(cls, filename: str) -> Self:
    """Load metadata from a JSON file.

    The file is always decoded as UTF-8, so the result does not depend on
    the platform's default text encoding.

    Args:
        filename: Path to the JSON file containing metadata.

    Returns:
        Metadata object built by ``cls.from_dict`` from the parsed JSON.

    Example:
        ```python
        meta = DatasetMeta.load("dataset_meta.json")
        print(meta.title)
        ```
    """
    # Without an explicit encoding, `open` falls back to the locale's
    # preferred encoding, which can mis-decode non-ASCII metadata.
    with open(filename, encoding="utf-8") as istream:
        return cls.from_dict(json.load(istream))
save
save(filename: str | Path) -> None

Save metadata to a JSON file.

Parameters:

  • filename (str | Path) –

    Path where the metadata should be saved.

Example
meta = DatasetMeta(title="GDP")
meta.save("dataset_meta.json")
Source code in lib/catalog/owid/catalog/core/meta.py
def save(self, filename: str | Path) -> None:
    """Write this metadata object to a JSON file.

    The dictionary returned by ``to_dict()`` is serialized with two-space
    indentation; values the JSON encoder cannot handle natively are
    converted with ``str``.

    Args:
        filename: Destination path for the JSON file.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.save("dataset_meta.json")
        ```
    """
    destination = Path(filename).as_posix()
    with open(destination, "w") as out_file:
        json.dump(self.to_dict(), out_file, indent=2, default=str)
to_dict
to_dict(encode_json: bool = False) -> dict[str, Any]

Convert metadata object to dictionary.

Parameters:

  • encode_json (bool, default: False ) –

    If True, encodes values for JSON serialization.

Returns:

  • dict[str, Any]

    Dictionary representation of the metadata.

Example
meta = DatasetMeta(title="GDP", short_name="gdp")
d = meta.to_dict()
print(d["title"])  # "GDP"
Source code in lib/catalog/owid/catalog/core/meta.py
def to_dict(self, encode_json: bool = False) -> dict[str, Any]:  # ty: ignore
    """Return this metadata object as a dictionary.

    Args:
        encode_json: If True, encodes values for JSON serialization.

    Returns:
        Dictionary produced by the parent class's ``to_dict``.

    Example:
        ```python
        meta = DatasetMeta(title="GDP", short_name="gdp")
        d = meta.to_dict()
        print(d["title"])  # "GDP"
        ```
    """
    # Pure delegation: the flag is forwarded unchanged to the parent class.
    as_mapping = super().to_dict(encode_json=encode_json)
    return as_mapping
update
update(**kwargs: dict[str, Any]) -> None

Update metadata fields with new values.

Parameters:

  • **kwargs (dict[str, Any], default: {} ) –

    Field names and their new values. None values are ignored.

Example
meta = DatasetMeta(title="GDP")
meta.update(title="GDP Data", description="Annual GDP figures")
Source code in lib/catalog/owid/catalog/core/meta.py
def update(self, **kwargs: dict[str, Any]) -> None:
    """Assign new values to metadata fields in place.

    Args:
        **kwargs: Field names mapped to their new values. Entries whose
            value is None are skipped, leaving the current value untouched.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.update(title="GDP Data", description="Annual GDP figures")
        ```
    """
    # NOTE(review): `**kwargs: dict[str, Any]` types every value as a dict;
    # `Any` is presumably what was meant - confirm before changing.
    for field_name, new_value in kwargs.items():
        if new_value is None:
            continue
        setattr(self, field_name, new_value)

MetaBase

Bases: DataClassJsonMixin

Base class for all metadata objects in the catalog.

Provides common functionality for metadata serialization, hashing, comparison, and persistence. All metadata classes (DatasetMeta, TableMeta, VariableMeta, etc.) inherit from this base class.

Key features:

  • JSON serialization/deserialization
  • Deterministic hashing for deduplication
  • Deep copying support
  • File persistence (save/load)
  • Dictionary conversion
Example
from owid.catalog import DatasetMeta

# Create metadata
meta = DatasetMeta(title="GDP Data", short_name="gdp")

# Save to file
meta.save("metadata.json")

# Load from file
loaded = DatasetMeta.load("metadata.json")

# Convert to dictionary
d = meta.to_dict()

# Create deep copy
copy = meta.copy(deep=True)

Methods:

  • copy

    Create a copy of the metadata object.

  • from_dict

    Create metadata object from dictionary.

  • load

    Load metadata from a JSON file.

  • save

    Save metadata to a JSON file.

  • to_dict

    Convert metadata object to dictionary.

  • update

    Update metadata fields with new values.

copy
copy(deep: bool = True) -> Self

Create a copy of the metadata object.

Parameters:

  • deep (bool, default: True ) –

    If True, creates a deep copy (copies nested objects). If False, creates a shallow copy.

Returns:

  • Self

    Copy of the metadata object.

Example
original = DatasetMeta(title="GDP")
copy = original.copy(deep=True)
copy.title = "Population"  # Doesn't affect original
Source code in lib/catalog/owid/catalog/core/meta.py
def copy(self, deep: bool = True) -> Self:
    """Return a duplicate of this metadata object.

    Args:
        deep: When True (the default), nested objects are duplicated as
            well. When False, only the top-level object is recreated and
            its field values are passed through as they are.

    Returns:
        The newly created copy.

    Example:
        ```python
        original = DatasetMeta(title="GDP")
        duplicate = original.copy(deep=True)
        duplicate.title = "Population"  # original.title is still "GDP"
        ```
    """
    if deep:
        return _deepcopy_dataclass(self)
    # Shallow path: rebuild the dataclass from its current field values.
    return dataclasses.replace(self)  # ty: ignore
from_dict classmethod
from_dict(d: dict[str, Any]) -> T

Create metadata object from dictionary.

Parameters:

  • d (dict[str, Any]) –

    Dictionary with metadata fields.

Returns:

  • T

    New metadata object of the appropriate type.

Example
d = {"title": "GDP", "short_name": "gdp"}
meta = DatasetMeta.from_dict(d)
Note

This uses a custom implementation that's significantly faster than the default dataclasses_json method.

Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> T:  # ty: ignore
    """Build a metadata object of this class from a plain dictionary.

    Args:
        d: Mapping of metadata field names to their values.

    Returns:
        A new instance of the class the method was called on.

    Example:
        ```python
        d = {"title": "GDP", "short_name": "gdp"}
        meta = DatasetMeta.from_dict(d)
        ```

    Note:
        Construction is delegated to ``dataclass_from_dict``, a custom
        implementation that is significantly faster than the default
        dataclasses_json method.
    """
    # NOTE: this is much faster than using dataclasses_json
    instance = dataclass_from_dict(cls, d)  # ty: ignore
    return instance
load classmethod
load(filename: str) -> Self

Load metadata from a JSON file.

Parameters:

  • filename (str) –

    Path to the JSON file containing metadata.

Returns:

  • Self

    Metadata object loaded from the file.

Example
meta = DatasetMeta.load("dataset_meta.json")
print(meta.title)
Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def load(cls, filename: str) -> Self:
    """Load metadata from a JSON file.

    The file is always decoded as UTF-8, so the result does not depend on
    the platform's default text encoding.

    Args:
        filename: Path to the JSON file containing metadata.

    Returns:
        Metadata object built by ``cls.from_dict`` from the parsed JSON.

    Example:
        ```python
        meta = DatasetMeta.load("dataset_meta.json")
        print(meta.title)
        ```
    """
    # Without an explicit encoding, `open` falls back to the locale's
    # preferred encoding, which can mis-decode non-ASCII metadata.
    with open(filename, encoding="utf-8") as istream:
        return cls.from_dict(json.load(istream))
save
save(filename: str | Path) -> None

Save metadata to a JSON file.

Parameters:

  • filename (str | Path) –

    Path where the metadata should be saved.

Example
meta = DatasetMeta(title="GDP")
meta.save("dataset_meta.json")
Source code in lib/catalog/owid/catalog/core/meta.py
def save(self, filename: str | Path) -> None:
    """Write this metadata object to a JSON file.

    The dictionary returned by ``to_dict()`` is serialized with two-space
    indentation; values the JSON encoder cannot handle natively are
    converted with ``str``.

    Args:
        filename: Destination path for the JSON file.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.save("dataset_meta.json")
        ```
    """
    destination = Path(filename).as_posix()
    with open(destination, "w") as out_file:
        json.dump(self.to_dict(), out_file, indent=2, default=str)
to_dict
to_dict(encode_json: bool = False) -> dict[str, Any]

Convert metadata object to dictionary.

Parameters:

  • encode_json (bool, default: False ) –

    If True, encodes values for JSON serialization.

Returns:

  • dict[str, Any]

    Dictionary representation of the metadata.

Example
meta = DatasetMeta(title="GDP", short_name="gdp")
d = meta.to_dict()
print(d["title"])  # "GDP"
Source code in lib/catalog/owid/catalog/core/meta.py
def to_dict(self, encode_json: bool = False) -> dict[str, Any]:  # ty: ignore
    """Return this metadata object as a dictionary.

    Args:
        encode_json: If True, encodes values for JSON serialization.

    Returns:
        Dictionary produced by the parent class's ``to_dict``.

    Example:
        ```python
        meta = DatasetMeta(title="GDP", short_name="gdp")
        d = meta.to_dict()
        print(d["title"])  # "GDP"
        ```
    """
    # Pure delegation: the flag is forwarded unchanged to the parent class.
    as_mapping = super().to_dict(encode_json=encode_json)
    return as_mapping
update
update(**kwargs: dict[str, Any]) -> None

Update metadata fields with new values.

Parameters:

  • **kwargs (dict[str, Any], default: {} ) –

    Field names and their new values. None values are ignored.

Example
meta = DatasetMeta(title="GDP")
meta.update(title="GDP Data", description="Annual GDP figures")
Source code in lib/catalog/owid/catalog/core/meta.py
def update(self, **kwargs: dict[str, Any]) -> None:
    """Assign new values to metadata fields in place.

    Args:
        **kwargs: Field names mapped to their new values. Entries whose
            value is None are skipped, leaving the current value untouched.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.update(title="GDP Data", description="Annual GDP figures")
        ```
    """
    # NOTE(review): `**kwargs: dict[str, Any]` types every value as a dict;
    # `Any` is presumably what was meant - confirm before changing.
    for field_name, new_value in kwargs.items():
        if new_value is None:
            continue
        setattr(self, field_name, new_value)

Origin dataclass

Origin(
    producer: str,
    title: str,
    description: str | None = None,
    title_snapshot: str | None = None,
    description_snapshot: str | None = None,
    citation_full: str | None = None,
    attribution: str | None = None,
    attribution_short: str | None = None,
    version_producer: str | None = None,
    url_main: str | None = None,
    url_download: str | None = None,
    date_accessed: str | None = None,
    date_published: YearDateLatest | None = None,
    license: License | None = None,
)

Bases: MetaBase

Comprehensive metadata about the origin of a data product.

Origin provides detailed provenance information for datasets, including producer details, citations, URLs, publication dates, and licensing. This is the modern replacement for the legacy Source class.

Attributes:

  • producer (str) –

    Name of the institution or author(s) that produced the data (e.g., "World Bank", "United Nations").

  • title (str) –

    Title of the original data product.

  • description (str | None) –

    Description of the data product and its methodology.

  • title_snapshot (str | None) –

    Title of the specific data subset extracted from the product. Only use if different from title.

  • description_snapshot (str | None) –

    Description of the snapshot subset. Use when the snapshot differs from the full data product.

  • citation_full (str | None) –

    Complete citation for the data product in academic format.

  • attribution (str | None) –

    Name to use for attribution (e.g., "V-Dem Institute" instead of individual authors). Defaults to producer if not provided.

  • attribution_short (str | None) –

    Short form of attribution for space-constrained contexts.

  • version_producer (str | None) –

    Version number or identifier from the data producer (e.g., "v12", "2023.1").

  • url_main (str | None) –

    Authoritative URL for the dataset's main page.

  • url_download (str | None) –

    Direct URL to download the dataset.

  • date_accessed (str | None) –

    ISO-format date when the dataset was accessed (YYYY-MM-DD).

  • date_published (YearDateLatest | None) –

    Publication date (YYYY-MM-DD), year (YYYY), or "latest" for continuously updated datasets.

  • license (License | None) –

    License information for the data product.

Example
from owid.catalog import Origin, License

# Comprehensive origin metadata
origin = Origin(
    producer="World Bank",
    title="World Development Indicators",
    description="Annual indicators of development",
    attribution_short="World Bank",
    version_producer="2024",
    url_main="https://datatopics.worldbank.org/world-development-indicators/",
    url_download="https://databank.worldbank.org/data/download/WDI_CSV.zip",
    date_accessed="2024-01-15",
    date_published="2024",
    license=License(
        name="CC BY 4.0",
        url="https://creativecommons.org/licenses/by/4.0/"
    )
)

# Minimal origin (only required fields)
origin_minimal = Origin(
    producer="UN",
    title="Population Data"
)

Raises:

  • ValueError

    If date_published is not a valid year, date, or "latest".

Methods:

  • copy

    Create a copy of the metadata object.

  • from_dict

    Create metadata object from dictionary.

  • load

    Load metadata from a JSON file.

  • save

    Save metadata to a JSON file.

  • to_dict

    Convert metadata object to dictionary.

  • update

    Update metadata fields with new values.

copy
copy(deep: bool = True) -> Self

Create a copy of the metadata object.

Parameters:

  • deep (bool, default: True ) –

    If True, creates a deep copy (copies nested objects). If False, creates a shallow copy.

Returns:

  • Self

    Copy of the metadata object.

Example
original = DatasetMeta(title="GDP")
copy = original.copy(deep=True)
copy.title = "Population"  # Doesn't affect original
Source code in lib/catalog/owid/catalog/core/meta.py
def copy(self, deep: bool = True) -> Self:
    """Return a duplicate of this metadata object.

    Args:
        deep: When True (the default), nested objects are duplicated as
            well. When False, only the top-level object is recreated and
            its field values are passed through as they are.

    Returns:
        The newly created copy.

    Example:
        ```python
        original = DatasetMeta(title="GDP")
        duplicate = original.copy(deep=True)
        duplicate.title = "Population"  # original.title is still "GDP"
        ```
    """
    if deep:
        return _deepcopy_dataclass(self)
    # Shallow path: rebuild the dataclass from its current field values.
    return dataclasses.replace(self)  # ty: ignore
from_dict classmethod
from_dict(d: dict[str, Any]) -> T

Create metadata object from dictionary.

Parameters:

  • d (dict[str, Any]) –

    Dictionary with metadata fields.

Returns:

  • T

    New metadata object of the appropriate type.

Example
d = {"title": "GDP", "short_name": "gdp"}
meta = DatasetMeta.from_dict(d)
Note

This uses a custom implementation that's significantly faster than the default dataclasses_json method.

Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> T:  # ty: ignore
    """Build a metadata object of this class from a plain dictionary.

    Args:
        d: Mapping of metadata field names to their values.

    Returns:
        A new instance of the class the method was called on.

    Example:
        ```python
        d = {"title": "GDP", "short_name": "gdp"}
        meta = DatasetMeta.from_dict(d)
        ```

    Note:
        Construction is delegated to ``dataclass_from_dict``, a custom
        implementation that is significantly faster than the default
        dataclasses_json method.
    """
    # NOTE: this is much faster than using dataclasses_json
    instance = dataclass_from_dict(cls, d)  # ty: ignore
    return instance
load classmethod
load(filename: str) -> Self

Load metadata from a JSON file.

Parameters:

  • filename (str) –

    Path to the JSON file containing metadata.

Returns:

  • Self

    Metadata object loaded from the file.

Example
meta = DatasetMeta.load("dataset_meta.json")
print(meta.title)
Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def load(cls, filename: str) -> Self:
    """Load metadata from a JSON file.

    The file is always decoded as UTF-8, so the result does not depend on
    the platform's default text encoding.

    Args:
        filename: Path to the JSON file containing metadata.

    Returns:
        Metadata object built by ``cls.from_dict`` from the parsed JSON.

    Example:
        ```python
        meta = DatasetMeta.load("dataset_meta.json")
        print(meta.title)
        ```
    """
    # Without an explicit encoding, `open` falls back to the locale's
    # preferred encoding, which can mis-decode non-ASCII metadata.
    with open(filename, encoding="utf-8") as istream:
        return cls.from_dict(json.load(istream))
save
save(filename: str | Path) -> None

Save metadata to a JSON file.

Parameters:

  • filename (str | Path) –

    Path where the metadata should be saved.

Example
meta = DatasetMeta(title="GDP")
meta.save("dataset_meta.json")
Source code in lib/catalog/owid/catalog/core/meta.py
def save(self, filename: str | Path) -> None:
    """Write this metadata object to a JSON file.

    The dictionary returned by ``to_dict()`` is serialized with two-space
    indentation; values the JSON encoder cannot handle natively are
    converted with ``str``.

    Args:
        filename: Destination path for the JSON file.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.save("dataset_meta.json")
        ```
    """
    destination = Path(filename).as_posix()
    with open(destination, "w") as out_file:
        json.dump(self.to_dict(), out_file, indent=2, default=str)
to_dict
to_dict(encode_json: bool = False) -> dict[str, Any]

Convert metadata object to dictionary.

Parameters:

  • encode_json (bool, default: False ) –

    If True, encodes values for JSON serialization.

Returns:

  • dict[str, Any]

    Dictionary representation of the metadata.

Example
meta = DatasetMeta(title="GDP", short_name="gdp")
d = meta.to_dict()
print(d["title"])  # "GDP"
Source code in lib/catalog/owid/catalog/core/meta.py
def to_dict(self, encode_json: bool = False) -> dict[str, Any]:  # ty: ignore
    """Return this metadata object as a dictionary.

    Args:
        encode_json: If True, encodes values for JSON serialization.

    Returns:
        Dictionary produced by the parent class's ``to_dict``.

    Example:
        ```python
        meta = DatasetMeta(title="GDP", short_name="gdp")
        d = meta.to_dict()
        print(d["title"])  # "GDP"
        ```
    """
    # Pure delegation: the flag is forwarded unchanged to the parent class.
    as_mapping = super().to_dict(encode_json=encode_json)
    return as_mapping
update
update(**kwargs: dict[str, Any]) -> None

Update metadata fields with new values.

Parameters:

  • **kwargs (dict[str, Any], default: {} ) –

    Field names and their new values. None values are ignored.

Example
meta = DatasetMeta(title="GDP")
meta.update(title="GDP Data", description="Annual GDP figures")
Source code in lib/catalog/owid/catalog/core/meta.py
def update(self, **kwargs: dict[str, Any]) -> None:
    """Assign new values to metadata fields in place.

    Args:
        **kwargs: Field names mapped to their new values. Entries whose
            value is None are skipped, leaving the current value untouched.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.update(title="GDP Data", description="Annual GDP figures")
        ```
    """
    # NOTE(review): `**kwargs: dict[str, Any]` types every value as a dict;
    # `Any` is presumably what was meant - confirm before changing.
    for field_name, new_value in kwargs.items():
        if new_value is None:
            continue
        setattr(self, field_name, new_value)

TableMeta dataclass

TableMeta(
    short_name: str | None = None,
    title: str | None = None,
    description: str | None = None,
    dataset: DatasetMeta | None = None,
    primary_key: list[str] = list(),
    dimensions: list[TableDimension] | None = None,
)

Bases: MetaBase

Methods:

  • copy

    Create a copy of the metadata object.

  • from_dict

    Create metadata object from dictionary.

  • load

    Load metadata from a JSON file.

  • save

    Save metadata to a JSON file.

  • to_dict

    Convert metadata object to dictionary.

  • update

    Update metadata fields with new values.

Attributes:

  • uri (str) –

    Return unique URI for this table.

uri property
uri: str

Return unique URI for this table.

copy
copy(deep: bool = True) -> Self

Create a copy of the metadata object.

Parameters:

  • deep (bool, default: True ) –

    If True, creates a deep copy (copies nested objects). If False, creates a shallow copy.

Returns:

  • Self

    Copy of the metadata object.

Example
original = DatasetMeta(title="GDP")
copy = original.copy(deep=True)
copy.title = "Population"  # Doesn't affect original
Source code in lib/catalog/owid/catalog/core/meta.py
def copy(self, deep: bool = True) -> Self:
    """Return a duplicate of this metadata object.

    Args:
        deep: When True (the default), nested objects are duplicated as
            well. When False, only the top-level object is recreated and
            its field values are passed through as they are.

    Returns:
        The newly created copy.

    Example:
        ```python
        original = DatasetMeta(title="GDP")
        duplicate = original.copy(deep=True)
        duplicate.title = "Population"  # original.title is still "GDP"
        ```
    """
    if deep:
        return _deepcopy_dataclass(self)
    # Shallow path: rebuild the dataclass from its current field values.
    return dataclasses.replace(self)  # ty: ignore
from_dict classmethod
from_dict(d: dict[str, Any]) -> T

Create metadata object from dictionary.

Parameters:

  • d (dict[str, Any]) –

    Dictionary with metadata fields.

Returns:

  • T

    New metadata object of the appropriate type.

Example
d = {"title": "GDP", "short_name": "gdp"}
meta = DatasetMeta.from_dict(d)
Note

This uses a custom implementation that's significantly faster than the default dataclasses_json method.

Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> T:  # ty: ignore
    """Build a metadata object of this class from a plain dictionary.

    Args:
        d: Mapping of metadata field names to their values.

    Returns:
        A new instance of the class the method was called on.

    Example:
        ```python
        d = {"title": "GDP", "short_name": "gdp"}
        meta = DatasetMeta.from_dict(d)
        ```

    Note:
        Construction is delegated to ``dataclass_from_dict``, a custom
        implementation that is significantly faster than the default
        dataclasses_json method.
    """
    # NOTE: this is much faster than using dataclasses_json
    instance = dataclass_from_dict(cls, d)  # ty: ignore
    return instance
load classmethod
load(filename: str) -> Self

Load metadata from a JSON file.

Parameters:

  • filename (str) –

    Path to the JSON file containing metadata.

Returns:

  • Self

    Metadata object loaded from the file.

Example
meta = DatasetMeta.load("dataset_meta.json")
print(meta.title)
Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def load(cls, filename: str) -> Self:
    """Load metadata from a JSON file.

    The file is always decoded as UTF-8, so the result does not depend on
    the platform's default text encoding.

    Args:
        filename: Path to the JSON file containing metadata.

    Returns:
        Metadata object built by ``cls.from_dict`` from the parsed JSON.

    Example:
        ```python
        meta = DatasetMeta.load("dataset_meta.json")
        print(meta.title)
        ```
    """
    # Without an explicit encoding, `open` falls back to the locale's
    # preferred encoding, which can mis-decode non-ASCII metadata.
    with open(filename, encoding="utf-8") as istream:
        return cls.from_dict(json.load(istream))
save
save(filename: str | Path) -> None

Save metadata to a JSON file.

Parameters:

  • filename (str | Path) –

    Path where the metadata should be saved.

Example
meta = DatasetMeta(title="GDP")
meta.save("dataset_meta.json")
Source code in lib/catalog/owid/catalog/core/meta.py
def save(self, filename: str | Path) -> None:
    """Write this metadata object to a JSON file.

    The dictionary returned by ``to_dict()`` is serialized with two-space
    indentation; values the JSON encoder cannot handle natively are
    converted with ``str``.

    Args:
        filename: Destination path for the JSON file.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.save("dataset_meta.json")
        ```
    """
    destination = Path(filename).as_posix()
    with open(destination, "w") as out_file:
        json.dump(self.to_dict(), out_file, indent=2, default=str)
to_dict
to_dict(encode_json: bool = False) -> dict[str, Any]

Convert metadata object to dictionary.

Parameters:

  • encode_json (bool, default: False ) –

    If True, encodes values for JSON serialization.

Returns:

  • dict[str, Any]

    Dictionary representation of the metadata.

Example
meta = DatasetMeta(title="GDP", short_name="gdp")
d = meta.to_dict()
print(d["title"])  # "GDP"
Source code in lib/catalog/owid/catalog/core/meta.py
def to_dict(self, encode_json: bool = False) -> dict[str, Any]:  # ty: ignore
    """Return this metadata object as a dictionary.

    Args:
        encode_json: If True, encodes values for JSON serialization.

    Returns:
        Dictionary produced by the parent class's ``to_dict``.

    Example:
        ```python
        meta = DatasetMeta(title="GDP", short_name="gdp")
        d = meta.to_dict()
        print(d["title"])  # "GDP"
        ```
    """
    # Pure delegation: the flag is forwarded unchanged to the parent class.
    as_mapping = super().to_dict(encode_json=encode_json)
    return as_mapping
update
update(**kwargs: dict[str, Any]) -> None

Update metadata fields with new values.

Parameters:

  • **kwargs (dict[str, Any], default: {} ) –

    Field names and their new values. None values are ignored.

Example
meta = DatasetMeta(title="GDP")
meta.update(title="GDP Data", description="Annual GDP figures")
Source code in lib/catalog/owid/catalog/core/meta.py
def update(self, **kwargs: dict[str, Any]) -> None:
    """Set attributes on this object from keyword arguments.

    Each keyword is assigned with ``setattr``; keywords whose value is
    ``None`` are skipped, so existing values are never reset to ``None``.

    Args:
        **kwargs: Field names and their new values. None values are ignored.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.update(title="GDP Data", description="Annual GDP figures")
        ```
    """
    for field_name in kwargs:
        new_value = kwargs[field_name]
        if new_value is None:
            continue
        setattr(self, field_name, new_value)

VariableMeta dataclass

VariableMeta(
    title: str | None = None,
    description: str | None = None,
    description_short: str | None = None,
    description_from_producer: str | None = None,
    description_key: list[str] = list(),
    origins: list[Origin] = list(),
    licenses: list[License] = list(),
    unit: str | None = None,
    short_unit: str | None = None,
    display: dict[str, Any] | None = None,
    additional_info: dict[str, Any] | None = None,
    processing_level: PROCESSING_LEVELS | None = None,
    presentation: VariablePresentationMeta | None = None,
    description_processing: str | None = None,
    license: License | None = None,
    type: VARIABLE_TYPE | None = None,
    sort: list[str] = list(),
    dimensions: dict[str, Any] | None = None,
    original_short_name: str | None = None,
    original_title: str | None = None,
)

Bases: MetaBase

Allowed fields for the display attribute used for grapher: name, zeroDay, yearIsDay, includeInTable, numDecimalPlaces, conversionFactor, entityAnnotationsMap. Fields unit and shortUnit are copied from attributes unit and short_unit on the VariableMeta object.

NOTE: consider using its own object for display instead of dict and also possibly underscoring fields and converting them back to camelCase before inserting to grapher

Methods:

  • from_dict

    Create metadata object from dictionary.

  • load

    Load metadata from a JSON file.

  • render

    Render Jinja in all fields of VariableMeta. Return a new VariableMeta object.

  • save

    Save metadata to a JSON file.

  • to_dict

    Convert metadata object to dictionary.

  • update

    Update metadata fields with new values.

Attributes:

  • schema_version (int) –

    Schema version is used to easily understand everywhere what metadata standard was used for authoring this variable metadata.

schema_version property
schema_version: int

Schema version is used to easily understand everywhere what metadata standard was used for authoring this variable metadata. Defaults to 1 for our legacy variables. "Modern" variables that fill in the presentation key and use origins should record 2 here.

from_dict classmethod
from_dict(d: dict[str, Any]) -> T

Create metadata object from dictionary.

Parameters:

  • d (dict[str, Any]) –

    Dictionary with metadata fields.

Returns:

  • T

    New metadata object of the appropriate type.

Example
d = {"title": "GDP", "short_name": "gdp"}
meta = DatasetMeta.from_dict(d)
Note

This uses a custom implementation that's significantly faster than the default dataclasses_json method.

Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def from_dict(cls, d: dict[str, Any]) -> T:  # ty: ignore
    """Create metadata object from dictionary.

    Delegates to ``dataclass_from_dict``, passing the class this method was
    called on as the target type.

    Args:
        d: Dictionary with metadata fields.

    Returns:
        New metadata object of the appropriate type.

    Example:
        ```python
        d = {"title": "GDP", "short_name": "gdp"}
        meta = DatasetMeta.from_dict(d)
        ```

    Note:
        This uses a custom implementation that's significantly faster than
        the default dataclasses_json method.
    """
    # NOTE: this is much faster than using dataclasses_json
    return dataclass_from_dict(cls, d)  # ty: ignore
load classmethod
load(filename: str) -> Self

Load metadata from a JSON file.

Parameters:

  • filename (str) –

    Path to the JSON file containing metadata.

Returns:

  • Self

    Metadata object loaded from the file.

Example
meta = DatasetMeta.load("dataset_meta.json")
print(meta.title)
Source code in lib/catalog/owid/catalog/core/meta.py
@classmethod
def load(cls, filename: str) -> Self:
    """Load metadata from a JSON file.

    Args:
        filename: Path to the JSON file containing metadata.

    Returns:
        Metadata object loaded from the file.

    Example:
        ```python
        meta = DatasetMeta.load("dataset_meta.json")
        print(meta.title)
        ```
    """
    with open(filename) as istream:
        return cls.from_dict(json.load(istream))
render
render(
    dim_dict: dict[str, Any], remove_dods: bool = False
) -> VariableMeta

Render Jinja in all fields of VariableMeta. Return a new VariableMeta object.

Parameters:

  • dim_dict (dict[str, Any]) –

    Dictionary of dimensions to render.

  • remove_dods (bool, default: False ) –

    If True, remove references to details on demand from the text.

Usage
from owid.catalog import Dataset
from etl import paths

ds = Dataset(paths.DATA_DIR / "garden/emissions/2025-02-12/ceds_air_pollutants")
tb = ds['ceds_air_pollutants']
tb.emissions.m.render({'pollutant': 'CO', 'sector': 'Transport'})

Source code in lib/catalog/owid/catalog/core/meta.py
def render(self, dim_dict: dict[str, Any], remove_dods: bool = False) -> "VariableMeta":
    """Render Jinja in all fields of VariableMeta. Return a new VariableMeta object.

    Jinja expansion runs on a copy of this object (``self.copy()``); the
    expanded copy is then post-processed with ``update_variable_metadata``.

    Args:
        dim_dict: Dictionary of dimensions to render.
        remove_dods: If True, remove references to details on demand from the text.

    Returns:
        The rendered and post-processed metadata object.

    Example:
        ```python
        from owid.catalog import Dataset
        from etl import paths

        ds = Dataset(paths.DATA_DIR / "garden/emissions/2025-02-12/ceds_air_pollutants")
        tb = ds['ceds_air_pollutants']
        tb.emissions.m.render({'pollutant': 'CO', 'sector': 'Transport'})
        ```
    """
    expanded = jinja._expand_jinja(self.copy(), dim_dict, remove_dods=remove_dods)
    return update_variable_metadata(expanded)
save
save(filename: str | Path) -> None

Save metadata to a JSON file.

Parameters:

  • filename (str | Path) –

    Path where the metadata should be saved.

Example
meta = DatasetMeta(title="GDP")
meta.save("dataset_meta.json")
Source code in lib/catalog/owid/catalog/core/meta.py
def save(self, filename: str | Path) -> None:
    """Save metadata to a JSON file.

    Args:
        filename: Path where the metadata should be saved.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.save("dataset_meta.json")
        ```
    """
    filename = Path(filename).as_posix()
    with open(filename, "w") as ostream:
        json.dump(self.to_dict(), ostream, indent=2, default=str)
to_dict
to_dict(encode_json: bool = False) -> dict[str, Any]

Convert metadata object to dictionary.

Parameters:

  • encode_json (bool, default: False ) –

    If True, encodes values for JSON serialization.

Returns:

  • dict[str, Any]

    Dictionary representation of the metadata.

Example
meta = DatasetMeta(title="GDP", short_name="gdp")
d = meta.to_dict()
print(d["title"])  # "GDP"
Source code in lib/catalog/owid/catalog/core/meta.py
def to_dict(self, encode_json: bool = False) -> dict[str, Any]:  # ty: ignore
    """Convert metadata object to dictionary.

    Thin override that forwards the call to the parent class's ``to_dict``.

    Args:
        encode_json: If True, encodes values for JSON serialization. Passed
            through unchanged to the parent implementation.

    Returns:
        Dictionary representation of the metadata.

    Example:
        ```python
        meta = DatasetMeta(title="GDP", short_name="gdp")
        d = meta.to_dict()
        print(d["title"])  # "GDP"
        ```
    """
    return super().to_dict(encode_json=encode_json)
update
update(**kwargs: dict[str, Any]) -> None

Update metadata fields with new values.

Parameters:

  • **kwargs (dict[str, Any], default: {} ) –

    Field names and their new values. None values are ignored.

Example
meta = DatasetMeta(title="GDP")
meta.update(title="GDP Data", description="Annual GDP figures")
Source code in lib/catalog/owid/catalog/core/meta.py
def update(self, **kwargs: dict[str, Any]) -> None:
    """Set attributes on this object from keyword arguments.

    Each keyword is assigned with ``setattr``; keywords whose value is
    ``None`` are skipped, so existing values are never reset to ``None``.

    Args:
        **kwargs: Field names and their new values. None values are ignored.

    Example:
        ```python
        meta = DatasetMeta(title="GDP")
        meta.update(title="GDP Data", description="Annual GDP figures")
        ```
    """
    for field_name in kwargs:
        new_value = kwargs[field_name]
        if new_value is None:
            continue
        setattr(self, field_name, new_value)

is_year_or_date

is_year_or_date(s: str) -> bool

Matches dates in "yyyy-mm-dd" format or years in "yyyy" format.

Source code in lib/catalog/owid/catalog/core/meta.py
def is_year_or_date(s: str) -> bool:
    """Return True if `s` is a year ("yyyy") or a date ("yyyy-mm-dd").

    Only the shape of the string is checked (digits and dashes); the month and
    day are not validated, so "2020-13-45" is accepted.

    The whole string must match. `re.fullmatch` is used instead of `re.match`
    with a `$` anchor, because `$` also matches just before a trailing newline
    and would therefore accept a year or date followed by a newline character.

    Args:
        s: String to test.

    Returns:
        True if `s` is exactly four digits, or four digits followed by
        "-dd-dd"; False otherwise.
    """
    return re.fullmatch(r"\d{4}(?:-\d{2}-\d{2})?", s) is not None

update_variable_metadata

update_variable_metadata(
    meta: VariableMeta,
) -> VariableMeta

Post-process variable metadata and fix issues before rendering or exporting to grapher. Things like converting strings to numbers, removing empty fields, post-processing jinja rendering, etc.

Source code in lib/catalog/owid/catalog/core/meta.py
def update_variable_metadata(meta: VariableMeta) -> VariableMeta:
    """Post-process variable metadata and fix issues before rendering or exporting to grapher.

    Things like converting strings to numbers, removing empty fields, post-processing jinja
    rendering, etc. The steps are:

    1. Ensure `display` is a dict and copy `short_unit` / `unit` into it as
       `shortUnit` / `unit` unless those keys are already set.
    2. Convert `numDecimalPlaces` / `numSignificantFigures` from strings to ints, and drop
       them (and `roundingMode`) when they are blank strings.
    3. Drop blank entries from `description_key`.
    4. Convert `presentation.grapher_config["map"]["colorScale"]["customNumericValues"]`
       from a string to a list of numbers.
    5. Drop FAQ links whose `fragment_id` is blank.

    Args:
        meta: Variable metadata to fix. It is modified in place.

    Returns:
        The same `meta` object that was passed in.

    Raises:
        ValueError: If a non-blank `numDecimalPlaces` / `numSignificantFigures` string
            cannot be converted with `int`.
    """
    # Grapher uses units from field `display` instead of fields `unit` and `short_unit`
    # before we fix grapher data model, copy them to `display`.
    meta.display = meta.display or {}

    # Copy unit and short_unit to display if they exist; setdefault keeps explicit
    # display values untouched.
    if meta.short_unit:
        meta.display.setdefault("shortUnit", meta.short_unit)
    if meta.unit:
        meta.display.setdefault("unit", meta.unit)

    # Convert display fields from string to int/None after Jinja expansion.
    # Jinja renders inside string values, so numeric display fields may arrive
    # as strings (e.g. "2") or empty strings (when a Jinja conditional evaluates
    # to nothing).  Empty strings are removed so they don't override other settings.
    if meta.display:
        for key in ("numDecimalPlaces", "numSignificantFigures"):
            val = meta.display.get(key)
            if isinstance(val, str):
                if val.strip():
                    meta.display[key] = int(val)
                else:
                    del meta.display[key]
        # Also clean up roundingMode if it's an empty string
        rounding_mode = meta.display.get("roundingMode")
        if isinstance(rounding_mode, str) and not rounding_mode.strip():
            del meta.display["roundingMode"]

    # Prune empty fields from description_key
    if meta.description_key:
        meta.description_key = [x for x in meta.description_key if x.strip()]

    # Convert customNumericValues from string to list when it comes from YAML.
    # Missing *or* None intermediate levels ("map", "colorScale") are skipped
    # instead of raising, so a config such as `map: null` does not crash.
    gconf = getattr(meta.presentation, "grapher_config", None)
    if gconf:
        color_scale = (gconf.get("map") or {}).get("colorScale") or {}
        custom_values = color_scale.get("customNumericValues")
        if isinstance(custom_values, str):
            color_scale["customNumericValues"] = parse_numeric_list(custom_values)

    # Prune faqs with empty fragment_id
    if meta.presentation and meta.presentation.faqs:
        meta.presentation.faqs = [faq for faq in meta.presentation.faqs if faq.fragment_id.strip()]

    return meta

owid.catalog.core.utils

Functions:

  • dataclass_from_dict

    Recursively create an instance of a dataclass from a dictionary.

  • dynamic_yaml_load

    Load YAML file with dynamic parameter substitution.

  • dynamic_yaml_to_dict

    Convert dynamic YAML object to plain dictionary.

  • hash_any

    Return a unique, deterministic hash for an arbitrary object.

  • parse_numeric_list

    Parse a string representation of a numeric list.

  • prune_dict

    Remove private keys and empty values from a dictionary recursively.

  • pruned_json

    Decorator that modifies a class's to_dict method to prune empty values.

  • remove_details_on_demand

    Remove details-on-demand references from markdown text.

  • underscore

    Convert arbitrary string to snake_case format.

  • underscore_table

    Convert column and index names to underscore format.

  • validate_underscore

    Validate that a name follows snake_case convention.

dataclass_from_dict

dataclass_from_dict(
    cls: type[T] | None, d: dict[str, Any]
) -> T

Recursively create an instance of a dataclass from a dictionary. We've implemented a custom method because the original dataclasses_json.from_dict was too slow (this gives us more than 2x speedup). See https://github.com/owid/etl/pull/3517#issuecomment-2468084380 for more details.

Source code in lib/catalog/owid/catalog/core/utils.py
def dataclass_from_dict(cls: type[T] | None, d: dict[str, Any]) -> T:
    """Recursively create an instance of a dataclass from a dictionary.

    We've implemented a custom method because the original `dataclasses_json.from_dict`
    was too slow (this gives us more than 2x speedup). See
    https://github.com/owid/etl/pull/3517#issuecomment-2468084380 for more details.

    Args:
        cls: Target dataclass. If it is not a dataclass, `d` is returned unchanged.
        d: Dictionary of field values. Keys that are not fields of `cls` are skipped.
            If `d` is None or not a dict, it is returned unchanged.

    Returns:
        Instance of `cls` built from `d`, or `d` itself when no conversion applies.
    """
    # Base case of the recursion: nothing to convert (scalar item, non-dataclass target, ...)
    if d is None or not dataclasses.is_dataclass(cls) or not isinstance(d, dict):
        return d  # ty: ignore

    # Declared type of every field, used to decide how each value is converted
    field_types = {f.name: f.type for f in dataclasses.fields(cls)}

    init_args = {}
    for field_name, v in d.items():
        # Skip values in a dictionary that are not in the dataclass
        if field_name not in field_types:
            continue

        # Handle None values right away
        if v is None:
            init_args[field_name] = None
            continue

        field_type = field_types[field_name]
        origin = get_origin(field_type)
        args = get_args(field_type)

        # Unwrap optional types (e.g. License | None -> License)
        if type(None) in args:
            filtered_args = tuple(a for a in args if a is not type(None))
            # Only unwrap when exactly one non-None alternative remains
            if len(filtered_args) == 1:
                # Save the original field_type for List[...] | None case
                field_type = filtered_args[0]
                # For List[...] | None case, update the origin and args
                # NOTE(review): origin/args are refreshed only for lists here; an optional
                # dict type keeps the union's origin and so does not reach the `origin is dict`
                # branch below — confirm this is intended.
                if get_origin(field_type) is list:
                    origin = list
                    args = get_args(field_type)

        if origin is list:
            # Check if we have type arguments (e.g. List[str])
            if args:
                # Convert every item using the declared item type
                item_type = args[0]
                init_args[field_name] = [dataclass_from_dict(item_type, item) for item in v]
            else:
                # No type arguments, just use the values as-is
                init_args[field_name] = v
        elif origin is dict:
            # Keys are kept as-is; only values are converted using the declared value type
            key_type, value_type = args
            init_args[field_name] = {k: dataclass_from_dict(value_type, item) for k, item in v.items()}
        elif dataclasses.is_dataclass(field_type):
            # Nested dataclass: use its own from_dict
            init_args[field_name] = field_type.from_dict(v)  # ty: ignore
        elif isinstance(field_type, type) and field_type not in (Any,):
            # Plain class (e.g. int, str): coerce the value by calling the type
            try:
                init_args[field_name] = field_type(v)
            except ValueError as e:
                # Conversion failed: log it and leave the field out of init_args.
                # The logged path is assembled from the channel/namespace/version/short_name
                # keys of `d` (missing keys show up as "None").
                log.error(
                    "conversion.failed",
                    field_name=field_name,
                    field_type=field_type,
                    path=f"{d.get('channel')}/{d.get('namespace')}/{d.get('version')}/{d.get('short_name')}",
                    error=str(e),
                )
                continue
        else:
            # Anything else is passed through unchanged
            init_args[field_name] = v

    return cls(**init_args)

dynamic_yaml_load

dynamic_yaml_load(
    source: Path | str | TextIO, params: dict = {}
) -> dict

Load YAML file with dynamic parameter substitution.

Loads a YAML file and updates it with provided parameters for dynamic interpolation. Supports loading from file paths, path strings, or file-like objects.

Parameters:

  • source (Path | str | TextIO) –

    File path (Path or str) or file-like object (e.g., StringIO).

  • params (dict, default: {} ) –

    Dictionary of parameters to substitute in the YAML. Defaults to empty dict.

Returns:

  • dict

    Parsed YAML data as dictionary with parameters applied.

Example
from pathlib import Path

# Load from file path
data = dynamic_yaml_load("config.yaml", {"year": 2024})

# Load from StringIO
from io import StringIO
yaml_str = StringIO("title: Dataset {{year}}")
data = dynamic_yaml_load(yaml_str, {"year": 2024})
Source code in lib/catalog/owid/catalog/core/utils.py
def dynamic_yaml_load(source: Path | str | TextIO, params: dict = {}) -> dict:
    """Load a YAML document with `dynamic_yaml` and merge extra parameters into it.

    `source` may be a path (``str`` or ``Path``), which is opened and read, or
    an already-open file-like object, which is passed to the loader directly.
    After loading, `params` is merged into the top level of the result with
    ``update``.

    Args:
        source: File path (Path or str) or file-like object (e.g., StringIO).
        params: Dictionary of parameters to substitute in the YAML. Defaults to empty dict.

    Returns:
        Parsed YAML data as dictionary with parameters applied.

    Example:
        ```python
        from pathlib import Path

        # Load from file path
        data = dynamic_yaml_load("config.yaml", {"year": 2024})

        # Load from StringIO
        from io import StringIO
        yaml_str = StringIO("title: Dataset {{year}}")
        data = dynamic_yaml_load(yaml_str, {"year": 2024})
        ```
    """
    if not isinstance(source, (str, Path)):
        # Not a path: assume it's a file-like object (StringIO, BytesIO, etc.)
        loaded = dynamic_yaml.load(source)
    else:
        with open(source) as handle:
            loaded = dynamic_yaml.load(handle)

    loaded.update(params)
    return loaded

dynamic_yaml_to_dict

dynamic_yaml_to_dict(yd: Any) -> dict

Convert dynamic YAML object to plain dictionary.

Dynamic YAML objects can cause issues when unpacking into dataclass constructors. This function converts them to standard Python dictionaries for safe usage.

Parameters:

  • yd (Any) –

    Dynamic YAML object to convert.

Returns:

  • dict

    Plain Python dictionary.

Example

Problem: Dynamic YAML can cause errors

# origin = Origin(**dynamic_yaml_obj)  # May fail

Solution: Convert to dict first

origin = Origin(**dynamic_yaml_to_dict(dynamic_yaml_obj))  # Safe

Note

Always use this conversion before unpacking into dataclass constructors to avoid unexpected behavior with dynamic YAML objects.

Source code in lib/catalog/owid/catalog/core/utils.py
def dynamic_yaml_to_dict(yd: Any) -> dict:
    """Turn a dynamic YAML object into a plain Python dictionary.

    The object is serialized with ``dynamic_yaml.dump`` and the resulting text
    is parsed again with ``yaml.safe_load``, which yields standard Python
    containers that are safe to unpack into dataclass constructors.

    Args:
        yd: Dynamic YAML object to convert.

    Returns:
        Plain Python dictionary.

    Example:
        Problem: Dynamic YAML can cause errors
        ```python
        # origin = Origin(**dynamic_yaml_obj)  # May fail
        ```

        Solution: Convert to dict first
        ```python
        origin = Origin(**dynamic_yaml_to_dict(dynamic_yaml_obj))  # Safe
        ```

    Note:
        Always use this conversion before unpacking into dataclass constructors
        to avoid unexpected behavior with dynamic YAML objects.
    """
    serialized = dynamic_yaml.dump(yd)
    return yaml.safe_load(serialized)

hash_any

hash_any(x: Any) -> int

Return a unique, deterministic hash for an arbitrary object.

This function is especially useful when working with mutable objects, such as dataclasses that can't be made frozen, but where you still need to use operations like set, dict keys, or deduplication with unique. A standard Python hash() is not suitable in such cases because Python's hash() function for strings is randomized across different interpreter sessions for security reasons (via PYTHONHASHSEED), which can result in non-deterministic hash values.

This function handles common Python data structures, such as dataclasses, lists, dicts, strings, and None, and ensures that the returned hash is always deterministic across different runs. For strings, it uses an MD5 hash truncated to 64 bits to maintain consistent behavior across different runs of the program.

The function is recursive, so it can handle nested objects like lists of dataclasses, dicts with list values, etc.

Parameters:

  • x (Any) –

    The object to be hashed. It can be of any type: dataclass, list, dict, string, or other.

Returns:

  • int ( int ) –

    A deterministic integer hash value for the object.

Special cases:

  • Dataclasses: It recursively hashes each field of the dataclass by generating a tuple of (field_name_hash, field_value_hash) and then hashes that tuple.
  • Lists: It recursively hashes each element in the list, converts the list to a tuple (because tuples are hashable), and then hashes the tuple.
  • Dictionaries: It hashes the keys and values of the dictionary, sorting them by key to ensure consistency, then generates a tuple of (key_hash, value_hash) pairs and hashes that tuple.
  • Strings: Instead of the built-in hash(), it uses the MD5 hash algorithm to generate a consistent 64-bit hash (by truncating the result) that remains the same across interpreter runs.
  • None: Always returns 0 as the hash for None.
  • Other types: Falls back on Python's built-in hash() function for all other types of objects.
Example
>>> @dataclass
>>> class Person:
...    name: str
...    age: int

>>> p1 = Person(name="Alice", age=30)
>>> p2 = Person(name="Alice", age=30)
>>> hash_any(p1) == hash_any(p2)
True
Source code in lib/catalog/owid/catalog/core/utils.py
def hash_any(x: Any) -> int:
    """Return a hash for an arbitrary, possibly unhashable, object.

    Useful for mutable objects (e.g. dataclasses that can't be frozen) that
    still need to take part in `set` / `dict`-key style operations or
    deduplication. Strings are hashed with MD5 rather than the built-in
    `hash()`, because Python randomizes string hashes between interpreter
    sessions (see `PYTHONHASHSEED`).

    The function is recursive, so nested structures such as lists of
    dataclasses or dicts with list values are supported.

    Args:
        x (Any): The object to be hashed: dataclass, list, dict, string, None or other.

    Returns:
        int: Integer hash value for the object.

    Dispatch rules, checked in this order:

    - **Dataclasses**: fields are sorted by name; a tuple of
      (field_name_hash, field_value_hash) pairs is hashed.
    - **Lists**: each element is hashed recursively and the resulting tuple is hashed.
    - **Dictionaries**: items are sorted, each pair becomes (key_hash, value_hash),
      and the resulting tuple is hashed. Keys must therefore be mutually orderable.
    - **Strings**: MD5 digest of the encoded string, truncated to the low 64 bits.
    - **None**: always `0`.
    - **Other types**: the built-in `hash()`. NOTE: for types whose built-in hash
      is itself randomized per session (e.g. `bytes`), the result is not stable
      across runs.

    Example:
        ```python
        >>> @dataclass
        ... class Person:
        ...    name: str
        ...    age: int

        >>> p1 = Person(name="Alice", age=30)
        >>> p2 = Person(name="Alice", age=30)
        >>> hash_any(p1) == hash_any(p2)
        True
        ```
    """
    if is_dataclass(x):
        # Field order must not influence the result, hence the sort by name
        ordered_fields = sorted(fields(x), key=lambda f: f.name)
        pairs = tuple((hash_any(f.name), hash_any(getattr(x, f.name))) for f in ordered_fields)
        return hash(pairs)

    if isinstance(x, list):
        # Lists are unhashable: hash the tuple of element hashes instead
        return hash(tuple(hash_any(item) for item in x))

    if isinstance(x, dict):
        # Sorting the items makes the result independent of insertion order
        return hash(tuple((hash_any(key), hash_any(value)) for key, value in sorted(x.items())))

    if isinstance(x, str):
        # MD5 is stable across interpreter sessions; keep only the low 64 bits
        digest = hashlib.md5(x.encode()).hexdigest()
        return int(digest, 16) & ((1 << 64) - 1)

    if x is None:
        # Fixed value for None
        return 0

    # Everything else: rely on the built-in hash
    return hash(x)

parse_numeric_list

parse_numeric_list(val: list | str) -> list[float | int]

Parse a string representation of a numeric list.

Converts a comma-separated string of numbers (optionally wrapped in brackets) into a Python list of integers and floats.

Parameters:

  • val (list | str) –

    String representation of a numeric list or an existing list. If already a list, returns it unchanged.

Returns:

  • list[float | int]

    List of integers and floats parsed from the input string.

Example
# String with brackets
parse_numeric_list("[10, 20, 30]")
# Returns: [10, 20, 30]

# String without brackets
parse_numeric_list("1.5, 2.5, 3.0")
# Returns: [1.5, 2.5, 3.0]

# Mixed integers and floats
parse_numeric_list("10, 20.5, 30")
# Returns: [10, 20.5, 30]

# Already a list (no-op)
parse_numeric_list([1, 2, 3])
# Returns: [1, 2, 3]
Note

Numbers with decimal points are parsed as floats, others as integers.

Source code in lib/catalog/owid/catalog/core/utils.py
def parse_numeric_list(val: list | str) -> list[float | int]:
    """Parse a string representation of a numeric list.

    Converts a comma-separated string of numbers (optionally wrapped in brackets)
    into a Python list of integers and floats.

    Args:
        val: String representation of a numeric list or an existing list.
            If already a list, returns it unchanged.

    Returns:
        List of integers and floats parsed from the input string.

    Example:
        ```python
        # String with brackets
        parse_numeric_list("[10, 20, 30]")
        # Returns: [10, 20, 30]

        # String without brackets
        parse_numeric_list("1.5, 2.5, 3.0")
        # Returns: [1.5, 2.5, 3.0]

        # Mixed integers and floats
        parse_numeric_list("10, 20.5, 30")
        # Returns: [10, 20.5, 30]

        # Already a list (no-op)
        parse_numeric_list([1, 2, 3])
        # Returns: [1, 2, 3]
        ```

    Note:
        Numbers with decimal points are parsed as floats, others as integers.
    """
    if isinstance(val, list):
        return val
    stripped = val.strip()
    if stripped.startswith("[") and stripped.endswith("]"):
        stripped = stripped[1:-1]

    return [float(x) if "." in x else int(x) for x in stripped.split(",") if x.strip()]

prune_dict

prune_dict(d: dict) -> dict

Remove private keys and empty values from a dictionary recursively.

Removes all keys starting with underscore (private fields) and all empty values (None, empty lists, empty dicts) from a dictionary and its nested structures — except keys listed in KEEP_IF_EMPTY, where an explicit empty value is meaningful and must round-trip through serialization.

Inside lists, only empty dicts and empty lists are filtered — None is preserved so that positional arrays (e.g. customNumericColors, customNumericLabels in grapher_config) keep their alignment, where None means "fall back to default" at that index.

Parameters:

  • d (dict) –

    Dictionary to prune.

Returns:

  • dict

    New dictionary with private keys and empty values removed.

Example
d = {
    "title": "Dataset",
    "_internal": "hidden",
    "count": 0,  # Kept (not empty)
    "empty_list": [],
    "chartTypes": [],     # Kept — in KEEP_IF_EMPTY
    "nested": {"value": 1, "null": None},
    "positional": [None, None, "#bc8e5a"],  # None preserved inside list
}
result = prune_dict(d)
# Returns: {"title": "Dataset", "count": 0, "chartTypes": [],
#          "nested": {"value": 1}, "positional": [None, None, "#bc8e5a"]}
Source code in lib/catalog/owid/catalog/core/utils.py
def prune_dict(d: dict) -> dict:
    """Return a copy of `d` without private keys and without empty values.

    Keys starting with an underscore are dropped, as are values equal to
    ``None``, ``[]`` or ``{}`` — *unless* the key is listed in
    ``KEEP_IF_EMPTY``, where an explicit empty value is meaningful and must
    round-trip through serialization. Nested dictionaries are pruned
    recursively.

    Inside lists only empty dicts and empty lists are removed; `None` is
    preserved so that positional arrays (e.g. `customNumericColors`,
    `customNumericLabels` in grapher_config) keep their alignment, where `None`
    means "fall back to default" at that index. Dicts inside lists are pruned
    recursively; other list items are kept as they are.

    Args:
        d: Dictionary to prune.

    Returns:
        New dictionary with private keys and empty values removed.

    Example:
        ```python
        d = {
            "title": "Dataset",
            "_internal": "hidden",
            "count": 0,  # Kept (not empty)
            "empty_list": [],
            "chartTypes": [],     # Kept — in KEEP_IF_EMPTY
            "nested": {"value": 1, "null": None},
            "positional": [None, None, "#bc8e5a"],  # None preserved inside list
        }
        result = prune_dict(d)
        # Returns: {"title": "Dataset", "count": 0, "chartTypes": [],
        #          "nested": {"value": 1}, "positional": [None, None, "#bc8e5a"]}
        ```
    """
    pruned = {}
    for key, value in d.items():
        # Private field
        if key.startswith("_"):
            continue

        empty = value in [None, [], {}]
        if empty and key not in KEEP_IF_EMPTY:
            continue

        if isinstance(value, dict):
            # An empty dict that survived (KEEP_IF_EMPTY) is stored as-is
            pruned[key] = value if empty else prune_dict(value)
        elif isinstance(value, list):
            kept = []
            for item in value:
                # Empty dict/list placeholders are pointless; None and other
                # primitives stay because their position can be meaningful.
                if isinstance(item, (dict, list)) and not item:
                    continue
                kept.append(prune_dict(item) if isinstance(item, dict) else item)
            pruned[key] = kept
        else:
            pruned[key] = value
    return pruned

pruned_json

pruned_json(cls: T) -> T

Decorator that modifies a class's to_dict method to prune empty values.

Wraps a dataclass's to_dict method to automatically remove private fields (starting with underscore) and empty values when serializing to JSON.

Parameters:

  • cls (T) –

    Dataclass to decorate.

Returns:

  • T

    The same class with modified to_dict method.

Example
from dataclasses import dataclass
from owid.catalog.utils import pruned_json

@pruned_json
@dataclass
class Config:
    name: str
    _internal: str = "hidden"
    optional: str | None = None

config = Config(name="test", _internal="secret", optional=None)
d = config.to_dict()
# Returns: {"name": "test"}  (no _internal or optional)
Note

This decorator is commonly used with metadata classes to keep JSON output clean by removing None values and private fields.

Source code in lib/catalog/owid/catalog/core/utils.py
def pruned_json(cls: T) -> T:
    """Class decorator that makes `to_dict` return a pruned dictionary.

    The class's current `to_dict` is captured and replaced by a wrapper that
    forwards its keyword arguments to the captured method and then runs the
    resulting dictionary through `prune_dict`, which drops private keys
    (leading underscore) and empty values.

    Args:
        cls: Class to decorate. It must already provide a `to_dict` method.

    Returns:
        The very same class object, with `to_dict` replaced in place.

    Example:
        ```python
        from dataclasses import dataclass
        from owid.catalog.utils import pruned_json

        @pruned_json
        @dataclass
        class Config:
            name: str
            _internal: str = "hidden"
            optional: str | None = None

        config = Config(name="test", _internal="secret", optional=None)
        d = config.to_dict()
        # Returns: {"name": "test"}  (no _internal or optional)
        ```

    Note:
        Typically applied to metadata classes so their JSON output carries no
        None values or private fields.
    """
    # Grab the undecorated method before overwriting the attribute, otherwise
    # the wrapper below would end up calling itself.
    original_to_dict = cls.to_dict  # ty: ignore

    # The original method already hands back plain dictionaries (not objects),
    # so pruning can be applied directly to its return value.
    cls.to_dict = lambda self, **kwargs: prune_dict(original_to_dict(self, **kwargs))  # ty: ignore

    return cls

remove_details_on_demand

remove_details_on_demand(text: str) -> str

Remove details-on-demand references from markdown text.

Strips out special markdown links that reference details-on-demand content, keeping only the link text. This is useful for generating plain text versions of content that contains interactive elements.

Parameters:

  • text (str) –

    Markdown text containing details-on-demand references.

Returns:

  • str

    Text with details-on-demand references removed, keeping only link text.

Example
text = "This is a [description](#dod:something) of the data."
result = remove_details_on_demand(text)
# Returns: "This is a description of the data."

Multiple references

text = "See [mortality](#dod:mort) and [fertility](#dod:fert) data."
result = remove_details_on_demand(text)
# Returns: "See mortality and fertility data."

Note

The regex pattern matches [text](#dod:keyword) and replaces it with just text.

Source code in lib/catalog/owid/catalog/core/utils.py
def remove_details_on_demand(text: str) -> str:
    """Strip details-on-demand links from markdown, keeping their link text.

    Every markdown link whose target starts with `#dod:` is replaced by the
    text between its square brackets. Ordinary links and all other content are
    left untouched, which makes the result suitable as a plain-text rendering
    of content that otherwise carries interactive references.

    Args:
        text: Markdown text that may contain details-on-demand references.

    Returns:
        The text with each `[label](#dod:keyword)` reduced to `label`.

    Example:
        ```python
        text = "This is a [description](#dod:something) of the data."
        result = remove_details_on_demand(text)
        # Returns: "This is a description of the data."
        ```

        Multiple references
        ```python
        text = "See [mortality](#dod:mort) and [fertility](#dod:fert) data."
        result = remove_details_on_demand(text)
        # Returns: "See mortality and fertility data."
        ```

    Note:
        Both the label and the keyword must be non-empty for a link to match.
    """
    # Group 1 captures the label; the "(#dod:keyword)" target is matched only
    # so that it gets discarded by the substitution.
    dod_link = r"\[([^\]]+)\]\(#dod:[^\)]+\)"
    return re.sub(dod_link, r"\1", text)

underscore

underscore(
    name: str,
    validate: bool = True,
    camel_to_snake: bool = False,
) -> str
underscore(
    name: None,
    validate: bool = True,
    camel_to_snake: bool = False,
) -> None
underscore(
    name: str | None,
    validate: bool = True,
    camel_to_snake: bool = False,
) -> str | None

Convert arbitrary string to snake_case format.

Transforms strings into valid Python identifiers using snake_case convention. Handles special characters, punctuation, and optionally converts camelCase. Originally fine-tuned for World Bank WDI column names.

Parameters:

  • name (str | None) –

    String to format. Returns None if input is None.

  • validate (bool, default: True ) –

    If True, validates the result is valid snake_case and raises NameError if not. Defaults to True.

  • camel_to_snake (bool, default: False ) –

    If True, converts camelCase to snake_case before other transformations. Defaults to False.

Returns:

  • str | None

    String in snake_case format, or None if input was None.

Raises:

  • NameError

    If validate is True and the result is not valid snake_case.

Example
# Basic usage
underscore("GDP (constant 2015 US$)")
# Returns: "gdp__constant_2015_usd"

# Handle camelCase
underscore("myVariableName", camel_to_snake=True)
# Returns: "my_variable_name"

# Skip validation
underscore("123invalid", validate=False)
# Returns: "_123invalid"
Warning

This function may evolve in the future. For critical use cases, either add tests or manually underscore your column names.

Source code in lib/catalog/owid/catalog/core/utils.py
def underscore(name: str | None, validate: bool = True, camel_to_snake: bool = False) -> str | None:
    """Convert arbitrary string to snake_case format.

    Transforms strings into valid Python identifiers using snake_case convention.
    Handles special characters, punctuation, and optionally converts camelCase.
    Originally fine-tuned for World Bank WDI column names.

    The replacements below are applied in a fixed order and some of them depend
    on earlier ones (for example `us$` is rewritten before a bare `$`), so the
    sequence must not be reordered.

    Args:
        name: String to format. Returns None if input is None.
        validate: If True, validates the result is valid snake_case and raises
            NameError if not. Defaults to True.
        camel_to_snake: If True, converts camelCase to snake_case before other
            transformations. Defaults to False.

    Returns:
        String in snake_case format, or None if input was None.

    Raises:
        NameError: If validate is True and the result is not valid snake_case.

    Example:
        ```python
        # Basic usage: "us$" becomes "usd", brackets become "__", and
        # leading/trailing underscores are stripped
        underscore("GDP (constant 2015 US$)")
        # Returns: "gdp__constant_2015_usd"

        # Handle camelCase
        underscore("myVariableName", camel_to_snake=True)
        # Returns: "my_variable_name"

        # Skip validation
        underscore("123invalid", validate=False)
        # Returns: "_123invalid"
        ```

    Warning:
        This function may evolve in the future. For critical use cases, either
        add tests or manually underscore your column names.
    """
    if name is None:
        return None

    # keep the untouched input around for the validation error message
    orig_name = name

    # camelCase to snake_case
    if camel_to_snake:
        name = _camel_to_snake(name)

    # convert special characters to ASCII first, then work with clean ASCII
    name = unidecode(name).lower()

    # replace basic whitespace and punctuation
    name = (
        name.replace(" ", "_")
        .replace("-", "_")
        .replace(",", "_")
        .replace(".", "_")
        .replace("\t", "_")
        .replace("?", "_")
        .replace("!", "_")
        .replace('"', "")
        .replace("'", "")
        .replace("\xa0", "_")
        .replace("`", "")
        .replace("*", "_")
        .replace("#", "")
        .replace("^", "")
    )

    # replace special separators
    name = (
        name.replace("(", "__")
        .replace(")", "__")
        .replace(":", "__")
        .replace(";", "__")
        .replace("[", "__")
        .replace("]", "__")
    )

    # replace special symbols
    name = name.replace("/", "_")
    name = name.replace("|", "_")
    name = name.replace("=", "_")
    name = name.replace("%", "pct")
    name = name.replace("+", "plus")
    # "us$" must be handled before the bare "$" below, otherwise it would
    # come out as "usdollar" instead of "usd"
    name = name.replace("us$", "usd")
    name = name.replace("$", "dollar")
    name = name.replace("&", "_and_")
    name = name.replace("<", "_lt_")
    name = name.replace(">", "_gt_")

    # collapse any run of two or more underscores into a double underscore
    name = re.sub("__+", "__", name)

    # strip leading and trailing underscores
    name = name.strip("_")

    # if the first letter is number, prefix it with underscore
    if re.match("^[0-9]", name):
        name = f"_{name}"

    # make sure it's under_score now, if not then raise NameError
    if validate:
        validate_underscore(name, f"`{orig_name}`")

    return name

underscore_table

underscore_table(t: Any, *args: Any, **kwargs: Any) -> Any

Convert column and index names to underscore format.

Warning

DEPRECATED: Use table.underscore() method instead. This function exists only for backward compatibility.

Parameters:

  • t (Any) –

    Table object to underscore.

  • *args (Any, default: () ) –

    Positional arguments passed to table.underscore().

  • **kwargs (Any, default: {} ) –

    Keyword arguments passed to table.underscore().

Returns:

  • Any

    Table with underscored column and index names.

Example

Deprecated usage

underscored = underscore_table(my_table)

Preferred usage

underscored = my_table.underscore()

Source code in lib/catalog/owid/catalog/core/utils.py
def underscore_table(t: Any, *args: Any, **kwargs: Any) -> Any:
    """Delegate to `t.underscore()` to underscore a table's names.

    Warning:
        **DEPRECATED**: Call the `table.underscore()` method directly. This
        function is kept only so that older callers continue to work.

    Args:
        t: Object exposing an `underscore` method (normally a table).
        *args: Positional arguments forwarded unchanged to `t.underscore()`.
        **kwargs: Keyword arguments forwarded unchanged to `t.underscore()`.

    Returns:
        Whatever `t.underscore(*args, **kwargs)` returns.

    Example:
        Deprecated usage
        ```python
        underscored = underscore_table(my_table)
        ```

        Preferred usage
        ```python
        underscored = my_table.underscore()
        ```
    """
    # Pure pass-through: no argument is inspected or altered on the way.
    underscore_method = t.underscore
    return underscore_method(*args, **kwargs)

validate_underscore

validate_underscore(
    name: str | None, object_name: str = "Name"
) -> None

Validate that a name follows snake_case convention.

Parameters:

  • name (str | None) –

    String to validate. If None, validation is skipped.

  • object_name (str, default: 'Name' ) –

    Name of the object being validated, used in error messages. Defaults to "Name".

Raises:

  • NameError

    If name is not valid snake_case (lowercase letters, digits, and underscores only, must start with letter or underscore).

Example

Valid names pass silently

validate_underscore("my_variable")
validate_underscore("_private_var")

Invalid names raise NameError

try:
    validate_underscore("MyVariable", "Variable")
except NameError as e:
    print(e)
    # Prints: Variable must be snake_case. Change `MyVariable` to `myvariable`

Source code in lib/catalog/owid/catalog/core/utils.py
def validate_underscore(name: str | None, object_name: str = "Name") -> None:
    """Validate that a name follows snake_case convention.

    Args:
        name: String to validate. If None, validation is skipped.
        object_name: Name of the object being validated, used in error messages.
            Defaults to "Name".

    Raises:
        NameError: If name is not valid snake_case (lowercase letters, digits,
            and underscores only, must start with letter or underscore).

    Example:
        Valid names pass silently
        ```python
        validate_underscore("my_variable")
        validate_underscore("_private_var")
        ```

        Invalid names raise NameError
        ```python
        try:
            validate_underscore("MyVariable", "Variable")
        except NameError as e:
            print(e)
            # Prints: Variable must be snake_case. Change `MyVariable` to `my_variable`
        ```
    """
    if name is not None and not re.match("^[a-z_][a-z0-9_]*$", name):
        raise NameError(f"{object_name} must be snake_case. Change `{name}` to `{underscore(name, validate=False)}`")

owid.catalog.s3_utils

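Classes:

  • MissingCredentialsError

    Raised when R2 credentials are not found.

  • UploadError

    Raised when S3 upload or download operations fail.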

Functions:

  • connect_r2

    Create a connection to Cloudflare R2 storage.

  • connect_r2_cached

    Create a cached, thread-safe connection to Cloudflare R2.

  • download

    Download a file from S3 to local filesystem.

  • download_s3_folder

    Download all files from an S3 folder to a local directory.

  • list_s3_objects

    List all objects in an S3 folder.

  • s3_bucket_key

    Extract bucket name and key from an S3 URL.

  • upload

    Upload the file at the given local filename to the S3 URL.

MissingCredentialsError

Bases: Exception

Raised when R2 credentials are not found.

This exception is raised when neither environment variables nor rclone configuration contain the required R2 credentials.

UploadError

Bases: Exception

Raised when S3 upload or download operations fail.

This exception wraps boto3 ClientError exceptions that occur during S3 operations like upload, download, or file listing.

connect_r2

connect_r2() -> BaseClient

Create a connection to Cloudflare R2 storage.

Creates a boto3 S3 client configured for Cloudflare R2. Credentials are loaded from environment variables or rclone configuration file.

Credential sources (in priority order):

  1. Environment variables: R2_ACCESS_KEY, R2_SECRET_KEY, R2_ENDPOINT, R2_REGION_NAME
  2. rclone config file: ~/.config/rclone/rclone.conf (section: owid-r2)

Returns:

  • BaseClient

    Boto3 S3 client configured for R2.

Example
# Connect to R2
client = connect_r2()

# Use with boto3 operations
client.list_objects_v2(Bucket='my-bucket', Prefix='data/')
Note

For cached connections that reuse the same client across calls, use connect_r2_cached() instead. This is more efficient for multiple operations.

See Also
  • connect_r2_cached(): Thread-safe cached version
  • Cloudflare R2 docs: https://developers.cloudflare.com/r2/
Source code in lib/catalog/owid/catalog/s3_utils.py
def connect_r2() -> BaseClient:
    """Build a new boto3 S3 client pointed at Cloudflare R2.

    Settings are looked up in two places, in this order:

    1. `env`: keys `R2_ACCESS_KEY`, `R2_SECRET_KEY`, `R2_ENDPOINT`, `R2_REGION_NAME`
    2. The rclone configuration returned by `_read_owid_rclone_config()`, which
       is consulted only when the access key, secret key or endpoint is still
       missing, and which only fills in values that are not already set.

    If no endpoint or region is found in either place, a built-in default
    endpoint and the region `"auto"` are used.

    Returns:
        Boto3 S3 client configured for R2.

    Example:
        ```python
        # Connect to R2
        client = connect_r2()

        # Use with boto3 operations
        client.list_objects_v2(Bucket='my-bucket', Prefix='data/')
        ```

    Note:
        Every call creates a new client. When the same client should be shared
        across calls, use `connect_r2_cached()` instead.

    See Also:
        - `connect_r2_cached()`: Thread-safe cached version
        - Cloudflare R2 docs: https://developers.cloudflare.com/r2/
    """
    import boto3

    # First source: values exposed through `env`.
    access_key = env.get("R2_ACCESS_KEY")
    secret_key = env.get("R2_SECRET_KEY")
    endpoint = env.get("R2_ENDPOINT")
    region_name = env.get("R2_REGION_NAME")

    # Second source: rclone config, used purely to fill the gaps.
    if not (access_key and secret_key and endpoint):
        try:
            rclone_config = _read_owid_rclone_config()
            access_key = access_key or rclone_config.get("access_key_id")
            secret_key = secret_key or rclone_config.get("secret_access_key")
            endpoint = endpoint or rclone_config.get("endpoint")
            region_name = region_name or rclone_config.get("region")
        except KeyError:
            # Best effort: a KeyError here simply means we continue with
            # whatever has been collected so far.
            pass

    # Without these two settings boto3 sends the header
    # `content-encoding: gzip,aws-chunked`, which breaks Admin;
    # see https://developers.cloudflare.com/r2/examples/aws/boto3/
    client_config = Config(
        request_checksum_calculation="when_required",
        response_checksum_validation="when_required",
    )

    return boto3.client(
        service_name="s3",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        endpoint_url=endpoint or "https://078fcdfed9955087315dd86792e71a7e.r2.cloudflarestorage.com",
        region_name=region_name or "auto",
        config=client_config,
    )

connect_r2_cached

connect_r2_cached() -> BaseClient

Create a cached, thread-safe connection to Cloudflare R2.

Returns a cached R2 client that's reused across multiple calls. This is more efficient than creating a new connection for every request. Thread-safe through locking mechanism.

Returns:

  • BaseClient

    Cached boto3 S3 client configured for R2.

Example

Use cached connection for multiple operations

client = connect_r2_cached()
client.upload_file('local.csv', 'bucket', 'remote.csv')
client.download_file('bucket', 'data.json', 'local.json')
# Both use the same underlying connection

Note

The connection is cached indefinitely. If credentials change during runtime, the application needs to be restarted.

See Also
  • connect_r2(): Non-cached version for one-time connections
Source code in lib/catalog/owid/catalog/s3_utils.py
def connect_r2_cached() -> BaseClient:
    """Return the shared R2 client, creating it under a lock.

    The client is obtained from `_connect_r2_cached()` while holding
    `BOTO3_CLIENT_LOCK`, so concurrent callers never run client creation at the
    same time. The caching itself is delegated to `_connect_r2_cached()`.

    Returns:
        Cached boto3 S3 client configured for R2.

    Example:
        Use cached connection for multiple operations
        ```python
        client = connect_r2_cached()
        client.upload_file('local.csv', 'bucket', 'remote.csv')
        client.download_file('bucket', 'data.json', 'local.json')
        # Both use the same underlying connection
        ```

    Note:
        The connection is cached indefinitely. If credentials change during runtime,
        the application needs to be restarted.

    See Also:
        - `connect_r2()`: Non-cached version for one-time connections
    """
    # Creating a boto3 client is not thread safe, hence the lock around it.
    with BOTO3_CLIENT_LOCK:
        shared_client = _connect_r2_cached()
    return shared_client

download

download(
    s3_url: str,
    filename: str,
    quiet: bool = False,
    client: BaseClient | None = None,
) -> None

Download a file from S3 to local filesystem.

Parameters:

  • s3_url (str) –

    S3 URL of the file to download (e.g., s3://bucket/path/file.csv).

  • filename (str) –

    Local path where the file should be saved.

  • quiet (bool, default: False ) –

    If True, suppresses log messages. Defaults to False.

  • client (BaseClient | None, default: None ) –

    Optional boto3 S3 client. If None, connects to R2 automatically.

Raises:

  • UploadError

    If the download fails due to S3 client errors.

Example

Download a file

download("s3://my-bucket/data/file.csv", "local_file.csv")

Download quietly (no logs)

download("s3://my-bucket/data/file.csv", "file.csv", quiet=True)

Source code in lib/catalog/owid/catalog/s3_utils.py
def download(s3_url: str, filename: str, quiet: bool = False, client: BaseClient | None = None) -> None:
    """Download a file from S3 to local filesystem.

    Args:
        s3_url: S3 URL of the file to download (e.g., `s3://bucket/path/file.csv`).
        filename: Local path where the file should be saved.
        quiet: If True, suppresses the success log message. Errors are always
            logged. Defaults to False.
        client: Optional boto3 S3 client. If None, connects to R2 automatically.

    Raises:
        UploadError: If the download fails due to S3 client errors. The
            underlying boto3 `ClientError` is attached as `__cause__`.

    Example:
        Download a file
        ```python
        download("s3://my-bucket/data/file.csv", "local_file.csv")
        ```

        Download quietly (no logs)
        ```python
        download("s3://my-bucket/data/file.csv", "file.csv", quiet=True)
        ```
    """
    client = client or connect_r2()

    bucket, key = s3_bucket_key(s3_url)

    try:
        client.download_file(bucket, key, filename)  # ty: ignore
    except ClientError as e:
        log.error(e)
        # Chain explicitly so the traceback shows the boto3 error as the
        # direct cause of the UploadError.
        raise UploadError(e) from e

    if not quiet:
        log.info("DOWNLOADED", s3_url=s3_url, filename=filename)

download_s3_folder

download_s3_folder(
    s3_folder: str,
    local_dir: Path,
    exclude: list[str] = [],
    include: list[str] = [],
    client: BaseClient | None = None,
    max_workers: int = 20,
    delete: bool = False,
) -> None

Download all files from an S3 folder to a local directory.

Downloads all objects from an S3 folder using parallel threads for efficiency. Supports filtering with include/exclude patterns and optional deletion of local files not present in S3.

Parameters:

  • s3_folder (str) –

    S3 folder URL. Must end with a slash (e.g., s3://bucket/folder/).

  • local_dir (Path) –

    Local directory path where files will be downloaded.

  • exclude (list[str], default: [] ) –

    List of patterns to exclude from download. Files containing any of these patterns will be skipped.

  • include (list[str], default: [] ) –

    List of patterns to include in download. If specified, only files containing one of these patterns will be downloaded.

  • client (BaseClient | None, default: None ) –

    Optional boto3 S3 client. If None, connects to R2 automatically.

  • max_workers (int, default: 20 ) –

    Maximum number of parallel download threads. Defaults to 20.

  • delete (bool, default: False ) –

    If True, deletes local files that don't exist in the S3 folder. Defaults to False.

Raises:

  • AssertionError

    If s3_folder doesn't end with a slash.

  • UploadError

    If any download fails.

Example

Download entire folder

from pathlib import Path
download_s3_folder(
    "s3://my-bucket/data/",
    Path("local_data")
)

Download only CSV files

download_s3_folder(
    "s3://my-bucket/data/",
    Path("local_data"),
    include=[".csv"]
)

Download and sync (delete local files not in S3)

download_s3_folder(
    "s3://my-bucket/data/",
    Path("local_data"),
    delete=True
)

Exclude backup files

download_s3_folder(
    "s3://my-bucket/data/",
    Path("local_data"),
    exclude=[".bak", ".tmp"]
)

Note

The local_dir is created automatically if it doesn't exist.

Source code in lib/catalog/owid/catalog/s3_utils.py
def download_s3_folder(
    s3_folder: str,
    local_dir: Path,
    exclude: list[str] = [],
    include: list[str] = [],
    client: BaseClient | None = None,
    max_workers: int = 20,
    delete: bool = False,
) -> None:
    """Download all files from an S3 folder to a local directory.

    Downloads all objects from an S3 folder using parallel threads for efficiency.
    Supports filtering with include/exclude patterns and optional deletion of
    local files not present in S3.

    Every object is stored directly inside `local_dir` under the last component
    of its key, so the folder structure below `s3_folder` is not recreated.

    Args:
        s3_folder: S3 folder URL. Must end with a slash (e.g., `s3://bucket/folder/`).
        local_dir: Local directory path where files will be downloaded.
        exclude: List of patterns to exclude from download. Files containing any
            of these patterns will be skipped.
        include: List of patterns to include in download. If specified, only files
            containing one of these patterns will be downloaded.
        client: Optional boto3 S3 client. If None, connects to R2 automatically.
        max_workers: Maximum number of parallel download threads. Defaults to 20.
        delete: If True, deletes local files that don't exist in the S3 folder.
            Defaults to False.

    Raises:
        AssertionError: If s3_folder doesn't end with a slash.
        UploadError: If any download fails. All downloads are allowed to finish
            first; the error is raised before any local file is deleted.

    Example:
        Download entire folder
        ```python
        from pathlib import Path
        download_s3_folder(
            "s3://my-bucket/data/",
            Path("local_data")
        )
        ```

        Download only CSV files
        ```python
        download_s3_folder(
            "s3://my-bucket/data/",
            Path("local_data"),
            include=[".csv"]
        )
        ```

        Download and sync (delete local files not in S3)
        ```python
        download_s3_folder(
            "s3://my-bucket/data/",
            Path("local_data"),
            delete=True
        )
        ```

        Exclude backup files
        ```python
        download_s3_folder(
            "s3://my-bucket/data/",
            Path("local_data"),
            exclude=[".bak", ".tmp"]
        )
        ```

    Note:
        The local_dir is created automatically if it doesn't exist.
    """
    assert s3_folder.endswith("/"), "s3_folder must end with a slash"

    client = client or connect_r2()

    bucket, _ = s3_bucket_key(s3_folder)

    if not local_dir.exists():
        local_dir.mkdir(parents=True)

    s3_keys = list_s3_objects(s3_folder, client=client)

    # Patterns are plain substrings of the object key, not globs.
    if exclude:
        s3_keys = [key for key in s3_keys if not any(pattern in key for pattern in exclude)]

    if include:
        s3_keys = [key for key in s3_keys if any(pattern in key for pattern in include)]

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for s3_key in s3_keys:
            local_file_path = local_dir / Path(s3_key).name
            futures.append(
                executor.submit(
                    download,
                    f"s3://{bucket}/{s3_key}",
                    local_file_path.as_posix(),
                    client=client,
                    quiet=True,
                )
            )

        # Let every download run to completion before looking at outcomes.
        concurrent.futures.wait(futures)

    # `wait()` only blocks; it never re-raises what a worker raised. Asking each
    # future for its result surfaces the first failure instead of silently
    # dropping it, and does so before the deletion step below can prune files
    # based on an incomplete download.
    for future in futures:
        future.result()

    if delete:
        local_files = set(local_dir.glob("*"))
        downloaded_files = {local_dir / Path(s3_key).name for s3_key in s3_keys}
        files_to_delete = local_files - downloaded_files
        for file in files_to_delete:
            file.unlink()

list_s3_objects

list_s3_objects(
    s3_folder: str, client: BaseClient | None = None
) -> list[str]

List all objects in an S3 folder.

Recursively lists all objects within an S3 folder, handling pagination automatically. Excludes folder markers (keys ending with '/').

Parameters:

  • s3_folder (str) –

    S3 folder URL (e.g., s3://bucket/path/to/folder/).

  • client (BaseClient | None, default: None ) –

    Optional boto3 S3 client. If None, connects to R2 automatically.

Returns:

  • list[str]

    List of object keys (paths) within the folder.

Example

List all objects in a folder

objects = list_s3_objects("s3://my-bucket/data/")
print(f"Found {len(objects)} objects")

Use custom client

import boto3
client = boto3.client('s3')
objects = list_s3_objects("s3://my-bucket/data/", client=client)

Note

This function handles pagination automatically for folders with more than 1000 objects.

Source code in lib/catalog/owid/catalog/s3_utils.py
def list_s3_objects(s3_folder: str, client: BaseClient | None = None) -> list[str]:
    """Collect the keys of all objects stored under an S3 folder.

    Issues `list_objects_v2` requests with the folder's key as prefix and keeps
    requesting further pages for as long as the response is marked as
    truncated. Keys ending with '/' (folder markers) are left out.

    Args:
        s3_folder: S3 folder URL (e.g., `s3://bucket/path/to/folder/`).
        client: Optional boto3 S3 client. If None, connects to R2 automatically.

    Returns:
        List of object keys (paths) within the folder, in the order returned
        by the service.

    Example:
        List all objects in a folder
        ```python
        objects = list_s3_objects("s3://my-bucket/data/")
        print(f"Found {len(objects)} objects")
        ```

        Use custom client
        ```python
        import boto3
        client = boto3.client('s3')
        objects = list_s3_objects("s3://my-bucket/data/", client=client)
        ```

    Note:
        Pagination is handled here, so callers always receive the complete
        listing regardless of how many pages the service splits it into.
    """
    client = client or connect_r2()

    bucket, prefix = s3_bucket_key(s3_folder)
    object_keys = []
    next_token = None

    while True:
        # Only follow-up requests carry a continuation token.
        request = {"Bucket": bucket, "Prefix": prefix}
        if next_token:
            request["ContinuationToken"] = next_token
        page = client.list_objects_v2(**request)  # ty: ignore

        # An empty listing has no "Contents" entry at all.
        if "Contents" in page:
            for entry in page["Contents"]:
                if not entry["Key"].endswith("/"):
                    object_keys.append(entry["Key"])

        if not page.get("IsTruncated"):
            break
        next_token = page.get("NextContinuationToken")

    return object_keys

s3_bucket_key

s3_bucket_key(url: str) -> tuple[str, str]

Extract bucket name and key from an S3 URL.

Parses both s3:// and https:// S3 URLs to extract the bucket name and object key.

Parameters:

  • url (str) –

    S3 URL in either format: s3://bucket-name/path/to/object or https://bucket-name.s3.region.amazonaws.com/path/to/object

Returns:

  • tuple[str, str]

    Tuple of (bucket_name, object_key).

Example
# S3 protocol URL
bucket, key = s3_bucket_key("s3://my-bucket/data/file.csv")
# Returns: ("my-bucket", "data/file.csv")

# HTTPS URL
bucket, key = s3_bucket_key("https://my-bucket.s3.us-east-1.amazonaws.com/data/file.csv")
# Returns: ("my-bucket", "data/file.csv")
Source code in lib/catalog/owid/catalog/s3_utils.py
def s3_bucket_key(url: str) -> tuple[str, str]:
    """Split an S3 URL into its bucket name and object key.

    The bucket is taken from the host part of the URL and the key from its
    path, without the leading slash. For `https://` URLs only the first
    dot-separated label of the host is used as the bucket name.

    Args:
        url: S3 URL in either format:
            - `s3://bucket-name/path/to/object`
            - `https://bucket-name.s3.region.amazonaws.com/path/to/object`

    Returns:
        Tuple of (bucket_name, object_key).

    Example:
        ```python
        # S3 protocol URL
        bucket, key = s3_bucket_key("s3://my-bucket/data/file.csv")
        # Returns: ("my-bucket", "data/file.csv")

        # HTTPS URL
        bucket, key = s3_bucket_key("https://my-bucket.s3.us-east-1.amazonaws.com/data/file.csv")
        # Returns: ("my-bucket", "data/file.csv")
        ```
    """
    parts = urlparse(url)
    host = parts.netloc

    # In the https form the host also carries service and region
    # ("<bucket>.s3.<region>..."), so only its first label is the bucket.
    bucket = host.split(".")[0] if parts.scheme == "https" else host

    return bucket, parts.path.lstrip("/")

upload

upload(
    s3_url: str,
    filename: str | Path,
    public: bool = False,
    quiet: bool = False,
    downloadable: bool = False,
) -> None

Upload the file at the given local filename to the S3 URL.

Parameters:

  • s3_url (str) –

    S3 URL to upload to

  • filename (str | Path) –

    Local file to upload

  • public (bool, default: False ) –

    Whether to make the file publicly readable

  • quiet (bool, default: False ) –

    Whether to suppress log messages

  • downloadable (bool, default: False ) –

    If True, force browsers to download the file instead of displaying it inline. Sets Content-Disposition header to 'attachment; filename="..."'

Source code in lib/catalog/owid/catalog/s3_utils.py
def upload(
    s3_url: str, filename: str | Path, public: bool = False, quiet: bool = False, downloadable: bool = False
) -> None:
    """Upload the file at the given local filename to the S3 URL.

    A new R2 client is created via `connect_r2()` for every call.

    Args:
        s3_url: S3 URL to upload to
        filename: Local file to upload
        public: Whether to make the file publicly readable (sets the
            `public-read` ACL)
        quiet: Whether to suppress the success log message. Errors are always
            logged.
        downloadable: If True, force browsers to download the file instead of displaying it inline. Sets Content-Disposition header to 'attachment; filename="..."'

    Raises:
        UploadError: If the upload fails due to S3 client errors. The
            underlying boto3 `ClientError` is attached as `__cause__`.
    """
    client = connect_r2()
    bucket, key = s3_bucket_key(s3_url)
    extra_args = {"ACL": "public-read"} if public else {}

    # Add Content-Disposition header to force download with correct filename
    if downloadable:
        file_name = Path(filename).name
        extra_args["ContentDisposition"] = f'attachment; filename="{file_name}"'

    # `filename` may be a Path; normalise it once and reuse the string below.
    filename_str = str(filename)
    try:
        client.upload_file(filename_str, bucket, key, ExtraArgs=extra_args)
    except ClientError as e:
        log.error(e)
        # Chain explicitly so the traceback shows the boto3 error as the
        # direct cause of the UploadError.
        raise UploadError(e) from e

    if not quiet:
        log.info(f"UPLOADED: {filename_str} -> {s3_url}")