Skip to content

Diet Trase Coffee 2020

s3://trase-storage/diet-trase/diet-trase-results-2020.parquet

Dbt path: trase_production.main.diet_trase_coffee_2020

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/diet_trase/_schema.yml

Model file link: trase/data_pipeline/models/diet_trase/diet_trase_coffee_2020.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph — at the bottom right — as well as tests and downstream dependencies)

Tags: gold, coffee, diet-trase-coffee


diet_trase_coffee_2020

Description

Results of the supply chain model for Diet Trase coffee in 2020. This dataset applies the output of the Diet Trase model to the consolidated trade data.


Details

Column Type Description
mass_tonnes_raw_equivalent DOUBLE
year BIGINT
padded_type VARCHAR
port_of_export_name VARCHAR
country_of_production_name VARCHAR
branch VARCHAR
status VARCHAR
linear_programming_failure_reason VARCHAR
is_domestic BOOLEAN
domestic_consumption_region_geocode VARCHAR
production_geocode VARCHAR
domestic_consumption_region_name VARCHAR
domestic_consumption_region_level FLOAT
production_geocode_name VARCHAR
production_geocode_level FLOAT
country_of_destination VARCHAR
economic_bloc VARCHAR
exporter_group_name VARCHAR
exporter_group_parent VARCHAR
exporter_label VARCHAR
exporter_name VARCHAR
exporter_node_id INTEGER
fob DOUBLE
hs6 VARCHAR
importer_group VARCHAR
importer_label VARCHAR
importer_name VARCHAR
mass_tonnes DOUBLE
padded BOOLEAN
port_of_export_label VARCHAR
proportion DOUBLE

Models / Seeds

  • model.trase_duckdb.diet_trase_subnational_regions
  • model.trase_duckdb.diet_trase_coffee_trade_padded_2020
  • model.trase_duckdb.diet_trase_coffee_2020_brazil
  • model.trase_duckdb.diet_trase_coffee_2020_tanzania
  • model.trase_duckdb.diet_trase_coffee_2020_other

No called script or script source not found.

import pandas as pd
from pandas.testing import assert_frame_equal

from trase.models.diet_trase.coffee_fullmodel.constants import UNKNOWN_REGION
from trase.tools import sps
from trase.tools.sei_pcs.pandas_utilities import split_dataframe_using_proportions


def enrichment_join(left, right, on):
    """Left-join ``right`` onto ``left``, warning on unmatched rows.

    The join is validated as many-to-one (keys in ``right`` must be unique),
    so enrichment never duplicates rows of ``left``. Rows of ``left`` that
    find no match in ``right`` are kept (with NaNs in the enriched columns),
    and their distinct key values are printed as a warning rather than
    failing the pipeline.

    Parameters
    ----------
    left, right : pd.DataFrame
        The frame to enrich and the lookup frame, respectively.
    on : str or list of str
        Join key column(s) present in both frames.

    Returns
    -------
    pd.DataFrame
        ``left`` with the columns of ``right`` appended.
    """
    df = pd.merge(
        left, right, validate="many_to_one", on=on, how="left", indicator=True
    )
    missing = df[df["_merge"] != "both"]
    if not missing.empty:
        missing_values = missing[on].drop_duplicates()
        # `on` may be a single column name or a list of names, so the message
        # must not assume a single column (the original said "column '{on}'").
        print(
            f"Warning: Missing values in enrichment join on {on!r}: {missing_values}"
        )
    return df.drop(columns=["_merge"])


def assert_equal_after_consolidation(df1, df2, numerical_columns, categorical_columns):
    """Assert that two frames are equivalent once consolidated.

    Each frame is passed through ``sps.consolidate`` over the given numerical
    and categorical columns before comparison, so row-level splits and row
    order may differ as long as the consolidated results agree. Column order
    is ignored (``check_like=True``).
    """
    consolidated = [
        sps.consolidate(frame, numerical_columns, categorical_columns)
        for frame in (df1, df2)
    ]
    assert_frame_equal(consolidated[0], consolidated[1], check_like=True)


def model(dbt, cursor):
    """Build the 2020 Diet Trase coffee results table.

    Combines the per-country supply-chain model outputs (Brazil, Tanzania,
    other) with the padded consolidated trade data, enriches geocodes with
    subnational region names/levels, computes per-group proportions, splits
    the trade rows by those proportions, and appends domestic-consumption
    rows. Materialized as an external dbt model.
    """
    dbt.config(materialized="external")

    # -------------------------------------------------------------------------------- #
    # load subnational regions
    # -------------------------------------------------------------------------------- #
    df_regions = dbt.ref("diet_trase_subnational_regions").df()
    df_regions = df_regions.rename(
        columns={
            "country": "country_of_production_name",
        },
        errors="raise",
    )
    # Upper-case so region country names match the trade/model data, which use
    # upper-case country names (e.g. "ST. VINCENT AND THE GRENADINES" below).
    df_regions["country_of_production_name"] = df_regions[
        "country_of_production_name"
    ].str.upper()

    # -------------------------------------------------------------------------------- #
    # load the input trade data
    # -------------------------------------------------------------------------------- #
    df_trade = dbt.ref("diet_trase_coffee_trade_padded_2020").df()
    df_trade = df_trade.rename(
        columns={"producing_country": "country_of_production_name"},
        errors="raise",
    )

    # the model drops ST. VINCENT AND THE GRENADINES
    df_trade = df_trade[
        df_trade["country_of_production_name"] != "ST. VINCENT AND THE GRENADINES"
    ]

    # -------------------------------------------------------------------------------- #
    # load the model results
    # -------------------------------------------------------------------------------- #
    # Keys identifying one trade flow group; used for joining model results to
    # trade rows and for validating that the model preserved the trade totals.
    join_columns = [
        "year",
        "padded_type",
        "port_of_export_name",
        "country_of_production_name",
    ]
    # Columns produced by the supply-chain model (absent from the trade data).
    columns_added_by_model = [
        "branch",
        "status",
        "linear_programming_failure_reason",
        "is_domestic",
        "domestic_consumption_region_geocode",
        "production_geocode",
    ]
    df: pd.DataFrame = sps.concat(
        [
            dbt.ref("diet_trase_coffee_2020_brazil").df(),
            dbt.ref("diet_trase_coffee_2020_tanzania").df(),
            dbt.ref("diet_trase_coffee_2020_other").df(),
        ]
    )
    df = df[["mass_tonnes_raw_equivalent", *join_columns, *columns_added_by_model]]
    df["padded_type"] = df["padded_type"].str.lower()  # TODO: fix upstream

    # add back in missing production geocode
    # TODO fix in model
    missing = df["production_geocode"] == ""
    df.loc[missing, "production_geocode"] = UNKNOWN_REGION

    # add names to geocodes added by the model
    # One synthetic "UNKNOWN" region (geocode "XX") per producing country, so
    # the enrichment joins below also match rows with an unknown geocode.
    # NOTE(review): this assumes UNKNOWN_REGION == "XX" — confirm in constants.
    unknowns = (
        df[["country_of_production_name"]]
        .drop_duplicates()
        .assign(geocode="XX", gadm_level=None, name="UNKNOWN")
    )
    df_regions_with_unknown = sps.concat([df_regions, unknowns], ignore_index=True)

    # Enrich the domestic-consumption geocode with its region name/level.
    df = enrichment_join(
        df,
        df_regions_with_unknown.rename(
            columns={
                "geocode": "domestic_consumption_region_geocode",
                "name": "domestic_consumption_region_name",
                "gadm_level": "domestic_consumption_region_level",
            },
            errors="raise",
        ),
        on=["domestic_consumption_region_geocode", "country_of_production_name"],
    )
    # Enrich the production geocode with its region name/level.
    df = enrichment_join(
        df,
        df_regions_with_unknown.rename(
            columns={
                "geocode": "production_geocode",
                "name": "production_geocode_name",
                "gadm_level": "production_geocode_level",
            },
            errors="raise",
        ),
        on=["production_geocode", "country_of_production_name"],
    )

    # split out domestic from trade
    is_domestic = df["is_domestic"]
    # Guard against NaN/partial flags: every row must be explicitly True/False.
    assert set(is_domestic.unique()) == {True, False}
    df_domestic_results = df[is_domestic].copy()
    df_trade_results = df[~is_domestic].copy()

    # compute proportions
    # Each trade-result row's share of its (year, padded_type, port, country)
    # group's mass; used below to split the corresponding trade rows.
    df_trade_results["proportion"] = sps.grouped_proportion(
        df_trade_results,
        "mass_tonnes_raw_equivalent",
        join_columns,
    )

    # -------------------------------------------------------------------------------- #
    # ensure that the trade results have not been altered
    # -------------------------------------------------------------------------------- #
    assert_equal_after_consolidation(
        df_trade_results, df_trade, ["mass_tonnes_raw_equivalent"], join_columns
    )

    # -------------------------------------------------------------------------------- #
    # apply results to trade data
    # -------------------------------------------------------------------------------- #

    # split the trade data
    # Each trade row is split across the matching model-result rows in
    # proportion to "proportion"; the listed value columns are apportioned.
    df_trade_splitted = split_dataframe_using_proportions(
        df_trade,
        df_trade_results.drop(columns=["mass_tonnes_raw_equivalent"], errors="raise"),
        values=["mass_tonnes_raw_equivalent", "mass_tonnes", "fob"],
        on=join_columns,
        by="proportion",
        where=None,
        validate=None,
    )

    # validate that trade data has been preserved
    numerical_columns = ["mass_tonnes_raw_equivalent", "mass_tonnes", "fob"]
    categorical_columns = [
        col for col in df_trade.columns if col not in numerical_columns
    ]
    categorical_columns.remove("exporter_group_parent")  # has NaNs
    assert_equal_after_consolidation(
        df_trade, df_trade_splitted, numerical_columns, categorical_columns
    )

    # append the domestic data
    # Domestic rows have no trade attributes; fill them with None (and set the
    # destination to the producing country itself) so the schemas align.
    df_domestic = df_domestic_results.assign(
        country_of_destination=df_domestic_results["country_of_production_name"],
        economic_bloc=df_domestic_results["country_of_production_name"],
        exporter_group_name=None,
        exporter_group_parent=None,
        exporter_label=None,
        exporter_name=None,
        exporter_node_id=None,
        fob=None,  # FOB is only applicable for trade
        hs6=None,
        importer_group=None,
        importer_label=None,
        importer_name=None,
        mass_tonnes=None,  # HS6 is unknown, so equivalence factors are unknown
        padded=None,
        port_of_export_label=None,
        proportion=1.0,
    )
    df_final = sps.concat([df_domestic, df_trade_splitted])

    return df_final