DBT: Diet Trase Coffee 2020
File location: s3://trase-storage/diet-trase/diet-trase-results-2020.parquet
DBT model name: diet_trase_coffee_2020
Explore on Metabase: Full table; summary statistics
DBT details
- Lineage
-
Dbt path:
trase_production.main.diet_trase_coffee_2020 -
Containing yaml link: trase/data_pipeline/models/diet_trase/_schema.yml
-
Model file: trase/data_pipeline/models/diet_trase/diet_trase_coffee_2020.py
-
Tags:
gold,coffee,diet-trase-coffee
Description
Results of the supply chain model for Diet Trase coffee in 2020. This dataset applies the output of the Diet Trase model to the consolidated trade data.
Details
| Column | Type | Description |
|---|---|---|
| year | BIGINT | |
| country_of_production | VARCHAR | |
| country_of_production_iso2 | VARCHAR | |
| port_of_export_name | VARCHAR | |
| hs6 | VARCHAR | |
| exporter_name | VARCHAR | |
| exporter_node_id | BIGINT | |
| exporter_group | VARCHAR | |
| exporter_group_parent | VARCHAR | |
| importer_name | VARCHAR | |
| importer_group | VARCHAR | |
| country_of_first_import | VARCHAR | |
| country_of_first_import_iso2 | VARCHAR | |
| country_of_first_import_economic_bloc | VARCHAR | |
| is_padded | BOOLEAN | |
| padded_type | VARCHAR | |
| decision_tree_branch | VARCHAR | |
| linear_programming_status | VARCHAR | |
| linear_programming_failure_reason | VARCHAR | |
| is_domestic | BOOLEAN | |
| domestic_consumption_region_geocode_id | VARCHAR | |
| domestic_consumption_region_name | VARCHAR | |
| domestic_consumption_region_level | INTEGER | |
| production_geocode_id | VARCHAR | |
| production_geocode_name | VARCHAR | |
| production_geocode_level | INTEGER | |
| port_of_export_label | VARCHAR | |
| exporter_label | VARCHAR | |
| importer_label | VARCHAR | |
| mass_tonnes | DOUBLE | |
| mass_tonnes_raw_equivalent | DOUBLE | |
| fob | DOUBLE | |
| port_country_proportion | DOUBLE | |
Models / Seeds
model.trase_duckdb.postgres_countries, model.trase_duckdb.diet_trase_subnational_regions, model.trase_duckdb.diet_trase_coffee_trade_padded_2020, model.trase_duckdb.diet_trase_coffee_2020_brazil, model.trase_duckdb.diet_trase_coffee_2020_tanzania, model.trase_duckdb.diet_trase_coffee_2020_other
No called script or script source not found.
import pandas as pd
import polars as pl
from pandas.testing import assert_frame_equal
from trase.models.diet_trase.coffee_fullmodel.constants import UNKNOWN_REGION
from trase.tools import sps
from trase.tools.sei_pcs.pandas_utilities import split_dataframe_using_proportions
def enrichment_join(left, right, on):
    """Left-join `right` onto `left`, printing a warning for unmatched keys.

    The merge is validated as many-to-one (keys must be unique in `right`),
    so the row count of `left` is preserved. Rows of `left` with no match
    keep NaN in the enrichment columns; their distinct key values are
    reported via a printed warning. The merge indicator column is removed
    before returning.
    """
    merged = left.merge(
        right, how="left", on=on, validate="many_to_one", indicator=True
    )
    unmatched = merged.loc[merged["_merge"] != "both"]
    if len(unmatched) > 0:
        missing_values = unmatched[on].drop_duplicates()
        print(
            f"Warning: Missing values in enrichment join for column '{on}': {missing_values}"
        )
    return merged.drop(columns=["_merge"])
def assert_equal_after_consolidation(df1, df2, numerical_columns, categorical_columns):
    """Assert that two frames are equal once both are consolidated.

    Each frame is passed through `sps.consolidate` with the same numerical
    and categorical column sets; the consolidated results are then compared
    with `assert_frame_equal(check_like=True)`, i.e. ignoring row/column
    order.
    """
    consolidated_first = sps.consolidate(df1, numerical_columns, categorical_columns)
    consolidated_second = sps.consolidate(df2, numerical_columns, categorical_columns)
    assert_frame_equal(consolidated_first, consolidated_second, check_like=True)
def select_and_reorder_columns(df):
    """Project, rename and aggregate the final frame; return it as polars.

    Steps:
    1. Select (and thereby order) the output columns.
    2. Rename internal column names to their published names.
    3. Sum the value columns over all remaining (categorical) columns to
       collapse duplicate rows; `min_count=1` keeps all-NaN groups as NaN
       instead of 0.
    4. Convert to polars and cast the id/level columns to integer types,
       since polars integers support nulls (pandas would coerce to float).
    """
    selected = [
        "year",
        "country_of_production_name",
        "country_of_production_iso2",
        "port_of_export_name",
        "hs6",
        "exporter_name",
        "exporter_node_id",
        "exporter_group_name",
        "exporter_group_parent",
        "importer_name",
        "importer_group",
        "country_of_destination",
        "country_of_destination_iso2",
        "country_of_destination_economic_bloc",
        "mass_tonnes",
        "mass_tonnes_raw_equivalent",
        "fob",
        "is_padded",
        "padded_type",
        "branch",
        "status",
        "linear_programming_failure_reason",
        "is_domestic",
        "domestic_consumption_region_geocode",
        "domestic_consumption_region_name",
        "domestic_consumption_region_level",
        "production_geocode",
        "production_geocode_name",
        "production_geocode_level",
        "proportion",
        "port_of_export_label",
        "exporter_label",
        "importer_label",
    ]
    renames = {
        "exporter_group_name": "exporter_group",
        "country_of_production_name": "country_of_production",
        "country_of_destination": "country_of_first_import",
        "country_of_destination_iso2": "country_of_first_import_iso2",
        "country_of_destination_economic_bloc": "country_of_first_import_economic_bloc",
        "domestic_consumption_region_geocode": "domestic_consumption_region_geocode_id",
        "production_geocode": "production_geocode_id",
        "branch": "decision_tree_branch",
        "proportion": "port_country_proportion",
        "status": "linear_programming_status",
    }
    df = df[selected].rename(columns=renames, errors="raise")

    # Aggregate the value columns; everything else is a grouping key.
    value_columns = [
        "mass_tonnes",
        "mass_tonnes_raw_equivalent",
        "fob",
        "port_country_proportion",
    ]
    key_columns = [col for col in df.columns if col not in value_columns]
    df = df.groupby(key_columns, dropna=False, as_index=False)[value_columns].sum(
        min_count=1
    )

    # Cast in polars so integer columns with nulls stay integers.
    result = pl.from_pandas(df).with_columns(
        [
            pl.col("exporter_node_id").cast(pl.Int64, strict=True),
            pl.col("production_geocode_level").cast(pl.Int32, strict=True),
            pl.col("domestic_consumption_region_level").cast(pl.Int32, strict=True),
        ]
    )
    return result
def model(dbt, cursor):
    """Build the 2020 Diet Trase coffee results table.

    Combines the per-country supply-chain model outputs (Brazil, Tanzania,
    other) with the padded 2020 trade data, enriches the model's geocodes
    with region names/levels, validates that the trade totals are preserved
    at every step, appends domestic-consumption rows, cleans country names,
    and returns the final frame as polars.

    Parameters:
        dbt: dbt Python-model context; supplies config() and ref().
        cursor: database cursor injected by dbt (unused in this model).

    Returns:
        A polars DataFrame with the published column names and types.
    """
    dbt.config(
        materialized="external",
        depends_on=["postgres_countries"],
    )
    # -------------------------------------------------------------------------------- #
    # load country information
    # -------------------------------------------------------------------------------- #
    # NOTE(review): 'synonyms' appears to be a list column (it is exploded
    # below for the name-cleaning joins) — confirm against postgres_countries.
    r = dbt.ref("postgres_countries")
    df_countries = r.df()[["country_name", "synonyms", "country_trase_id"]].reset_index(
        drop=True
    )
    # -------------------------------------------------------------------------------- #
    # load subnational regions
    # -------------------------------------------------------------------------------- #
    df_regions = dbt.ref("diet_trase_subnational_regions").df()
    df_regions = df_regions.rename(
        columns={
            "country": "country_of_production_name",
        },
        errors="raise",
    )
    # Upper-case to match the casing used by the trade/model data.
    df_regions["country_of_production_name"] = df_regions[
        "country_of_production_name"
    ].str.upper()
    # -------------------------------------------------------------------------------- #
    # load the input trade data and rename some columns
    # -------------------------------------------------------------------------------- #
    df_trade = dbt.ref("diet_trase_coffee_trade_padded_2020").df()
    df_trade = df_trade.rename(
        columns={
            "producing_country": "country_of_production_name",
            "economic_bloc": "country_of_destination_economic_bloc",
            "padded": "is_padded",
        },
        errors="raise",
    )
    # the model drops ST. VINCENT AND THE GRENADINES
    df_trade = df_trade[
        df_trade["country_of_production_name"] != "ST. VINCENT AND THE GRENADINES"
    ]
    # -------------------------------------------------------------------------------- #
    # load the model results
    # -------------------------------------------------------------------------------- #
    # Key that links a model-result row back to its trade rows.
    join_columns = [
        "year",
        "padded_type",
        "port_of_export_name",
        "country_of_production_name",
    ]
    # Columns produced by the supply-chain model (not present in the trade data).
    columns_added_by_model = [
        "branch",
        "status",
        "linear_programming_failure_reason",
        "is_domestic",
        "domestic_consumption_region_geocode",
        "production_geocode",
    ]
    df: pd.DataFrame = sps.concat(
        [
            dbt.ref("diet_trase_coffee_2020_brazil").df(),
            dbt.ref("diet_trase_coffee_2020_tanzania").df(),
            dbt.ref("diet_trase_coffee_2020_other").df(),
        ]
    )
    df = df[["mass_tonnes_raw_equivalent", *join_columns, *columns_added_by_model]]
    df["padded_type"] = df["padded_type"].str.lower()  # TODO: fix upstream
    # add back in missing production geocode
    # TODO fix in model
    missing = df["production_geocode"] == ""
    df.loc[missing, "production_geocode"] = UNKNOWN_REGION
    # add names to geocodes added by the model
    # One synthetic "UNKNOWN" region (geocode "XX") per producing country, so
    # the enrichment joins below do not drop rows the model could not place.
    unknowns = (
        df[["country_of_production_name"]]
        .drop_duplicates()
        .assign(geocode="XX", gadm_level=None, name="UNKNOWN")
    )
    df_regions_with_unknown = sps.concat([df_regions, unknowns], ignore_index=True)
    # Enrich the domestic-consumption geocode with its name and GADM level.
    df = enrichment_join(
        df,
        df_regions_with_unknown.rename(
            columns={
                "geocode": "domestic_consumption_region_geocode",
                "name": "domestic_consumption_region_name",
                "gadm_level": "domestic_consumption_region_level",
            },
            errors="raise",
        ),
        on=["domestic_consumption_region_geocode", "country_of_production_name"],
    )
    # Enrich the production geocode the same way.
    df = enrichment_join(
        df,
        df_regions_with_unknown.rename(
            columns={
                "geocode": "production_geocode",
                "name": "production_geocode_name",
                "gadm_level": "production_geocode_level",
            },
            errors="raise",
        ),
        on=["production_geocode", "country_of_production_name"],
    )
    # split out domestic from trade
    is_domestic = df["is_domestic"]
    # Guard: the flag must be strictly boolean (no NaN) before using ~ below.
    assert set(is_domestic.unique()) == {True, False}
    df_domestic_results = df[is_domestic].copy()
    df_trade_results = df[~is_domestic].copy()
    # compute proportions
    # Per join-key share of mass, used to split the trade rows proportionally.
    df_trade_results["proportion"] = sps.grouped_proportion(
        df_trade_results,
        "mass_tonnes_raw_equivalent",
        join_columns,
    )
    # -------------------------------------------------------------------------------- #
    # ensure that that trade results have not been altered
    # -------------------------------------------------------------------------------- #
    # make sure years are int32 for comparison
    df_trade["year"] = df_trade["year"].astype("int32")
    df_trade_results["year"] = df_trade_results["year"].astype("int32")
    assert_equal_after_consolidation(
        df_trade_results, df_trade, ["mass_tonnes_raw_equivalent"], join_columns
    )
    # -------------------------------------------------------------------------------- #
    # apply results to trade data
    # -------------------------------------------------------------------------------- #
    # split the trade data
    df_trade_splitted = split_dataframe_using_proportions(
        df_trade,
        df_trade_results.drop(columns=["mass_tonnes_raw_equivalent"], errors="raise"),
        values=["mass_tonnes_raw_equivalent", "mass_tonnes", "fob"],
        on=join_columns,
        by="proportion",
        where=None,
        validate=None,
    )
    # validate that trade data has been preserved
    numerical_columns = ["mass_tonnes_raw_equivalent", "mass_tonnes", "fob"]
    categorical_columns = [
        col for col in df_trade.columns if col not in numerical_columns
    ]
    categorical_columns.remove("exporter_group_parent")  # has NaNs
    assert_equal_after_consolidation(
        df_trade, df_trade_splitted, numerical_columns, categorical_columns
    )
    # append the domestic data
    # Domestic rows have no trade-side attributes; fill them with UNKNOWN /
    # None placeholders so they align with the splitted trade columns.
    df_domestic = df_domestic_results.assign(
        country_of_destination=df_domestic_results["country_of_production_name"],
        country_of_destination_economic_bloc=df_domestic_results[
            "country_of_production_name"
        ],
        exporter_group_name="UNKNOWN",
        exporter_group_parent=None,
        exporter_label="UNKNOWN",
        exporter_name="UNKNOWN",
        exporter_node_id=15221616,  # NOTE(review): hard-coded UNKNOWN-exporter node id — confirm
        fob=None,  # FOB is only applicable for trade
        hs6=None,
        importer_group="UNKNOWN",
        importer_label="UNKNOWN",
        importer_name="UNKNOWN",
        mass_tonnes=None,  # HS6 is unknown means we don't know equivalence factors
        is_padded=None,
        padded_type=None,
        port_of_export_label="UNKNOWN",
        proportion=1.0,
    )
    df_final = sps.concat([df_domestic, df_trade_splitted])
    # Final clean of the country names, adding their iso2 codes
    # One row per synonym, so any synonym spelling joins to its canonical name.
    df_countries_exploded = df_countries.explode("synonyms")
    df_final = enrichment_join(
        df_final,
        df_countries_exploded.rename(
            columns={
                "synonyms": "country_of_production_name",
                "country_name": "clean_country_of_production_name",
                "country_trase_id": "country_of_production_iso2",
            },
            errors="raise",
        ),
        on=["country_of_production_name"],
    )
    df_final = enrichment_join(
        df_final,
        df_countries_exploded.rename(
            columns={
                "synonyms": "country_of_destination",
                "country_name": "clean_country_of_destination",
                "country_trase_id": "country_of_destination_iso2",
            },
            errors="raise",
        ),
        on=["country_of_destination"],
    )
    # Replace the raw names with their canonical forms, then drop the helpers.
    df_final["country_of_production_name"] = df_final[
        "clean_country_of_production_name"
    ]
    df_final["country_of_destination"] = df_final["clean_country_of_destination"]
    df_final = df_final.drop(
        columns=[
            "clean_country_of_production_name",
            "clean_country_of_destination",
        ],
        errors="raise",
    )
    # select and reorder columns, and use polars to allow int types to have nulls
    polars_df_final = select_and_reorder_columns(df_final)
    return polars_df_final