DBT: Diet Trase Coffee 2020

File location: s3://trase-storage/diet-trase/diet-trase-results-2020.parquet

DBT model name: diet_trase_coffee_2020

Explore on Metabase: Full table; summary statistics

DBT details


Description

Results of the supply chain model for Diet Trase coffee in 2020. This dataset applies the output of the Diet Trase model to the consolidated trade data.


Details

Column                                    Type
year                                      BIGINT
country_of_production                     VARCHAR
country_of_production_iso2                VARCHAR
port_of_export_name                       VARCHAR
hs6                                       VARCHAR
exporter_name                             VARCHAR
exporter_node_id                          BIGINT
exporter_group                            VARCHAR
exporter_group_parent                     VARCHAR
importer_name                             VARCHAR
importer_group                            VARCHAR
country_of_first_import                   VARCHAR
country_of_first_import_iso2              VARCHAR
country_of_first_import_economic_bloc     VARCHAR
is_padded                                 BOOLEAN
padded_type                               VARCHAR
decision_tree_branch                      VARCHAR
linear_programming_status                 VARCHAR
linear_programming_failure_reason         VARCHAR
is_domestic                               BOOLEAN
domestic_consumption_region_geocode_id    VARCHAR
domestic_consumption_region_name          VARCHAR
domestic_consumption_region_level         INTEGER
production_geocode_id                     VARCHAR
production_geocode_name                   VARCHAR
production_geocode_level                  INTEGER
port_of_export_label                      VARCHAR
exporter_label                            VARCHAR
importer_label                            VARCHAR
mass_tonnes                               DOUBLE
mass_tonnes_raw_equivalent                DOUBLE
fob                                       DOUBLE
port_country_proportion                   DOUBLE
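
The published parquet can be queried in place, for example with DuckDB. A minimal sketch, assuming S3 credentials for the trase-storage bucket are already configured; the aggregation itself is illustrative only:

import duckdb

con = duckdb.connect()
# reading s3:// paths needs the httpfs extension in addition to credentials
con.execute("INSTALL httpfs;")
con.execute("LOAD httpfs;")
top_importers = con.execute(
    """
    SELECT country_of_first_import, SUM(mass_tonnes) AS total_mass_tonnes
    FROM read_parquet('s3://trase-storage/diet-trase/diet-trase-results-2020.parquet')
    GROUP BY country_of_first_import
    ORDER BY total_mass_tonnes DESC
    """
).df()
print(top_importers.head())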

Models / Seeds

  • model.trase_duckdb.postgres_countries
  • model.trase_duckdb.diet_trase_subnational_regions
  • model.trase_duckdb.diet_trase_coffee_trade_padded_2020
  • model.trase_duckdb.diet_trase_coffee_2020_brazil
  • model.trase_duckdb.diet_trase_coffee_2020_tanzania
  • model.trase_duckdb.diet_trase_coffee_2020_other

Script source

import pandas as pd
import polars as pl

from pandas.testing import assert_frame_equal

from trase.models.diet_trase.coffee_fullmodel.constants import UNKNOWN_REGION
from trase.tools import sps
from trase.tools.sei_pcs.pandas_utilities import split_dataframe_using_proportions


def enrichment_join(left, right, on):
    """Left-join `right` onto `left` on `on`, warning when any keys fail to match."""
    df = pd.merge(
        left, right, validate="many_to_one", on=on, how="left", indicator=True
    )
    missing = df[df["_merge"] != "both"]
    if not missing.empty:
        missing_values = missing[on].drop_duplicates()
        print(
            f"Warning: Missing values in enrichment join for columns {on}: {missing_values}"
        )
    return df.drop(columns=["_merge"])
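
# Illustrative usage (comments only): joining region metadata onto trade rows,
#   enrichment_join(df_trade, df_regions, on=["country_of_production_name"])
# keeps every left-hand row and prints the distinct key values that found no
# match on the right, rather than letting the resulting NaNs go unnoticed.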


def assert_equal_after_consolidation(df1, df2, numerical_columns, categorical_columns):
    c = sps.consolidate(df1, numerical_columns, categorical_columns)
    d = sps.consolidate(df2, numerical_columns, categorical_columns)
    assert_frame_equal(c, d, check_like=True)
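
# consolidate reduces each frame to grouped sums of the numerical columns per
# combination of the categorical columns, so this check passes whenever the
# grouped totals agree, regardless of how the rows were split or ordered.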


def select_and_reorder_columns(df):
    columns = [
        "year",
        "country_of_production_name",
        "country_of_production_iso2",
        "port_of_export_name",
        "hs6",
        "exporter_name",
        "exporter_node_id",
        "exporter_group_name",
        "exporter_group_parent",
        "importer_name",
        "importer_group",
        "country_of_destination",
        "country_of_destination_iso2",
        "country_of_destination_economic_bloc",
        "mass_tonnes",
        "mass_tonnes_raw_equivalent",
        "fob",
        "is_padded",
        "padded_type",
        "branch",
        "status",
        "linear_programming_failure_reason",
        "is_domestic",
        "domestic_consumption_region_geocode",
        "domestic_consumption_region_name",
        "domestic_consumption_region_level",
        "production_geocode",
        "production_geocode_name",
        "production_geocode_level",
        "proportion",
        "port_of_export_label",
        "exporter_label",
        "importer_label",
    ]

    df = df[columns]

    df = df.rename(
        columns={
            "exporter_group_name": "exporter_group",
            "country_of_production_name": "country_of_production",
            "country_of_destination": "country_of_first_import",
            "country_of_destination_iso2": "country_of_first_import_iso2",
            "country_of_destination_economic_bloc": "country_of_first_import_economic_bloc",
            "domestic_consumption_region_geocode": "domestic_consumption_region_geocode_id",
            "production_geocode": "production_geocode_id",
            "branch": "decision_tree_branch",
            "proportion": "port_country_proportion",
            "status": "linear_programming_status",
        },
        errors="raise",
    )

    # Take updated column names into account
    columns = df.columns.tolist()

    # Sum mass_tonnes, mass_tonnes_raw_equivalent, fob and port_country_proportion
    # within groups defined by every other column
    df = df.groupby(
        [
            col
            for col in columns
            if col
            not in [
                "mass_tonnes",
                "mass_tonnes_raw_equivalent",
                "fob",
                "port_country_proportion",
            ]
        ],
        dropna=False,
        as_index=False,
    )[
        ["mass_tonnes", "mass_tonnes_raw_equivalent", "fob", "port_country_proportion"]
    ].sum(
        min_count=1  # a group containing only nulls sums to NaN rather than 0
    )

    polars_df = pl.from_pandas(df)

    # Fix some columns that should be ints but come out of pandas as floats;
    # do it in polars so ints with nulls don't get converted to floats
    polars_df = polars_df.with_columns(
        [
            pl.col("exporter_node_id").cast(pl.Int64, strict=True),
            pl.col("production_geocode_level").cast(pl.Int32, strict=True),
            pl.col("domestic_consumption_region_level").cast(pl.Int32, strict=True),
        ]
    )

    return polars_df


def model(dbt, cursor):
    dbt.config(
        materialized="external",
        depends_on=["postgres_countries"],
    )

    # -------------------------------------------------------------------------------- #
    # load country information
    # -------------------------------------------------------------------------------- #
    r = dbt.ref("postgres_countries")
    df_countries = r.df()[["country_name", "synonyms", "country_trase_id"]].reset_index(
        drop=True
    )

    # -------------------------------------------------------------------------------- #
    # load subnational regions
    # -------------------------------------------------------------------------------- #
    df_regions = dbt.ref("diet_trase_subnational_regions").df()
    df_regions = df_regions.rename(
        columns={
            "country": "country_of_production_name",
        },
        errors="raise",
    )
    df_regions["country_of_production_name"] = df_regions[
        "country_of_production_name"
    ].str.upper()

    # -------------------------------------------------------------------------------- #
    # load the input trade data and rename some columns
    # -------------------------------------------------------------------------------- #
    df_trade = dbt.ref("diet_trase_coffee_trade_padded_2020").df()
    df_trade = df_trade.rename(
        columns={
            "producing_country": "country_of_production_name",
            "economic_bloc": "country_of_destination_economic_bloc",
            "padded": "is_padded",
        },
        errors="raise",
    )

    # the model output excludes ST. VINCENT AND THE GRENADINES, so drop it from
    # the trade data too
    df_trade = df_trade[
        df_trade["country_of_production_name"] != "ST. VINCENT AND THE GRENADINES"
    ]

    # -------------------------------------------------------------------------------- #
    # load the model results
    # -------------------------------------------------------------------------------- #
    join_columns = [
        "year",
        "padded_type",
        "port_of_export_name",
        "country_of_production_name",
    ]
    columns_added_by_model = [
        "branch",
        "status",
        "linear_programming_failure_reason",
        "is_domestic",
        "domestic_consumption_region_geocode",
        "production_geocode",
    ]
    df: pd.DataFrame = sps.concat(
        [
            dbt.ref("diet_trase_coffee_2020_brazil").df(),
            dbt.ref("diet_trase_coffee_2020_tanzania").df(),
            dbt.ref("diet_trase_coffee_2020_other").df(),
        ]
    )
    df = df[["mass_tonnes_raw_equivalent", *join_columns, *columns_added_by_model]]
    df["padded_type"] = df["padded_type"].str.lower()  # TODO: fix upstream

    # add back in missing production geocode
    # TODO fix in model
    missing = df["production_geocode"] == ""
    df.loc[missing, "production_geocode"] = UNKNOWN_REGION

    # add names to geocodes added by the model
    unknowns = (
        df[["country_of_production_name"]]
        .drop_duplicates()
        .assign(geocode="XX", gadm_level=None, name="UNKNOWN")
    )
    df_regions_with_unknown = sps.concat([df_regions, unknowns], ignore_index=True)
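    # one synthetic row per producing country, so the placeholder geocode "XX"
    # resolves to the name UNKNOWN in the enrichment joins below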

    df = enrichment_join(
        df,
        df_regions_with_unknown.rename(
            columns={
                "geocode": "domestic_consumption_region_geocode",
                "name": "domestic_consumption_region_name",
                "gadm_level": "domestic_consumption_region_level",
            },
            errors="raise",
        ),
        on=["domestic_consumption_region_geocode", "country_of_production_name"],
    )
    df = enrichment_join(
        df,
        df_regions_with_unknown.rename(
            columns={
                "geocode": "production_geocode",
                "name": "production_geocode_name",
                "gadm_level": "production_geocode_level",
            },
            errors="raise",
        ),
        on=["production_geocode", "country_of_production_name"],
    )

    # split out domestic from trade
    is_domestic = df["is_domestic"]
    assert set(is_domestic.unique()) == {True, False}
    df_domestic_results = df[is_domestic].copy()
    df_trade_results = df[~is_domestic].copy()

    # compute proportions
    df_trade_results["proportion"] = sps.grouped_proportion(
        df_trade_results,
        "mass_tonnes_raw_equivalent",
        join_columns,
    )
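    # each row's proportion is its share of mass_tonnes_raw_equivalent within its
    # join_columns group, so proportions sum to 1 per group: e.g. rows of 30 t and
    # 70 t under the same (year, padded_type, port, country) key get 0.3 and 0.7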

    # -------------------------------------------------------------------------------- #
    # ensure that the trade results have not been altered
    # -------------------------------------------------------------------------------- #
    # make sure years are int32 for comparison
    df_trade["year"] = df_trade["year"].astype("int32")
    df_trade_results["year"] = df_trade_results["year"].astype("int32")
    assert_equal_after_consolidation(
        df_trade_results, df_trade, ["mass_tonnes_raw_equivalent"], join_columns
    )

    # -------------------------------------------------------------------------------- #
    # apply results to trade data
    # -------------------------------------------------------------------------------- #

    # split the trade data
    df_trade_splitted = split_dataframe_using_proportions(
        df_trade,
        df_trade_results.drop(columns=["mass_tonnes_raw_equivalent"], errors="raise"),
        values=["mass_tonnes_raw_equivalent", "mass_tonnes", "fob"],
        on=join_columns,
        by="proportion",
        where=None,
        validate=None,
    )
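    # each trade row is split across the result rows sharing its join_columns key,
    # scaling mass_tonnes_raw_equivalent, mass_tonnes and fob by "proportion";
    # since proportions sum to 1 per key, the grouped totals of all three value
    # columns are preserved, which the assertion below verifies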

    # validate that trade data has been preserved
    numerical_columns = ["mass_tonnes_raw_equivalent", "mass_tonnes", "fob"]
    categorical_columns = [
        col for col in df_trade.columns if col not in numerical_columns
    ]
    categorical_columns.remove("exporter_group_parent")  # has NaNs
    assert_equal_after_consolidation(
        df_trade, df_trade_splitted, numerical_columns, categorical_columns
    )

    # append the domestic data
    df_domestic = df_domestic_results.assign(
        country_of_destination=df_domestic_results["country_of_production_name"],
        country_of_destination_economic_bloc=df_domestic_results[
            "country_of_production_name"
        ],
        exporter_group_name="UNKNOWN",
        exporter_group_parent=None,
        exporter_label="UNKNOWN",
        exporter_name="UNKNOWN",
        exporter_node_id=15221616,
        fob=None,  # FOB is only applicable for trade
        hs6=None,
        importer_group="UNKNOWN",
        importer_label="UNKNOWN",
        importer_name="UNKNOWN",
        mass_tonnes=None,  # HS6 is unknown, so we don't know the equivalence factors
        is_padded=None,
        padded_type=None,
        port_of_export_label="UNKNOWN",
        proportion=1.0,
    )
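    # domestic consumption rows have no export transaction, so the trade-specific
    # fields are filled with UNKNOWN placeholders and fob / mass_tonnes stay null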
    df_final = sps.concat([df_domestic, df_trade_splitted])

    # Final clean of the country names, adding their iso2 codes
    df_countries_exploded = df_countries.explode("synonyms")

    df_final = enrichment_join(
        df_final,
        df_countries_exploded.rename(
            columns={
                "synonyms": "country_of_production_name",
                "country_name": "clean_country_of_production_name",
                "country_trase_id": "country_of_production_iso2",
            },
            errors="raise",
        ),
        on=["country_of_production_name"],
    )
    df_final = enrichment_join(
        df_final,
        df_countries_exploded.rename(
            columns={
                "synonyms": "country_of_destination",
                "country_name": "clean_country_of_destination",
                "country_trase_id": "country_of_destination_iso2",
            },
            errors="raise",
        ),
        on=["country_of_destination"],
    )
    df_final["country_of_production_name"] = df_final[
        "clean_country_of_production_name"
    ]
    df_final["country_of_destination"] = df_final["clean_country_of_destination"]
    df_final = df_final.drop(
        columns=[
            "clean_country_of_production_name",
            "clean_country_of_destination",
        ],
        errors="raise",
    )

    # select and reorder columns, and use polars to allow int types to have nulls
    polars_df_final = select_and_reorder_columns(df_final)

    return polars_df_final
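
To rebuild the table, the usual dbt invocation should be all that is needed (assuming a working trase_duckdb profile and warehouse credentials):

dbt run --select diet_trase_coffee_2020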