Skip to content

DBT: Diet Trase Coffee 2020

File location: s3://trase-storage/diet-trase/diet-trase-results-2020.parquet

DBT model name: diet_trase_coffee_2020

Explore on Metabase: Full table; summary statistics

Explore dependencies/lineage: link


Description

Results of the supply chain model for Diet Trase coffee in 2020. This dataset applies the output of the Diet Trase model to the consolidated trade data.


Details

Column Type Description
year BIGINT
country_of_production VARCHAR
country_of_production_iso2 VARCHAR
port_of_export_name VARCHAR
hs6 VARCHAR
exporter_name VARCHAR
exporter_node_id BIGINT
exporter_group VARCHAR
exporter_group_parent VARCHAR
importer_name VARCHAR
importer_group VARCHAR
country_of_first_import VARCHAR
country_of_first_import_iso2 VARCHAR
country_of_first_import_economic_bloc VARCHAR
is_padded BOOLEAN
padded_type VARCHAR
decision_tree_branch VARCHAR
linear_programming_status VARCHAR
linear_programming_failure_reason VARCHAR
is_domestic BOOLEAN
domestic_consumption_region_geocode_id VARCHAR
domestic_consumption_region_name VARCHAR
domestic_consumption_region_level INTEGER
production_geocode_id VARCHAR
production_geocode_name VARCHAR
production_geocode_level INTEGER
port_of_export_label VARCHAR
exporter_label VARCHAR
importer_label VARCHAR
mass_tonnes DOUBLE
mass_tonnes_raw_equivalent DOUBLE
fob DOUBLE
port_country_proportion DOUBLE

Review the full report, including sample error records if they exist (link)

Test name Test column Last test run Last status
accepted_values_diet_trase_coffee_2020_linear_programming_status__TO_RESULTS__LP_SUCCEEDED__LP_FAILED linear_programming_status 2026-04-25 13:23 pass
accepted_values_diet_trase_coffee_2020_year__2020 year 2026-04-25 13:23 pass
check_trader_groups_diet_trase_coffee_2020_exporter_group__2020 `` 2026-04-25 13:23 pass
constant_within_group_diet_trase_coffee_2020_decision_tree_branch__country_of_production_iso2 decision_tree_branch 2026-04-25 13:23 pass
constant_within_group_diet_trase_coffee_2020_linear_programming_status__country_of_production_iso2 linear_programming_status 2026-04-25 13:23 pass
dbt_utils_expression_is_true_diet_trase_coffee_2020_mass_tonnes___0 mass_tonnes 2026-04-25 13:23 pass
dbt_utils_expression_is_true_diet_trase_coffee_2020_mass_tonnes_raw_equivalent___mass_tonnes mass_tonnes_raw_equivalent 2026-04-25 13:23 pass
not_null_diet_trase_coffee_2020_country_of_production country_of_production 2026-04-25 13:23 pass
not_null_diet_trase_coffee_2020_mass_tonnes_raw_equivalent mass_tonnes_raw_equivalent 2026-04-25 13:23 pass
relationships_diet_trase_coffee_2020_country_of_production__country_name__ref_postgres_countries_ country_of_production 2026-04-25 13:23 pass

Models

Exposures

import pandas as pd
import polars as pl

from pandas.testing import assert_frame_equal

from trase.models.diet_trase.coffee_fullmodel.constants import UNKNOWN_REGION
from trase.tools import sps
from trase.tools.sei_pcs.pandas_utilities import split_dataframe_using_proportions


def enrichment_join(left, right, on):
    """Left-join *right* onto *left*, warning about keys that found no match.

    Performs a validated many-to-one left merge on *on*, prints a warning
    listing the distinct unmatched key values (if any), and strips the
    pandas ``_merge`` indicator column before returning.
    """
    merged = left.merge(
        right, how="left", on=on, validate="many_to_one", indicator=True
    )
    unmatched = merged.loc[merged["_merge"] != "both"]
    if len(unmatched) > 0:
        distinct_keys = unmatched[on].drop_duplicates()
        print(
            f"Warning: Missing values in enrichment join for column '{on}': {distinct_keys}"
        )
    return merged.drop(columns=["_merge"])


def assert_equal_after_consolidation(df1, df2, numerical_columns, categorical_columns):
    """Assert that two frames are equal once consolidated on the same columns.

    Each frame is reduced with ``sps.consolidate`` over the given numerical
    and categorical columns, then the results are compared ignoring column
    order (``check_like=True``).  Raises AssertionError on any mismatch.
    """
    left = sps.consolidate(df1, numerical_columns, categorical_columns)
    right = sps.consolidate(df2, numerical_columns, categorical_columns)
    assert_frame_equal(left, right, check_like=True)


def select_and_reorder_columns(df):
    """Project, rename and aggregate the final results frame.

    Keeps only the published output columns (in their published order),
    renames internal column names to the external schema, sums the numeric
    measure columns over every remaining dimension, and converts the result
    to a polars frame so that nullable integer columns keep an integer dtype
    (pandas would coerce int columns containing nulls to float).
    """
    ordered_columns = [
        "year",
        "country_of_production_name",
        "country_of_production_iso2",
        "port_of_export_name",
        "hs6",
        "exporter_name",
        "exporter_node_id",
        "exporter_group_name",
        "exporter_group_parent",
        "importer_name",
        "importer_group",
        "country_of_destination",
        "country_of_destination_iso2",
        "country_of_destination_economic_bloc",
        "mass_tonnes",
        "mass_tonnes_raw_equivalent",
        "fob",
        "is_padded",
        "padded_type",
        "branch",
        "status",
        "linear_programming_failure_reason",
        "is_domestic",
        "domestic_consumption_region_geocode",
        "domestic_consumption_region_name",
        "domestic_consumption_region_level",
        "production_geocode",
        "production_geocode_name",
        "production_geocode_level",
        "proportion",
        "port_of_export_label",
        "exporter_label",
        "importer_label",
    ]

    # Internal name -> published name.
    rename_map = {
        "exporter_group_name": "exporter_group",
        "country_of_production_name": "country_of_production",
        "country_of_destination": "country_of_first_import",
        "country_of_destination_iso2": "country_of_first_import_iso2",
        "country_of_destination_economic_bloc": "country_of_first_import_economic_bloc",
        "domestic_consumption_region_geocode": "domestic_consumption_region_geocode_id",
        "production_geocode": "production_geocode_id",
        "branch": "decision_tree_branch",
        "proportion": "port_country_proportion",
        "status": "linear_programming_status",
    }
    df = df[ordered_columns].rename(columns=rename_map, errors="raise")

    # Sum the measures over every other (post-rename) column.
    measure_columns = [
        "mass_tonnes",
        "mass_tonnes_raw_equivalent",
        "fob",
        "port_country_proportion",
    ]
    group_keys = [col for col in df.columns if col not in measure_columns]
    df = df.groupby(group_keys, dropna=False, as_index=False)[measure_columns].sum(
        min_count=1
    )

    # Cast id/level columns back to integers; done in polars so columns
    # containing nulls remain integer-typed instead of being coerced to float.
    polars_df = pl.from_pandas(df)
    polars_df = polars_df.with_columns(
        pl.col("exporter_node_id").cast(pl.Int64, strict=True),
        pl.col("production_geocode_level").cast(pl.Int32, strict=True),
        pl.col("domestic_consumption_region_level").cast(pl.Int32, strict=True),
    )

    return polars_df


def model(dbt, cursor):
    """dbt Python model: apply the Diet Trase coffee 2020 model output to trade data.

    Concatenates the per-country model results (Brazil, Tanzania, other),
    enriches them with subnational region names, splits domestic consumption
    from export flows, distributes the padded trade data across model results
    by mass proportion, validates that mass/FOB totals are preserved at each
    step, cleans country names against the countries reference table, and
    returns the final polars frame for materialization.

    Parameters
    ----------
    dbt : dbt Python-model context; provides ``config`` and ``ref``.
    cursor : database cursor supplied by dbt's runtime; unused here.
    """
    dbt.config(
        materialized="external",
        depends_on=["postgres_countries"],
    )

    # -------------------------------------------------------------------------------- #
    # load country information
    # -------------------------------------------------------------------------------- #
    r = dbt.ref("postgres_countries")
    df_countries = r.df()[["country_name", "synonyms", "country_trase_id"]].reset_index(
        drop=True
    )

    # -------------------------------------------------------------------------------- #
    # load subnational regions
    # -------------------------------------------------------------------------------- #
    df_regions = dbt.ref("diet_trase_subnational_regions").df()
    df_regions = df_regions.rename(
        columns={
            "country": "country_of_production_name",
        },
        errors="raise",
    )
    # Trade data uses upper-case country names; align the regions table to match.
    df_regions["country_of_production_name"] = df_regions[
        "country_of_production_name"
    ].str.upper()

    # -------------------------------------------------------------------------------- #
    # load the input trade data and rename some columns
    # -------------------------------------------------------------------------------- #
    df_trade = dbt.ref("diet_trase_coffee_trade_padded_2020").df()
    df_trade = df_trade.rename(
        columns={
            "producing_country": "country_of_production_name",
            "economic_bloc": "country_of_destination_economic_bloc",
            "padded": "is_padded",
        },
        errors="raise",
    )

    # the model drops ST. VINCENT AND THE GRENADINES
    # NOTE(review): dropped here so the trade data matches the model output
    # exactly — presumably the upstream model excludes this country; confirm.
    df_trade = df_trade[
        df_trade["country_of_production_name"] != "ST. VINCENT AND THE GRENADINES"
    ]

    # -------------------------------------------------------------------------------- #
    # load the model results
    # -------------------------------------------------------------------------------- #
    # Keys that identify a trade flow group shared between trade data and results.
    join_columns = [
        "year",
        "padded_type",
        "port_of_export_name",
        "country_of_production_name",
    ]
    # Columns that exist only in the model output, not in the raw trade data.
    columns_added_by_model = [
        "branch",
        "status",
        "linear_programming_failure_reason",
        "is_domestic",
        "domestic_consumption_region_geocode",
        "production_geocode",
    ]
    df: pd.DataFrame = sps.concat(
        [
            dbt.ref("diet_trase_coffee_2020_brazil").df(),
            dbt.ref("diet_trase_coffee_2020_tanzania").df(),
            dbt.ref("diet_trase_coffee_2020_other").df(),
        ]
    )
    df = df[["mass_tonnes_raw_equivalent", *join_columns, *columns_added_by_model]]
    df["padded_type"] = df["padded_type"].str.lower()  # TODO: fix upstream

    # add back in missing production geocode
    # TODO fix in model
    missing = df["production_geocode"] == ""
    df.loc[missing, "production_geocode"] = UNKNOWN_REGION

    # add names to geocodes added by the model
    # One synthetic "UNKNOWN" region (geocode "XX") per producing country, so the
    # enrichment joins below can resolve geocodes the regions table doesn't cover.
    unknowns = (
        df[["country_of_production_name"]]
        .drop_duplicates()
        .assign(geocode="XX", gadm_level=None, name="UNKNOWN")
    )
    df_regions_with_unknown = sps.concat([df_regions, unknowns], ignore_index=True)

    # Resolve the domestic-consumption geocode to its region name/level.
    df = enrichment_join(
        df,
        df_regions_with_unknown.rename(
            columns={
                "geocode": "domestic_consumption_region_geocode",
                "name": "domestic_consumption_region_name",
                "gadm_level": "domestic_consumption_region_level",
            },
            errors="raise",
        ),
        on=["domestic_consumption_region_geocode", "country_of_production_name"],
    )
    # Resolve the production geocode the same way.
    df = enrichment_join(
        df,
        df_regions_with_unknown.rename(
            columns={
                "geocode": "production_geocode",
                "name": "production_geocode_name",
                "gadm_level": "production_geocode_level",
            },
            errors="raise",
        ),
        on=["production_geocode", "country_of_production_name"],
    )

    # split out domestic from trade
    is_domestic = df["is_domestic"]
    # Guard against nulls or non-boolean values slipping through the model output.
    assert set(is_domestic.unique()) == {True, False}
    df_domestic_results = df[is_domestic].copy()
    df_trade_results = df[~is_domestic].copy()

    # compute proportions
    # Each result row's share of its group's raw-equivalent mass; used below to
    # split the corresponding trade rows.
    df_trade_results["proportion"] = sps.grouped_proportion(
        df_trade_results,
        "mass_tonnes_raw_equivalent",
        join_columns,
    )

    # -------------------------------------------------------------------------------- #
    # ensure that that trade results have not been altered
    # -------------------------------------------------------------------------------- #
    # make sure years are int32 for comparison
    df_trade["year"] = df_trade["year"].astype("int32")
    df_trade_results["year"] = df_trade_results["year"].astype("int32")
    assert_equal_after_consolidation(
        df_trade_results, df_trade, ["mass_tonnes_raw_equivalent"], join_columns
    )

    # -------------------------------------------------------------------------------- #
    # apply results to trade data
    # -------------------------------------------------------------------------------- #

    # split the trade data
    # Distributes each trade row's mass/fob across matching model-result rows
    # according to "proportion"; the results' own mass column is dropped first
    # so only the trade data's quantities are carried forward.
    df_trade_splitted = split_dataframe_using_proportions(
        df_trade,
        df_trade_results.drop(columns=["mass_tonnes_raw_equivalent"], errors="raise"),
        values=["mass_tonnes_raw_equivalent", "mass_tonnes", "fob"],
        on=join_columns,
        by="proportion",
        where=None,
        validate=None,
    )

    # validate that trade data has been preserved
    numerical_columns = ["mass_tonnes_raw_equivalent", "mass_tonnes", "fob"]
    categorical_columns = [
        col for col in df_trade.columns if col not in numerical_columns
    ]
    categorical_columns.remove("exporter_group_parent")  # has NaNs
    assert_equal_after_consolidation(
        df_trade, df_trade_splitted, numerical_columns, categorical_columns
    )

    # append the domestic data
    # Domestic rows have no trade counterpart, so all trade-specific fields are
    # filled with UNKNOWN/None placeholders.
    df_domestic = df_domestic_results.assign(
        country_of_destination=df_domestic_results["country_of_production_name"],
        country_of_destination_economic_bloc=df_domestic_results[
            "country_of_production_name"
        ],
        exporter_group_name="UNKNOWN",
        exporter_group_parent=None,
        exporter_label="UNKNOWN",
        exporter_name="UNKNOWN",
        # NOTE(review): hard-coded node id for the UNKNOWN exporter — confirm
        # against the node registry.
        exporter_node_id=15221616,
        fob=None,  # FOB is only applicable for trade
        hs6=None,
        importer_group="UNKNOWN",
        importer_label="UNKNOWN",
        importer_name="UNKNOWN",
        mass_tonnes=None,  # HS6 is unknown, so equivalence factors are unknown too
        is_padded=None,
        padded_type=None,
        port_of_export_label="UNKNOWN",
        proportion=1.0,
    )
    df_final = sps.concat([df_domestic, df_trade_splitted])

    # Final clean of the country names, adding their iso2 codes
    # One row per (country, synonym) so any synonym spelling joins to its
    # canonical name and trase id.
    df_countries_exploded = df_countries.explode("synonyms")

    df_final = enrichment_join(
        df_final,
        df_countries_exploded.rename(
            columns={
                "synonyms": "country_of_production_name",
                "country_name": "clean_country_of_production_name",
                "country_trase_id": "country_of_production_iso2",
            },
            errors="raise",
        ),
        on=["country_of_production_name"],
    )
    df_final = enrichment_join(
        df_final,
        df_countries_exploded.rename(
            columns={
                "synonyms": "country_of_destination",
                "country_name": "clean_country_of_destination",
                "country_trase_id": "country_of_destination_iso2",
            },
            errors="raise",
        ),
        on=["country_of_destination"],
    )
    # Replace the raw names with their canonical spellings, then drop the
    # temporary "clean_*" columns.
    df_final["country_of_production_name"] = df_final[
        "clean_country_of_production_name"
    ]
    df_final["country_of_destination"] = df_final["clean_country_of_destination"]
    df_final = df_final.drop(
        columns=[
            "clean_country_of_production_name",
            "clean_country_of_destination",
        ],
        errors="raise",
    )

    # select and reorder columns, and use polars to allow int types to have nulls
    polars_df_final = select_and_reorder_columns(df_final)

    return polars_df_final