Skip to content

Datautils Library

The owid-datautils library provides utility functions for common data processing tasks in ETL workflows.

You can install it via pip:

pip install owid-datautils

owid.datautils.dataframes

Objects related to pandas dataframes.

Classes:

Functions:

  • apply_on_categoricals

    Apply a function across multiple categorical Series efficiently.

  • are_equal

    Check if two DataFrames are equal with detailed comparison report.

  • combine_two_overlapping_dataframes

    Combine two DataFrames with overlapping columns, prioritizing the first.

  • compare

    Compare two DataFrames element-wise for equality.

  • concatenate

    Concatenate while preserving categorical columns.

  • count_missing_in_groups

    Count the number of missing values in each group.

  • groupby_agg

    Group DataFrame with intelligent NaN handling during aggregation.

  • has_index

    Check if a DataFrame has a meaningful index.

  • map_series

    Map Series values with performance optimization and flexible NaN handling.

  • multi_merge

    Merge multiple DataFrames on common columns.

  • rename_categories

    Alternative to pd.Series.cat.rename_categories which supports non-unique categories.

  • to_file

    Save a dataframe in any format.

DataFramesHaveDifferentLengths

DataFramesHaveDifferentLengths(
    exception_message: str | None = None, *args: Any
)

Bases: ExceptionFromDocstring

Dataframes cannot be compared because they have different number of rows.

Source code in lib/datautils/owid/datautils/common.py
def __init__(self, exception_message: str | None = None, *args: Any):
    super().__init__(exception_message or self.__doc__, *args)

ObjectsAreNotDataframes

ObjectsAreNotDataframes(
    exception_message: str | None = None, *args: Any
)

Bases: ExceptionFromDocstring

Given objects are not dataframes.

Source code in lib/datautils/owid/datautils/common.py
def __init__(self, exception_message: str | None = None, *args: Any):
    super().__init__(exception_message or self.__doc__, *args)

apply_on_categoricals

apply_on_categoricals(
    cat_series: list[Series], func: Callable[..., str]
) -> Series

Apply a function across multiple categorical Series efficiently.

High-performance operation that applies a function to categorical Series without converting to strings first. Uses category codes internally to prevent memory explosion and significantly improve speed.

Parameters:

  • cat_series (list[Series]) –

    List of Series with categorical dtype.

  • func (Callable[..., str]) –

    Function that takes N arguments (one per Series) and returns a string. Called for each unique combination of category codes.

Returns:

  • Series

    New categorical Series with the function applied.

Example
import pandas as pd
from owid.datautils.dataframes import apply_on_categoricals

# Combine country and region categories
countries = pd.Series(["USA", "UK", "USA"], dtype="category")
regions = pd.Series(["Americas", "Europe", "Americas"], dtype="category")

# Concatenate with separator
result = apply_on_categoricals(
    [countries, regions],
    lambda c, r: f"{c} ({r})"
)
# Result: ["USA (Americas)", "UK (Europe)", "USA (Americas)"]
# Still categorical dtype, much faster than string operations
Note

This is significantly faster than converting categories to strings, especially for large DataFrames with repeated category values.

Source code in lib/datautils/owid/datautils/dataframes.py
def apply_on_categoricals(cat_series: list[pd.Series], func: Callable[..., str]) -> pd.Series:
    """Apply a function across multiple categorical Series efficiently.

    High-performance operation that applies a function to categorical Series
    without converting to strings first. Uses category codes internally to
    prevent memory explosion and significantly improve speed.

    Args:
        cat_series: List of Series with categorical dtype.
        func: Function that takes N arguments (one per Series) and returns a string.
            Called for each unique combination of category codes.

    Returns:
        New categorical Series with the function applied.

    Example:
        ```python
        import pandas as pd
        from owid.datautils.dataframes import apply_on_categoricals

        # Combine country and region categories
        countries = pd.Series(["USA", "UK", "USA"], dtype="category")
        regions = pd.Series(["Americas", "Europe", "Americas"], dtype="category")

        # Concatenate with separator
        result = apply_on_categoricals(
            [countries, regions],
            lambda c, r: f"{c} ({r})"
        )
        # Result: ["USA (Americas)", "UK (Europe)", "USA (Americas)"]
        # Still categorical dtype, much faster than string operations
        ```

    Note:
        This is significantly faster than converting categories to strings,
        especially for large DataFrames with repeated category values.
    """
    seen = {}
    codes = []
    categories = []
    for cat_codes in zip(*[s.cat.codes for s in cat_series]):
        if cat_codes not in seen:
            # add category
            # -1 is a special code for missing values
            cat_values = [s.cat.categories[code] if code != -1 else np.nan for s, code in zip(cat_series, cat_codes)]
            categories.append(func(*cat_values))
            seen[cat_codes] = len(categories) - 1

        # use existing category
        codes.append(seen[cat_codes])

    return cast(pd.Series, pd.Categorical.from_codes(codes, categories=pd.Index(categories)))

are_equal

are_equal(
    df1: DataFrame,
    df2: DataFrame,
    absolute_tolerance: float = 1e-08,
    relative_tolerance: float = 1e-08,
    verbose: bool = True,
) -> tuple[bool, DataFrame]

Check if two DataFrames are equal with detailed comparison report.

Comprehensive equality check that compares structure, dtypes, and values with tolerance for floating-point numbers. Treats all NaN values as equal. Optionally prints a detailed summary of differences.

Parameters:

  • df1 (DataFrame) –

    First DataFrame to compare.

  • df2 (DataFrame) –

    Second DataFrame to compare.

  • absolute_tolerance (float, default: 1e-08 ) –

    Maximum absolute difference for numeric equality: abs(a - b) <= absolute_tolerance.

  • relative_tolerance (float, default: 1e-08 ) –

    Maximum relative difference for numeric equality: abs(a - b) / abs(b) <= relative_tolerance.

  • verbose (bool, default: True ) –

    If True, print detailed comparison summary showing all differences found.

Returns:

  • tuple[bool, DataFrame]

    Tuple of (equality_flag, comparison_dataframe) where: - equality_flag: True if DataFrames are equal within tolerance - comparison_dataframe: Boolean DataFrame showing element-wise equality. Empty if DataFrames have incompatible shapes.

Example
import pandas as pd
from owid.datautils.dataframes import are_equal

df1 = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
df2 = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

equal, comparison = are_equal(df1, df2, verbose=True)
# Prints: "Dataframes are identical..."
# Returns: (True, DataFrame of all True values)

df3 = pd.DataFrame({"a": [1, 2], "c": [5, 6]})
equal, comparison = are_equal(df1, df3, verbose=True)
# Prints differences: missing columns, etc.
# Returns: (False, DataFrame)
Source code in lib/datautils/owid/datautils/dataframes.py
def are_equal(
    df1: pd.DataFrame,
    df2: pd.DataFrame,
    absolute_tolerance: float = 1e-8,
    relative_tolerance: float = 1e-8,
    verbose: bool = True,
) -> tuple[bool, pd.DataFrame]:
    """Check if two DataFrames are equal with detailed comparison report.

    Comprehensive equality check that compares structure, dtypes, and values
    with tolerance for floating-point numbers. Treats all NaN values as equal.
    Optionally prints a detailed summary of differences.

    Args:
        df1: First DataFrame to compare.
        df2: Second DataFrame to compare.
        absolute_tolerance: Maximum absolute difference for numeric equality:
            `abs(a - b) <= absolute_tolerance`.
        relative_tolerance: Maximum relative difference for numeric equality:
            `abs(a - b) / abs(b) <= relative_tolerance`.
        verbose: If True, print detailed comparison summary showing all
            differences found.

    Returns:
        Tuple of (equality_flag, comparison_dataframe) where:
            - equality_flag: True if DataFrames are equal within tolerance
            - comparison_dataframe: Boolean DataFrame showing element-wise
              equality. Empty if DataFrames have incompatible shapes.

    Example:
        ```python
        import pandas as pd
        from owid.datautils.dataframes import are_equal

        df1 = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
        df2 = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

        equal, comparison = are_equal(df1, df2, verbose=True)
        # Prints: "Dataframes are identical..."
        # Returns: (True, DataFrame of all True values)

        df3 = pd.DataFrame({"a": [1, 2], "c": [5, 6]})
        equal, comparison = are_equal(df1, df3, verbose=True)
        # Prints differences: missing columns, etc.
        # Returns: (False, DataFrame)
        ```
    """
    # Initialise flag that is True only if both dataframes are equal.
    equal = True
    # Initialise flag that is True if dataframes can be compared cell by cell.
    can_be_compared = True
    # Initialise string of messages, which will optionally be printed.
    summary = ""

    # Check if all columns in df2 are in df1.
    missing_in_df1 = sorted(set(df2.columns) - set(df1.columns))
    if len(missing_in_df1):
        summary += f"\n* {len(missing_in_df1)} columns in df2 missing in df1.\n"
        summary += "\n".join([f"  * {col}" for col in missing_in_df1])
        equal = False

    # Check if all columns in df1 are in df2.
    missing_in_df2 = sorted(set(df1.columns) - set(df2.columns))
    if len(missing_in_df2):
        summary += f"\n* {len(missing_in_df2)} columns in df1 missing in df2.\n"
        summary += "\n".join([f"  * {col}" for col in missing_in_df2])
        equal = False

    # Check if dataframes have the same number of rows.
    if len(df1) != len(df2):
        summary += f"\n* {len(df1)} rows in df1 and {len(df2)} rows in df2."
        equal = False
        can_be_compared = False

    # Check for differences in column names or types.
    common_columns = sorted(set(df1.columns) & set(df2.columns))
    all_columns = sorted(set(df1.columns) | set(df2.columns))
    if common_columns == all_columns:
        if df1.columns.tolist() != df2.columns.tolist():
            summary += "\n* Columns are sorted differently.\n"
            equal = False
        for col in common_columns:
            if df1[col].dtype != df2[col].dtype:
                summary += f"  * Column {col} is of type {df1[col].dtype} for df1, but type {df2[col].dtype} for df2."
                equal = False
    else:
        summary += f"\n* Only {len(common_columns)} common columns out of {len(all_columns)} distinct columns."
        equal = False

    if not can_be_compared:
        # Dataframes cannot be compared.
        compared = pd.DataFrame()
        equal = False
    else:
        # Check if indexes are equal.
        if (df1.index != df2.index).any():
            summary += "\n* Dataframes have different indexes (consider resetting indexes of input dataframes)."
            equal = False

        # Dataframes can be compared cell by cell (two nans on the same cell are considered equal).
        compared = compare(
            df1,
            df2,
            columns=common_columns,
            absolute_tolerance=absolute_tolerance,
            relative_tolerance=relative_tolerance,
        )
        all_values_equal = compared.all().all()  # ty: ignore
        if not all_values_equal:
            summary += "\n* Values differ by more than the given absolute and relative tolerances."

        # Dataframes are equal only if all previous checks have passed.
        equal = equal & all_values_equal

    if equal:
        summary += (
            "Dataframes are identical (within absolute tolerance of"
            f" {absolute_tolerance} and relative tolerance of {relative_tolerance})."
        )

    if verbose:
        # Optionally print the summary of the comparison.
        print(summary)

    return equal, compared  # ty: ignore[invalid-return-type]

combine_two_overlapping_dataframes

combine_two_overlapping_dataframes(
    df1: DataFrame,
    df2: DataFrame,
    index_columns: list[str] | None = None,
    keep_column_order: bool = False,
) -> DataFrame

Combine two DataFrames with overlapping columns, prioritizing the first.

Intelligent merge that combines DataFrames with potentially identical columns, prioritizing values from df1 but filling its NaN values with data from df2. Avoids creating duplicate columns (e.g., "col_x", "col_y") that result from standard merges.

Why not use standard operations
  • pd.merge(): Creates duplicate columns with "_x" and "_y" suffixes
  • pd.concat() + drop_duplicates(): Would keep NaN values from df1 instead of filling them with df2 values

Parameters:

  • df1 (DataFrame) –

    First DataFrame (higher priority for values).

  • df2 (DataFrame) –

    Second DataFrame (used to fill NaN values in df1).

  • index_columns (list[str] | None, default: None ) –

    Column names to use as index for alignment (e.g., ["country", "year"]). Must exist in both DataFrames as regular columns. If None, uses existing DataFrame indices.

  • keep_column_order (bool, default: False ) –

    If True, preserve original column order (df1 columns first, then new df2 columns). If False, sort columns alphabetically.

Returns:

  • DataFrame

    Combined DataFrame with union of rows and columns, prioritizing df1 values.

Example
import pandas as pd
from owid.datautils.dataframes import combine_two_overlapping_dataframes

df1 = pd.DataFrame({
    "country": ["USA", "UK"],
    "gdp": [20, None],
    "population": [330, 67]
})

df2 = pd.DataFrame({
    "country": ["USA", "UK", "France"],
    "gdp": [21, 3, 2.7],
    "area": [9.8, 0.24, 0.64]
})

result = combine_two_overlapping_dataframes(
    df1, df2,
    index_columns=["country"]
)
#   country   gdp  population  area
# 0     USA  20.0         330  9.80  # GDP from df1
# 1      UK   3.0          67  0.24  # GDP from df2 (was NaN in df1)
# 2  France   2.7         NaN  0.64  # New row from df2
Source code in lib/datautils/owid/datautils/dataframes.py
def combine_two_overlapping_dataframes(
    df1: pd.DataFrame,
    df2: pd.DataFrame,
    index_columns: list[str] | None = None,
    keep_column_order: bool = False,
) -> pd.DataFrame:
    """Combine two DataFrames with overlapping columns, prioritizing the first.

    Intelligent merge that combines DataFrames with potentially identical columns,
    prioritizing values from df1 but filling its NaN values with data from df2.
    Avoids creating duplicate columns (e.g., "col_x", "col_y") that result from
    standard merges.

    Why not use standard operations:
        - `pd.merge()`: Creates duplicate columns with "_x" and "_y" suffixes
        - `pd.concat()` + `drop_duplicates()`: Would keep NaN values from df1
          instead of filling them with df2 values

    Args:
        df1: First DataFrame (higher priority for values).
        df2: Second DataFrame (used to fill NaN values in df1).
        index_columns: Column names to use as index for alignment (e.g., ["country", "year"]).
            Must exist in both DataFrames as regular columns. If None, uses existing
            DataFrame indices.
        keep_column_order: If True, preserve original column order (df1 columns first,
            then new df2 columns). If False, sort columns alphabetically.

    Returns:
        Combined DataFrame with union of rows and columns, prioritizing df1 values.

    Example:
        ```python
        import pandas as pd
        from owid.datautils.dataframes import combine_two_overlapping_dataframes

        df1 = pd.DataFrame({
            "country": ["USA", "UK"],
            "gdp": [20, None],
            "population": [330, 67]
        })

        df2 = pd.DataFrame({
            "country": ["USA", "UK", "France"],
            "gdp": [21, 3, 2.7],
            "area": [9.8, 0.24, 0.64]
        })

        result = combine_two_overlapping_dataframes(
            df1, df2,
            index_columns=["country"]
        )
        #   country   gdp  population  area
        # 0     USA  20.0         330  9.80  # GDP from df1
        # 1      UK   3.0          67  0.24  # GDP from df2 (was NaN in df1)
        # 2  France   2.7         NaN  0.64  # New row from df2
        ```
    """
    df1 = df1.copy()
    df2 = df2.copy()
    if index_columns is not None:
        # Ensure dataframes have a dummy index.
        if not ((df1.index.names == [None]) and (df2.index.names == [None])):
            warnings.warn("If index_columns is given, dataframes should have a dummy index. Use reset_index().")
        # Set index columns.
        df1 = df1.set_index(index_columns)
        df2 = df2.set_index(index_columns)
    else:
        # Ensure dataframes have the same indexes.
        if not (df1.index.names == df2.index.names):
            warnings.warn("Dataframes should have the same indexes.")

    # Align both dataframes on their common indexes.
    # Give priority to df1 on overlapping values.
    combined, df2 = df1.align(df2)

    new_columns = df2.columns.difference(df1.columns)
    for col in new_columns:
        try:
            combined[col] = combined[col].astype(df2[col].dtype, copy=False)
        except ValueError:
            # casting NaNs to integer will fail
            pass

    # Fill missing values in df1 with values from df2.
    combined = combined.fillna(df2)

    if index_columns is not None:
        combined = combined.reset_index()

    if keep_column_order:
        # The previous operations will automatically sort columns alphanumerically.
        # To keep the original order of columns, we need to find that sequence of columns.
        # First, columns of df1, and then all columns in df2 that were not in df1.
        if index_columns is not None:
            columns = index_columns + df1.columns.tolist()
        else:
            columns = df1.columns.tolist()
        columns = columns + [column for column in df2.columns if column not in columns]
        combined = combined[columns]

    # It would be good to have a 'keep_row_order' option, but it's a bit tricky.

    return cast(pd.DataFrame, combined)

compare

compare(
    df1: DataFrame,
    df2: DataFrame,
    columns: list[str] | None = None,
    absolute_tolerance: float = 1e-08,
    relative_tolerance: float = 1e-08,
) -> DataFrame

Compare two DataFrames element-wise for equality.

Performs element-by-element comparison of two DataFrames, treating NaN values as equal and allowing tolerance for floating-point comparisons.

Parameters:

  • df1 (DataFrame) –

    First DataFrame to compare.

  • df2 (DataFrame) –

    Second DataFrame to compare.

  • columns (list[str] | None, default: None ) –

    List of column names to compare (must exist in both DataFrames). If None, all common columns are compared.

  • absolute_tolerance (float, default: 1e-08 ) –

    Maximum absolute difference allowed for values to be considered equal: abs(a - b) <= absolute_tolerance.

  • relative_tolerance (float, default: 1e-08 ) –

    Maximum relative difference allowed for values to be considered equal: abs(a - b) / abs(b) <= relative_tolerance.

Returns:

  • DataFrame

    DataFrame of booleans with the same shape as the comparison. Each element

  • DataFrame

    is True if the corresponding values in df1 and df2 are equal (within tolerance).

Raises:

Example
import pandas as pd
from owid.datautils.dataframes import compare

df1 = pd.DataFrame({"a": [1.0, 2.0], "b": [3.0, 4.0]})
df2 = pd.DataFrame({"a": [1.0001, 2.0], "b": [3.0, 4.1]})

result = compare(df1, df2, absolute_tolerance=0.01)
print(result)
#       a      b
# 0  True   True
# 1  True  False
Note

DataFrames must have the same number of rows to be compared.

Source code in lib/datautils/owid/datautils/dataframes.py
def compare(
    df1: pd.DataFrame,
    df2: pd.DataFrame,
    columns: list[str] | None = None,
    absolute_tolerance: float = 1e-8,
    relative_tolerance: float = 1e-8,
) -> pd.DataFrame:
    """Compare two DataFrames element-wise for equality.

    Performs element-by-element comparison of two DataFrames, treating NaN values
    as equal and allowing tolerance for floating-point comparisons.

    Args:
        df1: First DataFrame to compare.
        df2: Second DataFrame to compare.
        columns: List of column names to compare (must exist in both DataFrames).
            If None, all common columns are compared.
        absolute_tolerance: Maximum absolute difference allowed for values to be
            considered equal: `abs(a - b) <= absolute_tolerance`.
        relative_tolerance: Maximum relative difference allowed for values to be
            considered equal: `abs(a - b) / abs(b) <= relative_tolerance`.

    Returns:
        DataFrame of booleans with the same shape as the comparison. Each element
        is True if the corresponding values in df1 and df2 are equal (within tolerance).

    Raises:
        ObjectsAreNotDataframes: If either input is not a DataFrame.
        DataFramesHaveDifferentLengths: If DataFrames have different row counts.

    Example:
        ```python
        import pandas as pd
        from owid.datautils.dataframes import compare

        df1 = pd.DataFrame({"a": [1.0, 2.0], "b": [3.0, 4.0]})
        df2 = pd.DataFrame({"a": [1.0001, 2.0], "b": [3.0, 4.1]})

        result = compare(df1, df2, absolute_tolerance=0.01)
        print(result)
        #       a      b
        # 0  True   True
        # 1  True  False
        ```

    Note:
        DataFrames must have the same number of rows to be compared.
    """
    # Ensure dataframes can be compared.
    if (not isinstance(df1, pd.DataFrame)) or (not isinstance(df2, pd.DataFrame)):
        raise ObjectsAreNotDataframes
    if len(df1) != len(df2):
        raise DataFramesHaveDifferentLengths

    # If columns are not specified, assume common columns.
    if columns is None:
        columns = sorted(set(df1.columns) & set(df2.columns))

    # Compare, column by column, the elements of the two dataframes.
    compared = pd.DataFrame()
    for col in columns:
        if (df1[col].dtype in (object, "category", "string")) or (df2[col].dtype in (object, "category", "string")):
            # Apply a direct comparison for strings or categories
            compared_row = df1[col].values == df2[col].values
        else:
            # For numeric data, consider them equal within certain absolute and relative tolerances.
            compared_row = np.isclose(
                df1[col].values,  # ty: ignore
                df2[col].values,  # ty: ignore
                atol=absolute_tolerance,
                rtol=relative_tolerance,
            )
        # Treat nans as equal.
        compared_row[pd.isnull(df1[col].values) & pd.isnull(df2[col].values)] = True  # ty: ignore
        compared[col] = compared_row

    return compared

concatenate

concatenate(
    objs: list[DataFrame], **kwargs: Any
) -> DataFrame

Concatenate while preserving categorical columns.

Original source code from https://stackoverflow.com/a/57809778/1275818.

Source code in lib/datautils/owid/datautils/dataframes.py
def concatenate(objs: list[pd.DataFrame], **kwargs: Any) -> pd.DataFrame:
    """Concatenate while preserving categorical columns.

    Original source code from https://stackoverflow.com/a/57809778/1275818.
    """
    # Iterate on categorical columns common to all dfs
    for col in set.intersection(*[set(df.select_dtypes(include="category").columns) for df in objs]):
        ignore_order = any([not df[col].cat.ordered for df in objs])
        # Generate the union category across dfs for this column
        uc = union_categoricals([df[col] for df in objs], ignore_order=ignore_order)
        # Change to union category for all dataframes
        for df in objs:
            # df.loc[:, col] = pd.Categorical(df[col].values, categories=uc.categories)
            df[col] = df[col].astype(pd.CategoricalDtype(categories=uc.categories, ordered=uc.ordered))

    with warnings.catch_warnings():
        warnings.simplefilter(action="ignore", category=FutureWarning)
        return pd.concat(objs, **kwargs)

count_missing_in_groups

count_missing_in_groups(
    df: DataFrame, groupby_columns: list[str], **kwargs: Any
) -> DataFrame

Count the number of missing values in each group.

This is equivalent but faster than:

num_nans_detected = df.groupby(groupby_columns, **groupby_kwargs).agg(
    lambda x: pd.isnull(x).sum()
)
Source code in lib/datautils/owid/datautils/dataframes.py
def count_missing_in_groups(df: pd.DataFrame, groupby_columns: list[str], **kwargs: Any) -> pd.DataFrame:
    """Count the number of missing values in each group.

    This is equivalent but faster than:

    ```python
    num_nans_detected = df.groupby(groupby_columns, **groupby_kwargs).agg(
        lambda x: pd.isnull(x).sum()
    )
    ```

    """
    nan_columns = [c for c in df.columns if c not in groupby_columns]

    num_nans_detected = df[nan_columns].isnull().groupby([df[c] for c in groupby_columns], **kwargs).sum()

    return cast(pd.DataFrame, num_nans_detected)

groupby_agg

groupby_agg(
    df: DataFrame,
    groupby_columns: list[str] | str,
    aggregations: dict[str, Any] | None = None,
    num_allowed_nans: int | None = None,
    frac_allowed_nans: float | None = None,
    min_num_values: int | None = None,
) -> DataFrame

Group DataFrame with intelligent NaN handling during aggregation.

Enhanced version of pandas.DataFrame.groupby().agg() that provides control over how NaN values are treated during aggregation. By default, pandas ignores NaNs, which can produce misleading results (e.g., treating NaNs as zeros in sums).

This function supports weighted aggregations using the special syntax mean_weighted_by_<column_name> for any aggregation.

Behavior
  • When all NaN parameters are None: behaves like standard pandas groupby
  • When any NaN parameter is set: applies sequential validation rules

NaN Handling Rules (applied in order):

1. If `num_allowed_nans` is set: group becomes NaN if it has more NaNs
2. If `frac_allowed_nans` is set: group becomes NaN if NaN fraction exceeds threshold
3. If `min_num_values` is set: group becomes NaN if valid values < threshold

Parameters:

  • df (DataFrame) –

    Source DataFrame to group and aggregate.

  • groupby_columns (list[str] | str) –

    Column name(s) to group by. Can be a single string or list.

  • aggregations (dict[str, Any] | None, default: None ) –

    Dictionary mapping column names to aggregation functions. If None, applies 'sum' to all columns. Supports weighted means with syntax: {'col': 'mean_weighted_by_weight_col'}.

  • num_allowed_nans (int | None, default: None ) –

    Maximum number of NaN values allowed in a group before the aggregate becomes NaN.

  • frac_allowed_nans (float | None, default: None ) –

    Maximum fraction of NaN values allowed (0.0-1.0). Group becomes NaN if NaN fraction exceeds this threshold.

  • min_num_values (int | None, default: None ) –

    Minimum number of non-NaN values required. Group becomes NaN if it has fewer valid values (and at least one NaN).

Returns:

  • DataFrame

    Grouped and aggregated DataFrame with NaN handling applied.

Example

Basic groupby with NaN control

import pandas as pd
from owid.datautils.dataframes import groupby_agg

df = pd.DataFrame({
    "country": ["USA", "USA", "UK", "UK"],
    "year": [2020, 2021, 2020, 2021],
    "value": [100, None, 200, 300]
})

# Standard pandas sum treats NaN as 0
# result = df.groupby("country").sum()  # USA: 100

# With min_num_values=1, NaN if all values are NaN
result = groupby_agg(
    df,
    groupby_columns="country",
    aggregations={"value": "sum"},
    min_num_values=1
)
# USA: 100 (has 1 valid value), UK: 500 (has 2 valid values)

Weighted mean aggregation

df = pd.DataFrame({
    "country": ["USA", "USA", "UK"],
    "value": [10, 20, 30],
    "population": [100, 200, 300]
})

result = groupby_agg(
    df,
    groupby_columns="country",
    aggregations={"value": "mean_weighted_by_population"}
)
# USA: 16.67 = (10*100 + 20*200)/(100+200)

Note

Does not support multiple aggregations for the same column (e.g., {'a': ('sum', 'mean')}).

Source code in lib/datautils/owid/datautils/dataframes.py
def groupby_agg(
    df: pd.DataFrame,
    groupby_columns: list[str] | str,
    aggregations: dict[str, Any] | None = None,
    num_allowed_nans: int | None = None,
    frac_allowed_nans: float | None = None,
    min_num_values: int | None = None,
) -> pd.DataFrame:
    """Group DataFrame with intelligent NaN handling during aggregation.

    Enhanced version of `pandas.DataFrame.groupby().agg()` that provides control over
    how NaN values are treated during aggregation. By default, pandas ignores NaNs,
    which can produce misleading results (e.g., treating NaNs as zeros in sums).

    This function supports weighted aggregations using the special syntax
    `mean_weighted_by_<column_name>` for any aggregation.

    Behavior:
        - When all NaN parameters are None: behaves like standard pandas groupby
        - When any NaN parameter is set: applies sequential validation rules

        NaN Handling Rules (applied in order):

            1. If `num_allowed_nans` is set: group becomes NaN if it has more NaNs
            2. If `frac_allowed_nans` is set: group becomes NaN if NaN fraction exceeds threshold
            3. If `min_num_values` is set: group becomes NaN if valid values < threshold

    Args:
        df: Source DataFrame to group and aggregate.
        groupby_columns: Column name(s) to group by. Can be a single string or list.
        aggregations: Dictionary mapping column names to aggregation functions.
            If None, applies 'sum' to all columns. Supports weighted means with
            syntax: `{'col': 'mean_weighted_by_weight_col'}`.
        num_allowed_nans: Maximum number of NaN values allowed in a group before
            the aggregate becomes NaN.
        frac_allowed_nans: Maximum fraction of NaN values allowed (0.0-1.0).
            Group becomes NaN if NaN fraction exceeds this threshold.
        min_num_values: Minimum number of non-NaN values required. Group becomes
            NaN if it has fewer valid values (and at least one NaN).

    Returns:
        Grouped and aggregated DataFrame with NaN handling applied.

    Example:
        Basic groupby with NaN control
        ```python
        import pandas as pd
        from owid.datautils.dataframes import groupby_agg

        df = pd.DataFrame({
            "country": ["USA", "USA", "UK", "UK"],
            "year": [2020, 2021, 2020, 2021],
            "value": [100, None, 200, 300]
        })

        # Standard pandas sum treats NaN as 0
        # result = df.groupby("country").sum()  # USA: 100

        # With min_num_values=1, NaN if all values are NaN
        result = groupby_agg(
            df,
            groupby_columns="country",
            aggregations={"value": "sum"},
            min_num_values=1
        )
        # USA: 100 (has 1 valid value), UK: 500 (has 2 valid values)
        ```

        Weighted mean aggregation
        ```python
        df = pd.DataFrame({
            "country": ["USA", "USA", "UK"],
            "value": [10, 20, 30],
            "population": [100, 200, 300]
        })

        result = groupby_agg(
            df,
            groupby_columns="country",
            aggregations={"value": "mean_weighted_by_population"}
        )
        # USA: 16.67 = (10*100 + 20*200)/(100+200)
        ```

    Note:
        Does not support multiple aggregations for the same column
        (e.g., `{'a': ('sum', 'mean')}`).
    """
    if isinstance(groupby_columns, str):
        groupby_columns = [groupby_columns]

    if aggregations is None:
        columns_to_aggregate = [column for column in df.columns if column not in groupby_columns]
        aggregations = {column: "sum" for column in columns_to_aggregate}

    # Default groupby arguments, `observed` makes sure the final dataframe
    # does not explode with NaNs
    groupby_kwargs = {
        "dropna": False,
        "observed": True,
    }

    # Handle weighted aggregations separately if any are present
    weighted_aggregations = {
        k: v for k, v in aggregations.items() if isinstance(v, str) and v.startswith("mean_weighted_by_")
    }

    if weighted_aggregations:
        # Split out regular aggregations to handle them normally
        regular_aggregations = {
            k: v for k, v in aggregations.items() if not (isinstance(v, str) and v.startswith("mean_weighted_by_"))
        }

        # Handle regular aggregations first (if any)
        if regular_aggregations:
            grouped = df.groupby(groupby_columns, **groupby_kwargs).agg(regular_aggregations)  # ty: ignore
        else:
            # Create empty DataFrame with proper groupby index for weighted-only case
            grouped = (
                df.groupby(groupby_columns, dropna=False, observed=True)
                .size()
                .to_frame("_temp")
                .drop(columns=["_temp"])
            )

        # Add weighted mean columns
        for col, agg_func in weighted_aggregations.items():
            weight_col = agg_func.replace("mean_weighted_by_", "")
            if weight_col not in df.columns:
                raise ValueError(f"Weight column '{weight_col}' not found in data")

            weighted_results = df.groupby(groupby_columns, dropna=False, observed=True).apply(
                lambda group: _calculate_weighted_mean(
                    group, col, weight_col, num_allowed_nans, frac_allowed_nans, min_num_values
                ),
                include_groups=False,
            )
            grouped[col] = weighted_results  # ty: ignore
    else:
        # No weighted aggregations; use standard grouping logic
        grouped = df.groupby(groupby_columns, **groupby_kwargs).agg(aggregations)  # ty: ignore

    # Calculate a few necessary parameters related to the number of nans and valid elements.
    if (num_allowed_nans is not None) or (frac_allowed_nans is not None) or (min_num_values is not None):
        # Count the number of missing values in each group.
        num_nans_detected = count_missing_in_groups(df, groupby_columns, **groupby_kwargs)
    if (frac_allowed_nans is not None) or (min_num_values is not None):
        # Count number of total elements in each group (counting both nans and non-nan values).
        num_elements = df.groupby(groupby_columns, **groupby_kwargs).size()  # ty: ignore

    # Apply conditions sequentially.
    if num_allowed_nans is not None:
        # Make nan any aggregation where there were too many missing values.
        grouped = grouped[num_nans_detected <= num_allowed_nans]  # ty: ignore

    if frac_allowed_nans is not None:
        # Make nan any aggregation where there were too many missing values.
        grouped = grouped[num_nans_detected.divide(num_elements, axis="index") <= frac_allowed_nans]  # ty: ignore

    if min_num_values is not None:
        # Make nan any aggregation where there were too few valid (non-nan) values.
        # The number of valid values is the number of elements minus the number of nans. So, a priori, what we need is:
        # grouped = grouped[(-num_nans_detected.subtract(num_elements, axis="index") >= min_num_values)]
        # However, if a group has fewer elements than min_num_values, the condition is not fulfilled, and the aggregate
        # is nan. But that is probably not the desired behavior. Instead, if all elements in a group are valid, the
        # aggregate should exist, even if that number of valid values is smaller than min_num_values.
        # Therefore, we impose that either the number of valid values is >= min_num_values, or that there are no nans
        # (and hence all values are valid).
        grouped = grouped[
            (-num_nans_detected.subtract(num_elements, axis="index") >= min_num_values) | (num_nans_detected == 0)  # ty: ignore
        ]

    return cast(pd.DataFrame, grouped)

has_index

has_index(df: DataFrame) -> bool

Check if a DataFrame has a meaningful index.

Determines whether a DataFrame has an actual index set, or just the default dummy integer index created by pandas.

Parameters:

  • df (DataFrame) –

    DataFrame to check for index.

Returns:

  • bool

    True if DataFrame has a non-dummy (single or multi) index, False otherwise.

Example
import pandas as pd
from owid.datautils import has_index

# DataFrame with dummy index
df1 = pd.DataFrame({"a": [1, 2, 3]})
print(has_index(df1))  # False

# DataFrame with actual index
df2 = df1.set_index("a")
print(has_index(df2))  # True
Source code in lib/datautils/owid/datautils/dataframes.py
def has_index(df: pd.DataFrame) -> bool:
    """Check if a DataFrame has a meaningful index.

    Determines whether a DataFrame has an actual index set, or just the
    default dummy integer index created by pandas.

    Args:
        df: DataFrame to check for index.

    Returns:
        True if DataFrame has a non-dummy (single or multi) index, False otherwise.

    Example:
        ```python
        import pandas as pd
        from owid.datautils import has_index

        # DataFrame with dummy index
        df1 = pd.DataFrame({"a": [1, 2, 3]})
        print(has_index(df1))  # False

        # DataFrame with actual index
        df2 = df1.set_index("a")
        print(has_index(df2))  # True
        ```
    """
    # Dataframes always have an attribute index.names, which is a frozen list.
    # If the dataframe has no set index (i.e. if it has a dummy index), that list contains only [None].
    # In any other case, the frozen list contains one or more elements different than None.
    df_has_index = True if df.index.names[0] is not None else False

    return df_has_index

map_series

map_series(
    series: Series,
    mapping: dict[Any, Any],
    make_unmapped_values_nan: bool = False,
    warn_on_missing_mappings: bool = False,
    warn_on_unused_mappings: bool = False,
    show_full_warning: bool = False,
) -> Series

Map Series values with performance optimization and flexible NaN handling.

Enhanced version of pandas.Series.map() that:

  • Preserves unmapped values instead of converting to NaN (optional)
  • Much faster than Series.replace() for large DataFrames
  • Supports categorical Series with automatic category management
  • Provides warnings for missing or unused mappings

Behavior differences from pandas.Series.map():

- Default: unmapped values keep original values (not NaN)
- With `make_unmapped_values_nan=True`: same as `Series.map()`

Parameters:

  • series (Series) –

    Series to map values from.

  • mapping (dict[Any, Any]) –

    Dictionary mapping old values to new values.

  • make_unmapped_values_nan (bool, default: False ) –

    If True, unmapped values become NaN. If False, they retain original values.

  • warn_on_missing_mappings (bool, default: False ) –

    If True, warn about values in Series that don't exist in mapping.

  • warn_on_unused_mappings (bool, default: False ) –

    If True, warn about mapping entries not used by any value in Series.

  • show_full_warning (bool, default: False ) –

    If True, print full list of missing/unused values in warnings.

Returns:

  • Series

    Series with mapped values.

Example

Basic mapping

import pandas as pd
from owid.datautils.dataframes import map_series

series = pd.Series(["usa", "uk", "france"])
mapping = {"usa": "United States", "uk": "United Kingdom"}

# Default: unmapped values preserved
result = map_series(series, mapping)
# ["United States", "United Kingdom", "france"]

# With NaN for unmapped
result = map_series(series, mapping, make_unmapped_values_nan=True)
# ["United States", "United Kingdom", NaN]

With warnings

result = map_series(
    series,
    mapping,
    warn_on_missing_mappings=True,  # Warns about "france"
    warn_on_unused_mappings=True    # Warns if mapping has unused keys
)

Source code in lib/datautils/owid/datautils/dataframes.py
def map_series(
    series: pd.Series,
    mapping: dict[Any, Any],
    make_unmapped_values_nan: bool = False,
    warn_on_missing_mappings: bool = False,
    warn_on_unused_mappings: bool = False,
    show_full_warning: bool = False,
) -> pd.Series:
    """Map Series values with performance optimization and flexible NaN handling.

    Enhanced version of `pandas.Series.map()` that:

    - Preserves unmapped values instead of converting to NaN (optional)
    - Much faster than `Series.replace()` for large DataFrames
    - Supports categorical Series with automatic category management
    - Provides warnings for missing or unused mappings

    Behavior differences from `pandas.Series.map()`:

        - Default: unmapped values keep original values (not NaN)
        - With `make_unmapped_values_nan=True`: same as `Series.map()`

    Args:
        series: Series to map values from.
        mapping: Dictionary mapping old values to new values.
        make_unmapped_values_nan: If True, unmapped values become NaN.
            If False, they retain original values.
        warn_on_missing_mappings: If True, warn about values in Series
            that don't exist in mapping.
        warn_on_unused_mappings: If True, warn about mapping entries
            not used by any value in Series.
        show_full_warning: If True, print full list of missing/unused
            values in warnings.

    Returns:
        Series with mapped values.

    Example:
        Basic mapping
        ```python
        import pandas as pd
        from owid.datautils.dataframes import map_series

        series = pd.Series(["usa", "uk", "france"])
        mapping = {"usa": "United States", "uk": "United Kingdom"}

        # Default: unmapped values preserved
        result = map_series(series, mapping)
        # ["United States", "United Kingdom", "france"]

        # With NaN for unmapped
        result = map_series(series, mapping, make_unmapped_values_nan=True)
        # ["United States", "United Kingdom", NaN]
        ```

        With warnings
        ```python
        result = map_series(
            series,
            mapping,
            warn_on_missing_mappings=True,  # Warns about "france"
            warn_on_unused_mappings=True    # Warns if mapping has unused keys
        )
        ```
    """
    # If given category, only map category names and return category type.
    if series.dtype == "category":
        # Remove unused categories in input series.
        series = series.cat.remove_unused_categories()

        new_categories = map_series(
            pd.Series(series.cat.categories),
            mapping=mapping,
            make_unmapped_values_nan=make_unmapped_values_nan,
            warn_on_missing_mappings=warn_on_missing_mappings,
            warn_on_unused_mappings=warn_on_unused_mappings,
            show_full_warning=show_full_warning,
        )
        category_mapping = dict(zip(series.cat.categories, new_categories))
        return rename_categories(series, category_mapping)

    # Translate values in series following the mapping.
    series_mapped = series.map(mapping)
    if not make_unmapped_values_nan:
        # Rows that had values that were not in the mapping are now nan.
        # Replace those nans with their original values, except if they were actually meant to be mapped to nan.
        # For example, if {"bad_value": np.nan} was part of the mapping, do not replace those nans back to "bad_value".

        # if we are setting values from the original series, ensure we have the same dtype
        try:
            series_mapped = series_mapped.astype(series.dtype, copy=False)
        except ValueError:
            # casting NaNs to integer will fail
            pass

        # Detect values in the mapping that were intended to be mapped to nan.
        values_mapped_to_nan = [
            original_value for original_value, target_value in mapping.items() if pd.isnull(target_value)
        ]

        # Make a mask that is True for new nans that need to be replaced back to their original values.
        missing = series_mapped.isnull() & (~series.isin(values_mapped_to_nan))
        if missing.any():
            # Replace those nans by their original values.
            series_mapped.loc[missing] = series[missing]  # ty: ignore[call-non-callable]

    if warn_on_missing_mappings:
        unmapped = set(series) - set(mapping)
        if len(unmapped) > 0:
            warn_on_list_of_entities(
                unmapped,
                f"{len(unmapped)} missing values in mapping.",
                show_list=show_full_warning,
            )

    if warn_on_unused_mappings:
        unused = set(mapping) - set(series)
        if len(unused) > 0:
            warn_on_list_of_entities(
                unused,
                f"{len(unused)} unused values in mapping.",
                show_list=show_full_warning,
            )

    return series_mapped

multi_merge

multi_merge(
    dfs: list[DataFrame],
    on: list[str] | str,
    how: str = "inner",
) -> DataFrame

Merge multiple DataFrames on common columns.

Convenience function for merging more than two DataFrames sequentially. Equivalent to chaining multiple pd.merge() calls.

Parameters:

  • dfs (list[DataFrame]) –

    List of DataFrames to merge.

  • on (list[str] | str) –

    Column name(s) to merge on. Must exist in all DataFrames with the same name.

  • how (str, default: 'inner' ) –

    Type of merge to perform. Options: 'inner', 'outer', 'left', 'right'. Default is 'inner'.

Returns:

  • DataFrame

    Merged DataFrame containing all input DataFrames joined on specified columns.

Example
import pandas as pd
from owid.datautils.dataframes import multi_merge

df1 = pd.DataFrame({"country": ["USA", "UK"], "gdp": [20, 3]})
df2 = pd.DataFrame({"country": ["USA", "UK"], "pop": [330, 67]})
df3 = pd.DataFrame({"country": ["USA", "UK"], "area": [9.8, 0.24]})

result = multi_merge([df1, df2, df3], on="country")
#   country  gdp  pop  area
# 0     USA   20  330  9.80
# 1      UK    3   67  0.24
Source code in lib/datautils/owid/datautils/dataframes.py
def multi_merge(dfs: list[pd.DataFrame], on: list[str] | str, how: str = "inner") -> pd.DataFrame:
    """Merge multiple DataFrames on common columns.

    Convenience function for merging more than two DataFrames sequentially.
    Equivalent to chaining multiple `pd.merge()` calls.

    Args:
        dfs: List of DataFrames to merge.
        on: Column name(s) to merge on. Must exist in all DataFrames with
            the same name.
        how: Type of merge to perform. Options: 'inner', 'outer', 'left', 'right'.
            Default is 'inner'.

    Returns:
        Merged DataFrame containing all input DataFrames joined on specified columns.

    Example:
        ```python
        import pandas as pd
        from owid.datautils.dataframes import multi_merge

        df1 = pd.DataFrame({"country": ["USA", "UK"], "gdp": [20, 3]})
        df2 = pd.DataFrame({"country": ["USA", "UK"], "pop": [330, 67]})
        df3 = pd.DataFrame({"country": ["USA", "UK"], "area": [9.8, 0.24]})

        result = multi_merge([df1, df2, df3], on="country")
        #   country  gdp  pop  area
        # 0     USA   20  330  9.80
        # 1      UK    3   67  0.24
        ```
    """
    merged = dfs[0].copy()
    for df in dfs[1:]:
        merged = pd.merge(merged, df, how=how, on=on)  # ty: ignore

    return merged

rename_categories

rename_categories(
    series: Series, mapping: dict[Any, Any]
) -> Series

Alternative to pd.Series.cat.rename_categories which supports non-unique categories.

We do that by replacing non-unique categories first and then mapping with unique categories. Unused categories are removed during the process. It should be as fast as pd.Series.cat.rename_categories if there are no non-unique categories.

Source code in lib/datautils/owid/datautils/dataframes.py
def rename_categories(series: pd.Series, mapping: dict[Any, Any]) -> pd.Series:
    """Alternative to pd.Series.cat.rename_categories which supports non-unique categories.

    We do that by replacing non-unique categories first and then mapping with unique categories.
    Unused categories are removed during the process. It should be as fast as
    pd.Series.cat.rename_categories if there are no non-unique categories.
    """
    if series.dtype != "category":
        raise ValueError("Series must be of type category.")

    series = series.copy()

    new_mapping: dict[Any, Any] = {}
    for map_from, map_to in mapping.items():
        # Map nulls right away
        if pd.isnull(map_to):
            series[series == map_from] = np.nan

        # Non-unique category, replace it first
        elif map_to in new_mapping.values():
            # Find the category that maps to map_to
            series[series == map_from] = [k for k, v in new_mapping.items() if v == map_to][0]
        else:
            new_mapping[map_from] = map_to

    # NOTE: removing unused categories is necessary because of renaming
    return cast(
        pd.Series,
        series.cat.remove_unused_categories().cat.rename_categories(new_mapping).cat.remove_unused_categories(),
    )

to_file

to_file(*args: Any, **kwargs: Any) -> None

Save a dataframe in any format.

Will be deprecated. Use owid.datautils.io.df.to_file instead.

Source code in lib/datautils/owid/datautils/dataframes.py
def to_file(*args: Any, **kwargs: Any) -> None:
    """Save a dataframe in any format.

    Will be deprecated. Use owid.datautils.io.df.to_file instead.
    """
    warnings.warn(
        "Call to deprecated class to_file (This function will be removed in the next"
        " minor update, use owid.datautils.io.df_to_file instead.)",
        category=DeprecationWarning,
        stacklevel=2,
    )
    to_file_(*args, **kwargs)

owid.datautils.io

Input/Output methods.

Modules:

  • archive

    Input/Output functions for local files.

  • df

    DataFrame io operations.

  • json

    Input/Output functions for local files.

Functions:

  • decompress_file

    Extract a zip or tar file.

  • df_from_file

    Load a file as a pandas DataFrame with URL and compression support.

  • df_to_file

    Save DataFrame to file with automatic format detection and smart defaults.

  • load_json

    Load data from JSON file with optional duplicate key detection.

  • save_json

    Save data to a JSON file.

decompress_file

decompress_file(
    input_file: str | Path,
    output_folder: str | Path,
    overwrite: bool = False,
) -> None

Extract a zip or tar file.

It can be a local or a remote file.

Parameters:

  • input_file (str | Path) –

    Path to local zip file, or URL of a remote zip file.

  • output_folder (str | Path) –

    Path to local folder.

  • overwrite (bool, default: False ) –

    Overwrite decompressed content if it already exists (otherwise raises an error if content already exists).

Source code in lib/datautils/owid/datautils/io/archive.py
@enable_file_download(path_arg_name="input_file")
def decompress_file(
    input_file: str | Path,
    output_folder: str | Path,
    overwrite: bool = False,
) -> None:
    """Extract a zip or tar file.

    It can be a local or a remote file.

    Args:
        input_file: Path to local zip file, or URL of a remote zip file.
        output_folder: Path to local folder.
        overwrite: Overwrite decompressed content if it already exists (otherwise raises an error if content already exists).
    """
    if isinstance(input_file, str):
        input_file = Path(input_file)
    input_file = cast(Path, input_file)

    if zipfile.is_zipfile(input_file):
        _decompress_zip_file(input_file, output_folder, overwrite)
    elif tarfile.is_tarfile(input_file):
        _decompress_tar_file(input_file, output_folder, overwrite)
    elif input_file.suffix.lower() == ".7z":
        with SevenZipFile(input_file, mode="r") as z:
            z.extractall(path=output_folder)
    else:
        raise ValueError("File is neither a zip nor a tar file.")

df_from_file

df_from_file(
    file_path: str | Path,
    file_type: str | None = None,
    **kwargs: Any,
) -> DataFrame | list[DataFrame]

Load a file as a pandas DataFrame with URL and compression support.

Enhanced wrapper around pandas read_* functions that adds: - Automatic format detection from file extension - URL download support (via @enable_file_download decorator) - Compressed file reading (with explicit file_type)

The function infers the file type from the extension after the last dot. For example: "file.csv" reads as CSV, "https://example.com/data.xlsx" reads as Excel.

Parameters:

  • file_path (str | Path) –

    Local path or URL to the file. Supports local files and HTTP(S) URLs.

  • file_type (str | None, default: None ) –

    Explicit file type when reading compressed files (e.g., "csv", "dta", "json"). Only needed for compressed files. Specifies the format of the compressed content, not the compression format itself.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to the underlying pandas.read_* function.

Returns:

  • DataFrame | list[DataFrame]

    DataFrame loaded from the file. Some formats (like HTML) may return a list of DataFrames.

Raises:

  • ValueError

    If file extension is unknown or file_type not provided for compressed files.

  • FileNotFoundError

    If the file path doesn't exist.

Example

Load from local file

from owid.datautils.io.df import from_file

# CSV file
df = from_file("data.csv")

# Excel with specific sheet
df = from_file("data.xlsx", sheet_name="Sheet1")

Load from URL

# HTTP URL (handled automatically by decorator)
df = from_file("https://example.com/data.csv")

Load compressed file

# Compressed CSV (must specify file_type)
df = from_file("data.csv.gz", file_type="csv")

Note

Supported formats: csv, dta, feather, hdf, html, json, parquet, pickle, xlsx, xml. Compression formats: gz, bz2, zip, xz, zst, tar (with explicit file_type).

Source code in lib/datautils/owid/datautils/io/df.py
@enable_file_download("file_path")
def from_file(file_path: str | Path, file_type: str | None = None, **kwargs: Any) -> pd.DataFrame | list[pd.DataFrame]:
    """Load a file as a pandas DataFrame with URL and compression support.

    Enhanced wrapper around pandas `read_*` functions that adds:
    - Automatic format detection from file extension
    - URL download support (via `@enable_file_download` decorator)
    - Compressed file reading (with explicit `file_type`)

    The function infers the file type from the extension after the last dot.
    For example: "file.csv" reads as CSV, "https://example.com/data.xlsx" reads as Excel.

    Args:
        file_path: Local path or URL to the file. Supports local files and HTTP(S) URLs.
        file_type: Explicit file type when reading compressed files (e.g., "csv", "dta", "json").
            Only needed for compressed files. Specifies the format of the compressed content,
            not the compression format itself.
        **kwargs: Additional arguments passed to the underlying `pandas.read_*` function.

    Returns:
        DataFrame loaded from the file. Some formats (like HTML) may return a list of DataFrames.

    Raises:
        ValueError: If file extension is unknown or `file_type` not provided for compressed files.
        FileNotFoundError: If the file path doesn't exist.

    Example:
        Load from local file
        ```python
        from owid.datautils.io.df import from_file

        # CSV file
        df = from_file("data.csv")

        # Excel with specific sheet
        df = from_file("data.xlsx", sheet_name="Sheet1")
        ```

        Load from URL
        ```python
        # HTTP URL (handled automatically by decorator)
        df = from_file("https://example.com/data.csv")
        ```

        Load compressed file
        ```python
        # Compressed CSV (must specify file_type)
        df = from_file("data.csv.gz", file_type="csv")
        ```

    Note:
        Supported formats: csv, dta, feather, hdf, html, json, parquet, pickle, xlsx, xml.
        Compression formats: gz, bz2, zip, xz, zst, tar (with explicit `file_type`).
    """
    # Ensure file_path is a Path object.
    file_path = Path(file_path)

    # Ensure extension is lower case and does not start with '.'.
    extension = file_path.suffix.lstrip(".").lower()

    # If compressed file, raise an exception unless file_type is given
    if extension in COMPRESSION_SUPPORTED:
        if file_type:
            extension = file_type
        else:
            raise ValueError("To be able to read from a compressed file, you need to provide a value for `file_type`.")

    # Check path is valid
    if not file_path.exists():
        raise FileNotFoundError(f"Cannot find file: {file_path}")

    # Available input methods (some of them may need additional dependencies to work).
    input_methods = {
        "csv": pd.read_csv,
        "dta": pd.read_stata,
        "feather": pd.read_feather,
        "hdf": pd.read_hdf,
        "html": pd.read_html,
        "json": pd.read_json,
        "parquet": pd.read_parquet,
        "pickle": pd.read_pickle,
        "pkl": pd.read_pickle,
        "xlsx": pd.read_excel,
        "xml": pd.read_xml,
    }
    if extension not in input_methods:
        raise ValueError(f"Failed reading dataframe because of an unknown file extension: {extension}")
    # Select the appropriate reading method.
    read_function = input_methods[extension]

    # Load file using the chosen read function and the appropriate arguments.
    df: pd.DataFrame = read_function(file_path, **kwargs)  # ty: ignore
    return df

df_to_file

df_to_file(
    df: DataFrame,
    file_path: str | Path,
    overwrite: bool = True,
    **kwargs: Any,
) -> None

Save DataFrame to file with automatic format detection and smart defaults.

Enhanced wrapper around pandas to_* methods that provides:

  • Automatic format selection from file extension
  • Auto-creation of parent directories
  • Intelligent index handling (omits dummy indices)
  • Optional overwrite protection

Format is determined by file extension: "data.csv" creates CSV, "data.parquet" creates Parquet, etc.

Parameters:

  • df (DataFrame) –

    DataFrame to save.

  • file_path (str | Path) –

    Output file path. Parent directories are created if needed.

  • overwrite (bool, default: True ) –

    If True, overwrite existing files. If False, raise error if file already exists.

  • **kwargs (Any, default: {} ) –

    Additional arguments passed to the underlying pandas.to_* method (e.g., na_rep, sep, compression).

Raises:

Example

Basic usage

from owid.datautils.io.df import to_file
import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

# Save as CSV
to_file(df, "output.csv")

# Save as Parquet
to_file(df, "output.parquet")

# Save with custom parameters
to_file(df, "output.csv", na_rep="N/A", sep=";")

Auto-create directories

# Creates nested/path/ if it doesn't exist
to_file(df, "nested/path/data.csv")

Overwrite protection

# Raises FileExistsError if file exists
to_file(df, "existing.csv", overwrite=False)

Note

Supported formats: csv, dta, feather, hdf, html, json, md, parquet, pickle, tex, txt, xlsx, xml.

Index handling: Automatically omits dummy indices (default integer index) but preserves meaningful indices. Override with index=True/False in kwargs.

Source code in lib/datautils/owid/datautils/io/df.py
def to_file(df: pd.DataFrame, file_path: str | Path, overwrite: bool = True, **kwargs: Any) -> None:
    """Save DataFrame to file with automatic format detection and smart defaults.

    Enhanced wrapper around pandas `to_*` methods that provides:

    - Automatic format selection from file extension
    - Auto-creation of parent directories
    - Intelligent index handling (omits dummy indices)
    - Optional overwrite protection

    Format is determined by file extension: "data.csv" creates CSV,
    "data.parquet" creates Parquet, etc.

    Args:
        df: DataFrame to save.
        file_path: Output file path. Parent directories are created if needed.
        overwrite: If True, overwrite existing files. If False, raise error
            if file already exists.
        **kwargs: Additional arguments passed to the underlying `pandas.to_*` method
            (e.g., `na_rep`, `sep`, `compression`).

    Raises:
        ValueError: If file extension is not supported.
        FileExistsError: If file exists and `overwrite=False`.

    Example:
        Basic usage
        ```python
        from owid.datautils.io.df import to_file
        import pandas as pd

        df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})

        # Save as CSV
        to_file(df, "output.csv")

        # Save as Parquet
        to_file(df, "output.parquet")

        # Save with custom parameters
        to_file(df, "output.csv", na_rep="N/A", sep=";")
        ```

        Auto-create directories
        ```python
        # Creates nested/path/ if it doesn't exist
        to_file(df, "nested/path/data.csv")
        ```

        Overwrite protection
        ```python
        # Raises FileExistsError if file exists
        to_file(df, "existing.csv", overwrite=False)
        ```

    Note:
        Supported formats: csv, dta, feather, hdf, html, json, md, parquet,
        pickle, tex, txt, xlsx, xml.

        Index handling: Automatically omits dummy indices (default integer index)
        but preserves meaningful indices. Override with `index=True/False` in kwargs.
    """
    # Ensure file_path is a Path object.
    file_path = Path(file_path)

    # Ensure extension is lower case and does not start with '.'.
    extension = file_path.suffix.lstrip(".").lower()

    # Ensure output directory exists.
    if not file_path.parent.exists():
        file_path.parent.mkdir(parents=True)

    # Avoid overwriting an existing file unless explicitly stated.
    if file_path.is_file() and not overwrite:
        raise FileExistsError("Failed to save dataframe because file exists and 'overwrite' is False.")

    # Available output methods (some of them may need additional dependencies to work).
    output_methods = {
        "csv": df.to_csv,
        "dta": df.to_stata,
        "feather": df.to_feather,
        "hdf": df.to_hdf,
        "html": df.to_html,
        "json": df.to_json,
        "md": df.to_markdown,
        "parquet": df.to_parquet,
        "pickle": df.to_pickle,
        "pkl": df.to_pickle,
        "tex": df.to_latex,
        "txt": df.to_string,
        "xlsx": df.to_excel,
        "xml": df.to_xml,
    }
    if extension not in output_methods:
        raise ValueError(f"Failed saving dataframe because of an unknown file extension: {extension}")
    # Select the appropriate storing method.
    save_function = output_methods[extension]

    # Decide whether dataframe should be stored with or without an index, if:
    # * The storing method allows for an 'index' argument.
    # * The argument "index" is not explicitly given.
    if ("index" in inspect.signature(save_function).parameters) and ("index" not in kwargs):
        # Make 'index' False to avoid storing index if dataframe has a dummy index.
        kwargs["index"] = _has_index(df=df)

    # Save file using the chosen save function and the appropriate arguments.
    save_function(file_path, **kwargs)

load_json

load_json(
    json_file: str | Path,
    warn_on_duplicated_keys: bool = True,
) -> Any

Load data from JSON file with optional duplicate key detection.

If the JSON file contains duplicated keys, a warning is optionally raised, and only the value of the latest duplicated key is kept.

Parameters:

  • json_file (str | Path) –

    Path to JSON file. Supports local files and URLs (via decorator).

  • warn_on_duplicated_keys (bool, default: True ) –

    If True, warn about duplicate keys in JSON file.

Returns:

  • Any

    Data loaded from JSON file (typically a dict or list).

Example
from owid.datautils.io.json import load_json

# Load JSON file
data = load_json("data.json")

# Disable duplicate key warnings
data = load_json("data.json", warn_on_duplicated_keys=False)
Source code in lib/datautils/owid/datautils/io/json.py
@enable_file_download(path_arg_name="json_file")
def load_json(json_file: str | Path, warn_on_duplicated_keys: bool = True) -> Any:
    """Load data from JSON file with optional duplicate key detection.

    If the JSON file contains duplicated keys, a warning is optionally raised,
    and only the value of the latest duplicated key is kept.

    Args:
        json_file: Path to JSON file. Supports local files and URLs (via decorator).
        warn_on_duplicated_keys: If True, warn about duplicate keys in JSON file.

    Returns:
        Data loaded from JSON file (typically a dict or list).

    Example:
        ```python
        from owid.datautils.io.json import load_json

        # Load JSON file
        data = load_json("data.json")

        # Disable duplicate key warnings
        data = load_json("data.json", warn_on_duplicated_keys=False)
        ```
    """
    with open(json_file) as _json_file:
        json_content = _json_file.read()
        if warn_on_duplicated_keys:
            data = json.loads(json_content, object_pairs_hook=_load_json_data_and_duplicated_keys)
        else:
            data = json.loads(json_content)

    return data

save_json

save_json(
    data: Any, json_file: str | Path, **kwargs: Any
) -> None

Save data to a JSON file.

Parameters:

  • data (Any) –

    Data to be stored in JSON file (typically a dict or list).

  • json_file (str | Path) –

    Path to output JSON file. Parent directories are created if needed.

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments for json.dump() (e.g., indent=4, sort_keys=True).

Example
from owid.datautils.io.json import save_json

data = {"name": "John", "age": 30}

# Save JSON file
save_json(data, "output.json")

# Save with formatting
save_json(data, "output.json", indent=4, sort_keys=True)
Source code in lib/datautils/owid/datautils/io/json.py
def save_json(data: Any, json_file: str | Path, **kwargs: Any) -> None:
    """Save data to a JSON file.

    Args:
        data: Data to be stored in JSON file (typically a dict or list).
        json_file: Path to output JSON file. Parent directories are created if needed.
        **kwargs: Additional keyword arguments for `json.dump()` (e.g., `indent=4`, `sort_keys=True`).

    Example:
        ```python
        from owid.datautils.io.json import save_json

        data = {"name": "John", "age": 30}

        # Save JSON file
        save_json(data, "output.json")

        # Save with formatting
        save_json(data, "output.json", indent=4, sort_keys=True)
        ```
    """
    # Ensure json_file is a path.
    json_file = Path(json_file)

    # Ensure output directory exists.
    json_file.parent.mkdir(parents=True, exist_ok=True)

    with open(json_file, "w") as _json_file:
        json.dump(data, _json_file, **kwargs)

owid.datautils.google

Google utils.

Modules:

  • api

    Google API class.

  • config

    Google configuration functions.

  • sheets

    Google Sheet utils.

Classes:

GoogleApi

GoogleApi(clients_secrets_file: str | None = None)

API for Google Drive.

Initialise Google API.

To obtain client_secrets_file, follow the instructions from: https://medium.com/analytics-vidhya/how-to-connect-google-drive-to-python-using-pydrive-9681b2a14f20

Note
  • Additionally, make sure to add yourself in Test users, as noted in: https://stackoverflow.com/questions/65980758/pydrive-quickstart-and-error-403-access-denied
  • Select Desktop App instead of Web Application as the application type.

Parameters:

  • clients_secrets_file (str | None, default: None ) –

    Path to client_secrets file.

Example

First time calling the function should look similar to:

from owid.datautils.google.api import GoogleApi
api = GoogleApi("path/to/credentials.json")

New calls can then be made as follows:

api = GoogleApi()

Methods:

Attributes:

  • drive (GoogleDrive) –

    Google Drive object.

Source code in lib/datautils/owid/datautils/google/api.py
def __init__(self, clients_secrets_file: str | None = None) -> None:
    """Initialise Google API.

    To obtain `client_secrets_file`, follow the instructions from:
    https://medium.com/analytics-vidhya/how-to-connect-google-drive-to-python-using-pydrive-9681b2a14f20

    Note:
        - Additionally, make sure to add yourself in Test users, as noted in:
          https://stackoverflow.com/questions/65980758/pydrive-quickstart-and-error-403-access-denied
        - Select Desktop App instead of Web Application as the application type.

    Args:
        clients_secrets_file: Path to client_secrets file.

    Example:
        First time calling the function should look similar to:

        ```python
        from owid.datautils.google.api import GoogleApi
        api = GoogleApi("path/to/credentials.json")
        ```

        New calls can then be made as follows:

        ```python
        api = GoogleApi()
        ```
    """
    if not is_google_config_init():
        if not clients_secrets_file:
            # No clients_secrets, can't initialize Google configuration!
            raise ValueError("No value for `clients_secrets_file` was provided!")
        else:
            google_config_init(clients_secrets_file)

    self.sheets = GSheetsApi(CLIENT_SECRETS_PATH, CREDENTIALS_PATH)

drive property

drive: GoogleDrive

Google Drive object.

download_file classmethod

download_file(
    output: str,
    url: str | None = None,
    file_id: str | None = None,
    quiet: bool = True,
    **kwargs: Any,
) -> None

Download a file from Google Drive.

The file must be public, otherwise this function won't work.

Parameters:

  • output (str) –

    Local path to save the downloaded file.

  • url (str | None, default: None ) –

    URL to the file on Google Drive (it must be public), by default None

  • file_id (str | None, default: None ) –

    ID of the file on Google Drive (the file must be public), by default None.

  • quiet (bool, default: True ) –

    Suppress terminal output. Default is False.

Raises:

  • ValueError

    If neither url nor id are provided.

Source code in lib/datautils/owid/datautils/google/api.py
@classmethod
def download_file(
    cls,
    output: str,
    url: str | None = None,
    file_id: str | None = None,
    quiet: bool = True,
    **kwargs: Any,
) -> None:
    """Download a file from Google Drive.

    The file must be public, otherwise this function won't work.

    Args:
        output: Local path to save the downloaded file.
        url: URL to the file on Google Drive (it must be public), by default None
        file_id: ID of the file on Google Drive (the file must be public), by default None.
        quiet: Suppress terminal output. Default is False.

    Raises:
        ValueError: If neither `url` nor `id` are provided.
    """
    if url:
        gdown.download(url=url, output=output, quiet=quiet, **kwargs)
    elif file_id:
        gdown.download(id=file_id, output=output, quiet=quiet, **kwargs)
    else:
        raise ValueError("You must provide a `url` or `file_id`")

download_folder classmethod

download_folder(
    url: str, output: str, quiet: bool = True, **kwargs: Any
) -> None

Download a folder from Google Drive.

The folderm must be public, otherwise this function won't work.

Parameters:

  • url (str) –

    URL to the folder on Google Drive (must be public).

  • output (str) –

    Local path to save the downloaded folder.

  • quiet (bool, default: True ) –

    Suppress terminal output. Default is False.

Source code in lib/datautils/owid/datautils/google/api.py
@classmethod
def download_folder(cls, url: str, output: str, quiet: bool = True, **kwargs: Any) -> None:
    """Download a folder from Google Drive.

    The folderm must be public, otherwise this function won't work.

    Args:
        url: URL to the folder on Google Drive (must be public).
        output: Local path to save the downloaded folder.
        quiet: Suppress terminal output. Default is False.
    """
    gdown.download_folder(url, output=output, quiet=quiet, use_cookies=False, **kwargs)

list_files

list_files(parent_id: str) -> GoogleDriveFileList

List files in a Google Drive folder.

Parameters:

  • parent_id (str) –

    Google Drive folder ID.

Returns:

  • GoogleDriveFileList

    List of files in the folder.

Source code in lib/datautils/owid/datautils/google/api.py
def list_files(self, parent_id: str) -> GoogleDriveFileList:
    """List files in a Google Drive folder.

    Args:
        parent_id: Google Drive folder ID.

    Returns:
        List of files in the folder.
    """
    request = f"'{parent_id}' in parents and trashed=false"
    # Get list of files
    files = self.drive.ListFile({"q": request}).GetList()
    return files  # ty: ignore[invalid-return-type]

owid.datautils.format

Utils for the processing of different data formats.

Modules:

  • numbers

    Numeric formatting.

Functions:

format_number

format_number(number: int | str) -> int

Format number string to integer.

Only supports integer conversion. Handles various formats including separators and numeric words.

Parameters:

  • number (int | str) –

    Input raw number to be formatted (int or str).

Returns:

  • int

    Formatted number as integer.

Example
from owid.datautils.format.numbers import format_number

# Number with separators
format_number('1 000 000')  # Returns: 1000000
format_number('2,000')      # Returns: 2000

# Number with words
format_number('1 million 1 hundred')  # Returns: 1000100
Source code in lib/datautils/owid/datautils/format/numbers.py
def format_number(number: int | str) -> int:
    """Format number string to integer.

    Only supports integer conversion. Handles various formats including
    separators and numeric words.

    Args:
        number: Input raw number to be formatted (int or str).

    Returns:
        Formatted number as integer.

    Example:
        ```python
        from owid.datautils.format.numbers import format_number

        # Number with separators
        format_number('1 000 000')  # Returns: 1000000
        format_number('2,000')      # Returns: 2000

        # Number with words
        format_number('1 million 1 hundred')  # Returns: 1000100
        ```
    """
    number_ = IntegerNumber(number)
    return number_.clean()

owid.datautils.decorators

Library decorators.

Functions:

  • enable_file_download

    Decorator that allows functions expecting local file paths to accept URLs and S3 paths.

enable_file_download

enable_file_download(
    path_arg_name: str | None = None,
) -> Callable[[Any], Any]

Decorator that allows functions expecting local file paths to accept URLs and S3 paths.

This decorator automatically downloads remote files to temporary storage before calling the decorated function, making any file-processing function work transparently with:

  • Local file paths (unchanged behavior)
  • HTTP/HTTPS URLs (downloaded via web request)
  • S3 paths (downloaded via S3 client)

Parameters:

  • path_arg_name (str | None, default: None ) –

    Name of the parameter containing the file path. If None, uses the first positional argument.

Example
@enable_file_download(path_arg_name="file_path")
def load_data(file_path):
    with open(file_path, 'r') as f:
        return f.read()

# Now works with all of these:
load_data("/local/file.txt")                    # Local file
load_data("https://example.com/data.txt")       # HTTP download
load_data("s3://bucket/data.txt")               # S3 download
Warning

Downloads entire files to temporary storage on every call. For large files or frequent access, consider explicit caching or streaming approaches.

Source code in lib/datautils/owid/datautils/decorators.py
def enable_file_download(path_arg_name: str | None = None) -> Callable[[Any], Any]:
    """Decorator that allows functions expecting local file paths to accept URLs and S3 paths.

    This decorator automatically downloads remote files to temporary storage before calling
    the decorated function, making any file-processing function work transparently with:

    - Local file paths (unchanged behavior)
    - HTTP/HTTPS URLs (downloaded via web request)
    - S3 paths (downloaded via S3 client)

    Args:
        path_arg_name: Name of the parameter containing the file path. If None,
                      uses the first positional argument.

    Example:
        ```python
        @enable_file_download(path_arg_name="file_path")
        def load_data(file_path):
            with open(file_path, 'r') as f:
                return f.read()

        # Now works with all of these:
        load_data("/local/file.txt")                    # Local file
        load_data("https://example.com/data.txt")       # HTTP download
        load_data("s3://bucket/data.txt")               # S3 download
        ```

    Warning:
        Downloads entire files to temporary storage on every call. For large files
        or frequent access, consider explicit caching or streaming approaches.
    """
    # Download options, add them as needed (value: str, key: Tuple[str])
    prefixes = {
        "url": (
            "http://",
            "https://",
        ),
        "s3": ("s3://",),
    }
    # Get list of prefixes as a flat tuple
    prefixes_flat = tuple(prefix for prefixes_list in prefixes.values() for prefix in prefixes_list)

    def _enable_file_download(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
        @functools.wraps(func)
        def wrapper_download(*args: Any, **kwargs: Any) -> Any:
            # Get path to file
            _used_args = False
            if args:
                args = list(args)  # ty: ignore
                path = args[0]
                _used_args = True
            else:
                path = kwargs.get(path_arg_name)  # ty: ignore
                if path is None:
                    raise ValueError(f"Filename was not found in args or kwargs ({path_arg_name}!")
            # Check if download is needed and download
            path = str(path)
            if path.startswith(prefixes_flat):  # Download from URL and run function
                with tempfile.NamedTemporaryFile() as temp_file:
                    # Download file from URL
                    if path.startswith(prefixes["url"]):
                        download_file_from_url(path, temp_file.name)  # TODO: Add custom args here
                    # Download file from S3 (need credentials)
                    elif path.startswith(prefixes["s3"]):
                        try:
                            # Import here to avoid circular dependency
                            from owid.catalog import s3_utils  # ty: ignore
                        except ImportError as e:
                            raise ImportError("owid-catalog is required for S3 downloads. ") from e

                        s3_utils.download(path, temp_file.name, quiet=True)  # TODO: Add custom args here

                    # Modify args/kwargs
                    if _used_args:
                        args[0] = temp_file.name  # ty: ignore
                    else:
                        kwargs[path_arg_name] = temp_file.name  # ty: ignore
                    # Call function
                    return func(*args, **kwargs)
            else:  # Run function on local file
                return func(*args, **kwargs)

        return wrapper_download

    return _enable_file_download