Diet Trase Coffee 2020
s3://trase-storage/diet-trase/diet-trase-results-2020.parquet
dbt path: trase_production.main.diet_trase_coffee_2020
Explore on Metabase: Full table; summary statistics
Containing YAML file: trase/data_pipeline/models/diet_trase/_schema.yml
Model file: trase/data_pipeline/models/diet_trase/diet_trase_coffee_2020.py
dbt test runs & lineage: Test results · Lineage
Full dbt docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: gold, coffee, diet-trase-coffee
diet_trase_coffee_2020
Description
Results of the supply chain model for Diet Trase coffee in 2020. This dataset applies the output of the Diet Trase model to the consolidated trade data.
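Beyond the Metabase and dbt docs links above, the published parquet can be inspected directly. A minimal sketch, assuming read access to the `trase-storage` bucket and a local DuckDB installation (only the S3 path is taken from this page; credential setup is environment-specific and omitted):

```python
import duckdb

con = duckdb.connect()
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")
# S3 credentials/region are assumed to be configured (e.g. via DuckDB's S3 settings or secrets)
df = con.execute(
    "SELECT * "
    "FROM read_parquet('s3://trase-storage/diet-trase/diet-trase-results-2020.parquet')"
).df()
print(df.shape)
print(df.head())
```

`pandas.read_parquet` on the same `s3://` path (with `s3fs` installed) would work equally well.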
Details
| Column | Type | Description |
|---|---|---|
| mass_tonnes_raw_equivalent | DOUBLE | |
| year | BIGINT | |
| padded_type | VARCHAR | |
| port_of_export_name | VARCHAR | |
| country_of_production_name | VARCHAR | |
| branch | VARCHAR | |
| status | VARCHAR | |
| linear_programming_failure_reason | VARCHAR | |
| is_domestic | BOOLEAN | |
| domestic_consumption_region_geocode | VARCHAR | |
| production_geocode | VARCHAR | |
| domestic_consumption_region_name | VARCHAR | |
| domestic_consumption_region_level | FLOAT | |
| production_geocode_name | VARCHAR | |
| production_geocode_level | FLOAT | |
| country_of_destination | VARCHAR | |
| economic_bloc | VARCHAR | |
| exporter_group_name | VARCHAR | |
| exporter_group_parent | VARCHAR | |
| exporter_label | VARCHAR | |
| exporter_name | VARCHAR | |
| exporter_node_id | INTEGER | |
| fob | DOUBLE | |
| hs6 | VARCHAR | |
| importer_group | VARCHAR | |
| importer_label | VARCHAR | |
| importer_name | VARCHAR | |
| mass_tonnes | DOUBLE | |
| padded | BOOLEAN | |
| port_of_export_label | VARCHAR | |
| proportion | DOUBLE | |
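As a quick orientation to these columns, a hedged example of the kind of summary the Metabase link exposes — assuming `df` holds the table as loaded in the sketch near the top of this page; the grouping is illustrative, not part of the pipeline:

```python
# Raw-equivalent mass by production country, split into domestic consumption vs. exports
summary = (
    df.groupby(["country_of_production_name", "is_domestic"], dropna=False)[
        "mass_tonnes_raw_equivalent"
    ]
    .sum()
    .reset_index()
    .sort_values("mass_tonnes_raw_equivalent", ascending=False)
)
print(summary.head(10))
```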
Models / Seeds
- model.trase_duckdb.diet_trase_subnational_regions
- model.trase_duckdb.diet_trase_coffee_trade_padded_2020
- model.trase_duckdb.diet_trase_coffee_2020_brazil
- model.trase_duckdb.diet_trase_coffee_2020_tanzania
- model.trase_duckdb.diet_trase_coffee_2020_other
No called script, or the script source was not found.
import pandas as pd
from pandas.testing import assert_frame_equal
from trase.models.diet_trase.coffee_fullmodel.constants import UNKNOWN_REGION
from trase.tools import sps
from trase.tools.sei_pcs.pandas_utilities import split_dataframe_using_proportions

def enrichment_join(left, right, on):
    """Left-join `right` onto `left`, warning about keys that have no match in `right`."""
    df = pd.merge(
        left, right, validate="many_to_one", on=on, how="left", indicator=True
    )
    missing = df[df["_merge"] != "both"]
    if not missing.empty:
        missing_values = missing[on].drop_duplicates()
        print(
            f"Warning: Missing values in enrichment join for column '{on}': {missing_values}"
        )
    return df.drop(columns=["_merge"])

def assert_equal_after_consolidation(df1, df2, numerical_columns, categorical_columns):
    """Check that two dataframes are identical once consolidated on the same columns."""
    c = sps.consolidate(df1, numerical_columns, categorical_columns)
    d = sps.consolidate(df2, numerical_columns, categorical_columns)
    assert_frame_equal(c, d, check_like=True)

def model(dbt, cursor):
    dbt.config(materialized="external")

    # -------------------------------------------------------------------------------- #
    # load subnational regions
    # -------------------------------------------------------------------------------- #
    df_regions = dbt.ref("diet_trase_subnational_regions").df()
    df_regions = df_regions.rename(
        columns={
            "country": "country_of_production_name",
        },
        errors="raise",
    )
    df_regions["country_of_production_name"] = df_regions[
        "country_of_production_name"
    ].str.upper()

    # -------------------------------------------------------------------------------- #
    # load the input trade data
    # -------------------------------------------------------------------------------- #
    df_trade = dbt.ref("diet_trase_coffee_trade_padded_2020").df()
    df_trade = df_trade.rename(
        columns={"producing_country": "country_of_production_name"},
        errors="raise",
    )

    # the model drops ST. VINCENT AND THE GRENADINES
    df_trade = df_trade[
        df_trade["country_of_production_name"] != "ST. VINCENT AND THE GRENADINES"
    ]

    # -------------------------------------------------------------------------------- #
    # load the model results
    # -------------------------------------------------------------------------------- #
    join_columns = [
        "year",
        "padded_type",
        "port_of_export_name",
        "country_of_production_name",
    ]
    columns_added_by_model = [
        "branch",
        "status",
        "linear_programming_failure_reason",
        "is_domestic",
        "domestic_consumption_region_geocode",
        "production_geocode",
    ]
    df: pd.DataFrame = sps.concat(
        [
            dbt.ref("diet_trase_coffee_2020_brazil").df(),
            dbt.ref("diet_trase_coffee_2020_tanzania").df(),
            dbt.ref("diet_trase_coffee_2020_other").df(),
        ]
    )
    df = df[["mass_tonnes_raw_equivalent", *join_columns, *columns_added_by_model]]
    df["padded_type"] = df["padded_type"].str.lower()  # TODO: fix upstream

    # add back in missing production geocode
    # TODO fix in model
    missing = df["production_geocode"] == ""
    df.loc[missing, "production_geocode"] = UNKNOWN_REGION

    # add names to geocodes added by the model
    unknowns = (
        df[["country_of_production_name"]]
        .drop_duplicates()
        .assign(geocode="XX", gadm_level=None, name="UNKNOWN")
    )
    df_regions_with_unknown = sps.concat([df_regions, unknowns], ignore_index=True)
    df = enrichment_join(
        df,
        df_regions_with_unknown.rename(
            columns={
                "geocode": "domestic_consumption_region_geocode",
                "name": "domestic_consumption_region_name",
                "gadm_level": "domestic_consumption_region_level",
            },
            errors="raise",
        ),
        on=["domestic_consumption_region_geocode", "country_of_production_name"],
    )
    df = enrichment_join(
        df,
        df_regions_with_unknown.rename(
            columns={
                "geocode": "production_geocode",
                "name": "production_geocode_name",
                "gadm_level": "production_geocode_level",
            },
            errors="raise",
        ),
        on=["production_geocode", "country_of_production_name"],
    )

    # split out domestic from trade
    is_domestic = df["is_domestic"]
    assert set(is_domestic.unique()) == {True, False}
    df_domestic_results = df[is_domestic].copy()
    df_trade_results = df[~is_domestic].copy()

    # compute proportions
    df_trade_results["proportion"] = sps.grouped_proportion(
        df_trade_results,
        "mass_tonnes_raw_equivalent",
        join_columns,
    )

    # -------------------------------------------------------------------------------- #
    # ensure that the trade results have not been altered
    # -------------------------------------------------------------------------------- #
    assert_equal_after_consolidation(
        df_trade_results, df_trade, ["mass_tonnes_raw_equivalent"], join_columns
    )

    # -------------------------------------------------------------------------------- #
    # apply results to trade data
    # -------------------------------------------------------------------------------- #
    # split the trade data
    df_trade_splitted = split_dataframe_using_proportions(
        df_trade,
        df_trade_results.drop(columns=["mass_tonnes_raw_equivalent"], errors="raise"),
        values=["mass_tonnes_raw_equivalent", "mass_tonnes", "fob"],
        on=join_columns,
        by="proportion",
        where=None,
        validate=None,
    )

    # validate that trade data has been preserved
    numerical_columns = ["mass_tonnes_raw_equivalent", "mass_tonnes", "fob"]
    categorical_columns = [
        col for col in df_trade.columns if col not in numerical_columns
    ]
    categorical_columns.remove("exporter_group_parent")  # has NaNs
    assert_equal_after_consolidation(
        df_trade, df_trade_splitted, numerical_columns, categorical_columns
    )

    # append the domestic data
    df_domestic = df_domestic_results.assign(
        country_of_destination=df_domestic_results["country_of_production_name"],
        economic_bloc=df_domestic_results["country_of_production_name"],
        exporter_group_name=None,
        exporter_group_parent=None,
        exporter_label=None,
        exporter_name=None,
        exporter_node_id=None,
        fob=None,  # FOB is only applicable for trade
        hs6=None,
        importer_group=None,
        importer_label=None,
        importer_name=None,
        mass_tonnes=None,  # HS6 is unknown, so equivalence factors are unknown
        padded=None,
        port_of_export_label=None,
        proportion=1.0,
    )
    df_final = sps.concat([df_domestic, df_trade_splitted])
    return df_final
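The core mechanic of the model above is the pair `sps.grouped_proportion` / `split_dataframe_using_proportions`: each model branch's share of its (year, padded_type, port_of_export_name, country_of_production_name) group is computed, and the trade values are multiplied by those shares. The toy sketch below re-implements that idea in plain pandas to illustrate it; it is not the trase helpers themselves (their signatures and edge-case handling differ), and the column names and numbers are invented:

```python
import pandas as pd

# Toy model results: per-port branches with a mass to be used as weights
results = pd.DataFrame(
    {
        "port": ["SANTOS", "SANTOS", "DAR ES SALAAM"],
        "production_geocode": ["BR-A", "BR-B", "TZ-A"],
        "mass": [30.0, 70.0, 50.0],
    }
)
# Toy trade data keyed on the same group column
trade = pd.DataFrame(
    {
        "port": ["SANTOS", "DAR ES SALAAM"],
        "mass": [200.0, 80.0],
        "fob": [400.0, 160.0],
    }
)

# Proportion of each branch within its join group (here just: port)
results["proportion"] = results["mass"] / results.groupby("port")["mass"].transform("sum")

# Split every trade row across the matching branches by that proportion
split = trade.merge(results.drop(columns="mass"), on="port", how="left")
for col in ["mass", "fob"]:
    split[col] = split[col] * split["proportion"]

print(split)
# The per-port totals of mass and fob are preserved by construction, which is
# what assert_equal_after_consolidation verifies on the real data.
```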