Diet Trase Coffee Trade Consolidated 2020
s3://trase-storage/diet-trase/diet_trase_coffee_trade_consolidated_2020.parquet
Dbt path: trase_production.main.diet_trase_coffee_trade_consolidated_2020
Explore on Metabase: Full table; summary statistics
YAML file: trase/data_pipeline/models/diet_trase/_schema.yml
Model file: trase/data_pipeline/models/diet_trase/diet_trase_coffee_trade_consolidated_2020.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph, at the bottom right, plus tests and downstream dependencies)
Tags: coffee, diet-trase-coffee
diet_trase_coffee_trade_consolidated_2020
Description
Brings together the coffee trade data of the countries we have data for. Runs cleaning steps, including:
* Adding an economic_bloc field: if the destination country is part of the EU, it holds the value "EUROPEAN UNION"; otherwise it holds the destination country name (see the sketch after this list)
* Adding standard names of exporters, importers, and ports
Countries included:
* Brazil
* Colombia
* Cote D'Ivoire
* Ethiopia
* India
* Indonesia
* Peru
* Tanzania
* Uganda
* Vietnam
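A minimal sketch of the economic_bloc rule, with hypothetical values; the real model derives EU membership from the postgres_countries reference table (with validity dates) rather than a static list:

```python
import polars as pl

eu_members = ["GERMANY", "FRANCE", "ITALY"]  # illustrative subset, not the real source

trade = pl.DataFrame({"country_of_destination": ["GERMANY", "UNITED STATES"]})
trade = trade.with_columns(
    economic_bloc=pl.when(pl.col("country_of_destination").is_in(eu_members))
    .then(pl.lit("EUROPEAN UNION"))
    .otherwise(pl.col("country_of_destination"))
)
# GERMANY -> EUROPEAN UNION; UNITED STATES -> UNITED STATES
```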
Details
| Column | Type | Description |
|---|---|---|
| year | INTEGER | Trade year (2020) |
| producing_country | VARCHAR | Coffee-producing (exporting) country |
| hs6 | VARCHAR | 6-digit HS commodity code |
| mass_tonnes | DOUBLE | Traded mass in tonnes |
| fob | DOUBLE | FOB value of the trade flow |
| exporter_label | VARCHAR | Raw exporter label from the source trade data (cleaned and uppercased) |
| exporter_name | VARCHAR | Standardised exporter name from the trader reference data |
| exporter_node_id | BIGINT | Exporter node ID in the trader reference data |
| exporter_group_name | VARCHAR | Exporter company group active in the trade year |
| exporter_group_parent | VARCHAR | Parent company of the exporter group, if any |
| port_of_export_label | VARCHAR | Raw port of export label from the source trade data |
| port_of_export_name | VARCHAR | Standardised port name from the ports reference table |
| country_of_destination | VARCHAR | Destination country |
| economic_bloc | VARCHAR | "EUROPEAN UNION" if the destination is an EU member; otherwise the destination country name |
| importer_label | VARCHAR | Raw importer label ("UNKNOWN" when not declared) |
| importer_name | VARCHAR | Standardised importer name from the trader reference data |
| importer_group | VARCHAR | Importer company group |
Models / Seeds
* model.trase_duckdb.postgres_countries
* model.trase_duckdb.postgres_traders
* model.trase_duckdb.postgres_ports
* model.trase_duckdb.diet_trase_coffee_trader_parents
* model.trase_duckdb.gold_brazil_bol_coffee_2020
* model.trase_duckdb.gold_colombia_cd_coffee_2020
* model.trase_duckdb.gold_cote_divoire_coffee_2020
* model.trase_duckdb.gold_ethiopia_coffee_2020
* model.trase_duckdb.gold_india_trade_coffee_2020
* model.trase_duckdb.gold_indonesia_bol_coffee_2020
* model.trase_duckdb.gold_peru_cd_coffee_2020
* model.trase_duckdb.gold_tanzania_trade_coffee_2020
* model.trase_duckdb.gold_uganda_coffee_2020
* model.trase_duckdb.gold_vietnam_trade_coffee_2020
No called script, or the script source was not found.
"""
Diet Trase consolidated coffee trade
This script brings together the coffee trade data from several countries and
adds an `economic_bloc` field.
Countries included:
* Brazil
* Colombia
* Cote D'Ivoire
* Ethiopia
* India
* Indonesia
* Peru
* Tanzania
* Uganda
* Vietnam
"""
import polars as pl
YEAR = 2020
COUNTRY_NAMES = [
"BRAZIL",
"COLOMBIA",
"COTE D'IVOIRE",
"ETHIOPIA",
"INDIA",
"INDONESIA",
"PERU",
"TANZANIA",
"UGANDA",
"VIETNAM",
]
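# NOTE: the order of COUNTRY_NAMES must match the order of the per-country
# lazy frames zipped against it in model() below.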
TRADE_COLS_TO_SELECT = [
"year",
"exporter_label",
"exporter_node_id",
"country_of_destination",
"mass_tonnes",
"fob",
"importer_label",
"port_of_export_label",
"hs6",
]
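# Columns each per-country gold model is expected to expose;
# select_relevant_columns drops everything else.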
def select_relevant_columns(lf: pl.LazyFrame) -> pl.LazyFrame:
return lf.select(TRADE_COLS_TO_SELECT)
def clean_strings(trade_lf: pl.LazyFrame, columns: list[str]) -> pl.LazyFrame:
# Trim, normalize, uppercase, remove extra spaces
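    # e.g. "  Café  du  Viêt Nam " -> "CAFE DU VIET NAM" (illustrative)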
trade_lf = trade_lf.with_columns(
[
            pl.col(col).str.strip_chars()
            # Replace runs of whitespace with a single space
            .str.replace_all(r"\s+", " ")
# Convert accents and diacritics (see https://stackoverflow.com/a/77217563)
.str.normalize("NFKD")
.str.replace_all(r"\p{CombiningMark}", "")
.str.to_uppercase()
.alias(col)
for col in columns
]
)
return trade_lf
def identify_active_group_name(df, year, group_column):
"""
Takes a dataframe with field containing the groups information as available
in the trader reference dataset (a list of json strings), and adds a new
'group_name' field with the valid group name for the given year.
"""
# Add a row index to be able to group by it later
df = df.with_row_index("row_index")
df_exploded = df.explode(group_column)
# Casting as polars might identify the field as binary
df_exploded = df_exploded.with_columns(
pl.col(group_column).cast(pl.Utf8).alias(group_column)
)
# Turn the json string into a struct with the appropriate types
dtype = pl.Struct(
[
pl.Field("group", pl.Utf8),
pl.Field("time_start", pl.Datetime),
pl.Field("time_end", pl.Datetime),
]
)
df_exploded = df_exploded.with_columns(
group_struct=pl.col(group_column).str.json_decode(dtype)
)
# Flatten the struct into new fields
df_exploded = df_exploded.with_columns(
group_name=pl.col("group_struct").struct.field("group"),
time_start=pl.col("group_struct").struct.field("time_start"),
time_end=pl.col("group_struct").struct.field("time_end"),
)
# Only keep groups that are valid for the year
df_exploded = df_exploded.filter(
(pl.col("time_end").is_null()) | (pl.col("time_end").dt.year() > pl.lit(year))
)
    # Group again by the row index, keeping a single group name per row
df_unexploded = df_exploded.group_by("row_index").agg(
pl.max("group_name").alias("group_name")
)
result = df.join(df_unexploded, on="row_index", how="left").drop("row_index")
return result
def add_european_union_bloc(trade_lf, countries_lf):
"""
    Adds an 'economic_bloc' column with "EUROPEAN UNION" for countries that are part of it.
    For countries that are not, it replicates the 'country_of_destination' value instead.
"""
economic_blocs_lf = countries_lf.select("country_name", "economic_bloc")
economic_blocs_lf = economic_blocs_lf.explode("economic_bloc")
# Turn the json string into a struct with the appropriate types
dtype = pl.Struct(
[
pl.Field("economic_bloc", pl.Utf8),
pl.Field("time_start", pl.Datetime),
pl.Field("time_end", pl.Datetime),
]
)
economic_blocs_lf = economic_blocs_lf.with_columns(
economic_bloc_struct=pl.col("economic_bloc").str.json_decode(dtype)
)
# Flatten the struct into new fields
economic_blocs_lf = economic_blocs_lf.with_columns(
economic_bloc_name=pl.col("economic_bloc_struct").struct.field("economic_bloc"),
time_start=pl.col("economic_bloc_struct").struct.field("time_start"),
time_end=pl.col("economic_bloc_struct").struct.field("time_end"),
)
    # Filter for EUROPEAN UNION and compatible start/end dates (especially to filter out the UK)
economic_blocs_lf = (
economic_blocs_lf.filter(
pl.col("economic_bloc_name") == pl.lit("EUROPEAN UNION")
)
.filter(pl.col("time_start").dt.year() <= pl.lit(YEAR))
.filter(
(pl.col("time_end").is_null())
| (pl.col("time_end").dt.year() > pl.lit(YEAR))
)
)
economic_blocs_lf = economic_blocs_lf.select("country_name", "economic_bloc_name")
economic_blocs_lf = economic_blocs_lf.rename(
{"economic_bloc_name": "economic_bloc"}
)
trade_lf = trade_lf.join(
economic_blocs_lf,
how="left",
left_on="country_of_destination",
right_on="country_name",
validate="m:1",
)
# When the country is not part of EUROPEAN UNION, just use the 'country_of_destination'
trade_lf = trade_lf.with_columns(
        economic_bloc=pl.when(pl.col("economic_bloc").is_null())
.then(pl.col("country_of_destination"))
.otherwise(pl.col("economic_bloc"))
)
return trade_lf
def add_trader_data_by_label(
    input_trade_data_lf, db_traders_lf, trader_label_column, country_iso2=None
):
"""
Adds trader information to the input trade data based on trader labels.
This function takes a DataFrame containing trade data and adds trader information
from a reference DataFrame. It matches trader labels in the input data with labels
in the reference data and returns the best match including trader name, trader_trase_id,
and trader group. The function also prioritizes traders based on the provided
country ISO2 code and filters for active groups based on the given year.
Parameters:
- input_trade_data_lf (DataFrame): The input trade data containing trader labels.
- db_traders_lf (DataFrame): The reference DataFrame containing trader information.
- trader_label_colummn (str): The column name in the input trade data that contains trader labels.
- country_iso2 (str, optional): The ISO2 code of the country to prioritize traders from. Defaults to None.
Returns:
- DataFrame: The input trade data with added trader information.
"""
# Take the reference trader information, and unnest the labels
db_traders_lf = db_traders_lf.select(
"labels", "name", "trader_node_id", "trase_id", "group_name"
)
db_traders_lf = db_traders_lf.explode("labels")
# Temporarily rename the trader_label_column to 'trader_label', and only work with it
input_trade_data_lf = input_trade_data_lf.rename(
        {trader_label_column: "trader_label"}
)
input_labels_lf = input_trade_data_lf.select("trader_label").unique()
joined_with_label_data = input_labels_lf.join(
db_traders_lf, left_on="trader_label", right_on="labels", how="left"
)
# Add a country_priority field to prioritize the country's traders
joined_with_label_data = joined_with_label_data.with_columns(
country_priority=pl.when(
(pl.lit(country_iso2).is_not_null())
& (pl.col("trase_id").str.slice(0, 2) == pl.lit(country_iso2))
)
.then(pl.lit(1))
.otherwise(pl.lit(0))
)
# Aggregate, giving priority to the country's traders, and only taking one value
grouped_label_data = (
joined_with_label_data.group_by("trader_label").agg(
[pl.all().sort_by("country_priority", descending=True).head(1)]
)
).drop("country_priority")
# Disaggregate as we're only taking one matching record, and improve names
grouped_label_data = grouped_label_data.explode(
["name", "trader_node_id", "trase_id", "group_name"]
)
grouped_label_data = grouped_label_data.rename(
{
"name": "trader_name",
"trase_id": "trader_trase_id",
"group_name": "trader_group_name",
}
)
# Add the results to the original input_trade_data_lf
input_trade_data_lf = input_trade_data_lf.join(
grouped_label_data,
how="left",
on="trader_label",
)
# Return the original trader_label_column name
input_trade_data_lf = input_trade_data_lf.rename(
{"trader_label": trader_label_colummn}
)
return input_trade_data_lf
def add_exporter_name_and_ownership(trade_lf, reference_traders_lf, group_parents_lf):
"""
Takes the 'exporter_node_id', and adds the exporter_name, the exporter_group_name,
and the parent company of the company group (if any) as 'exporter_group_parent'
"""
# Add exporter_name and exporter_group_name from the reference traders dataset
exporter_names_lf = reference_traders_lf.select(
["trader_node_id", "name", "group_name"]
).unique()
exporter_names_lf = exporter_names_lf.rename(
{
"trader_node_id": "exporter_node_id",
"name": "exporter_name",
"group_name": "exporter_group_name",
}
)
trade_lf = trade_lf.join(
exporter_names_lf,
how="left",
on="exporter_node_id",
validate="m:1",
)
# Take traders without a match, and pass them through add_trader_data_by_label
missing_exporter_names_lf = trade_lf.filter(pl.col("exporter_name").is_null())
missing_exporter_names_lf = missing_exporter_names_lf.select(
["exporter_label"]
).unique()
missing_exporter_names_lf = add_trader_data_by_label(
missing_exporter_names_lf, reference_traders_lf, "exporter_label"
)
missing_exporter_names_lf = missing_exporter_names_lf.rename(
{
"trader_name": "missing_exporter_name",
"trader_node_id": "missing_exporter_node_id",
"trader_group_name": "missing_exporter_group_name",
}
)
# Add the results back to trade_lf
trade_lf = trade_lf.join(
missing_exporter_names_lf,
how="left",
on="exporter_label",
validate="m:1",
)
# Coalesce the missing fields into the main columns
trade_lf = trade_lf.with_columns(
exporter_name=pl.coalesce(
[pl.col("exporter_name"), pl.col("missing_exporter_name")]
),
exporter_node_id=pl.coalesce(
[pl.col("exporter_node_id"), pl.col("missing_exporter_node_id")]
),
exporter_group_name=pl.coalesce(
[pl.col("exporter_group_name"), pl.col("missing_exporter_group_name")]
),
).drop(
[
"missing_exporter_name",
"missing_exporter_node_id",
"missing_exporter_group_name",
]
)
# Add the parent company of exporter groups that have them
trade_lf = trade_lf.join(
group_parents_lf,
how="left",
left_on="exporter_group_name",
right_on="db_group_name",
validate="m:1",
)
return trade_lf
def clean_importers(trade_lf, reference_traders_lf):
"""
Takes the 'importer_label', and cleans it against the official trader database.
Adds the 'importer_name' and 'importer_group' fields
"""
    # Replace NULL, 'A LA ORDEN', 'TO THE ORDER', 'NOT DECLARED', and 'TO ORDER' importer_label values with 'UNKNOWN'
trade_lf = trade_lf.with_columns(
importer_label=pl.when(
(pl.col("importer_label").is_null())
| (pl.col("importer_label").str.to_uppercase().str.contains("A LA ORDEN"))
| (pl.col("importer_label").str.to_uppercase().str.contains("TO THE ORDER"))
| (pl.col("importer_label").str.to_uppercase().str.contains("NOT DECLARED"))
| (pl.col("importer_label").str.to_uppercase().str.contains("TO ORDER"))
)
.then(pl.lit("UNKNOWN"))
.otherwise(pl.col("importer_label"))
)
    # Take the distinct importer labels, excluding 'UNKNOWN CUSTOMER' as it has multiple matches
importer_labels_lf = trade_lf.select("importer_label").unique()
importer_labels_lf = importer_labels_lf.filter(
pl.col("importer_label").ne("UNKNOWN CUSTOMER")
)
# Get the trader data for the importer labels, and merge it to trade_lf
importer_labels_lf = add_trader_data_by_label(
importer_labels_lf, reference_traders_lf, "importer_label"
)
importer_labels_lf = importer_labels_lf.rename(
{
"trader_name": "importer_name",
"trader_group_name": "importer_group",
}
)
trade_lf = trade_lf.join(
importer_labels_lf,
how="left",
on="importer_label",
)
# Add manually the name and group for the 'UNKNOWN CUSTOMER' importer
trade_lf = trade_lf.with_columns(
importer_name=pl.when(pl.col("importer_label") == pl.lit("UNKNOWN CUSTOMER"))
.then(pl.lit("UNKNOWN"))
.otherwise(pl.col("importer_name")),
importer_group=pl.when(pl.col("importer_label") == pl.lit("UNKNOWN CUSTOMER"))
.then(pl.lit("UNKNOWN"))
.otherwise(pl.col("importer_group")),
)
# Dropping 'trader_trase_id' as for importers it can be confusing to include a country identifier
trade_lf = trade_lf.drop("trader_trase_id")
# Replacing NULLs with 'UNKNOWN'
trade_lf = trade_lf.with_columns(
importer_name=pl.when(pl.col("importer_name").is_null())
.then(pl.lit("UNKNOWN"))
.otherwise(pl.col("importer_name")),
importer_group=pl.when(pl.col("importer_group").is_null())
.then(pl.lit("UNKNOWN"))
.otherwise(pl.col("importer_group")),
)
return trade_lf
def clean_ports(bol_lf, ports_lf):
"""
Creates the 'port_of_export_name' column.
    Cleans the 'port_of_export_label' column against the synonyms
    in the ports reference table (currently a view on postgres views.regions, created through
    the dbt model 'postgres_ports')
"""
ports_lf = ports_lf.select("name", "synonyms", "country").unique()
bol_lf = bol_lf.with_columns(
pl.col("port_of_export_label").alias("port_of_export_name")
)
for country in COUNTRY_NAMES:
country_ports_lf = ports_lf.filter(pl.col("country") == pl.lit(country))
# Manually remove database duplicates (based on synonym) for Indonesia,
# keeping the ones present in Mark's port files
if country == "INDONESIA":
country_ports_lf = country_ports_lf.filter(
~pl.col("name").is_in(
[
"TANJUNG EMAS",
"MEDAN",
"JAMBI",
"UJUNG PANDANG / HASANUDDIN (U)",
"BATU AMPAR",
]
)
)
country_ports_lf = country_ports_lf.rename(
{
"name": "matched_port_of_export",
"synonyms": "country_port_synonyms",
}
)
country_ports_lf = country_ports_lf.drop(["country"]).unique()
country_ports_lf = country_ports_lf.explode("country_port_synonyms").unique()
bol_lf = bol_lf.join(
country_ports_lf,
how="left",
left_on="port_of_export_label",
right_on="country_port_synonyms",
validate="m:1",
)
bol_lf = bol_lf.with_columns(
port_of_export_name=pl.when(pl.col("matched_port_of_export").is_not_null())
.then(pl.col("matched_port_of_export"))
.otherwise(pl.col("port_of_export_name"))
).drop("matched_port_of_export")
# Replace nulls in port_of_export_name with 'UNKNOWN'
bol_lf = bol_lf.with_columns(
port_of_export_name=pl.when(pl.col("port_of_export_name").is_null())
.then(pl.lit("UNKNOWN"))
.otherwise(pl.col("port_of_export_name")),
port_of_export_label=pl.when(pl.col("port_of_export_label").is_null())
.then(pl.lit("UNKNOWN"))
.otherwise(pl.col("port_of_export_label")),
)
return bol_lf
def model(dbt, cursor):
dbt.config(materialized="external")
# get country list including economic bloc, and official trader list
countries_lf = dbt.ref("postgres_countries").pl().lazy()
reference_traders_lf = dbt.ref("postgres_traders").pl().lazy()
ports_lf = dbt.ref("postgres_ports").pl().lazy()
# Add 'group_name' field to traders_lf with the active group
reference_traders_lf = identify_active_group_name(
reference_traders_lf, YEAR, "groups"
).drop("groups")
group_parents_lf = dbt.ref("diet_trase_coffee_trader_parents").pl().lazy()
group_parents_lf = (
group_parents_lf.filter(
(
(pl.col("group_parent_start_date").dt.year() <= pl.lit(YEAR))
| (pl.col("group_parent_start_date").is_null())
)
& (
(pl.col("group_parent_end_date").dt.year() > pl.lit(YEAR))
| (pl.col("group_parent_end_date").is_null())
)
)
.select(["db_group_name", "group_parent"])
.unique()
)
group_parents_lf = group_parents_lf.rename(
{"group_parent": "exporter_group_parent"}
)
# get trade data from each country
brazil_trade_lf = select_relevant_columns(
dbt.ref("gold_brazil_bol_coffee_2020").pl().lazy()
)
colombia_trade_lf = select_relevant_columns(
dbt.ref("gold_colombia_cd_coffee_2020").pl().lazy()
)
cote_divoire_trade_lf = select_relevant_columns(
dbt.ref("gold_cote_divoire_coffee_2020").pl().lazy()
)
ethiopia_trade_lf = select_relevant_columns(
dbt.ref("gold_ethiopia_coffee_2020").pl().lazy()
)
india_trade_lf = select_relevant_columns(
dbt.ref("gold_india_trade_coffee_2020").pl().lazy()
)
indonesia_trade_lf = select_relevant_columns(
dbt.ref("gold_indonesia_bol_coffee_2020").pl().lazy()
)
peru_trade_lf = select_relevant_columns(
dbt.ref("gold_peru_cd_coffee_2020").pl().lazy()
)
tanzania_trade_lf = select_relevant_columns(
dbt.ref("gold_tanzania_trade_coffee_2020").pl().lazy()
)
uganda_trade_lf = select_relevant_columns(
dbt.ref("gold_uganda_coffee_2020").pl().lazy()
)
vietnam_trade_lf = select_relevant_columns(
dbt.ref("gold_vietnam_trade_coffee_2020").pl().lazy()
)
    # Add a 'producing_country' field to each country's dataframe
country_lazy_frames = [
brazil_trade_lf,
colombia_trade_lf,
cote_divoire_trade_lf,
ethiopia_trade_lf,
india_trade_lf,
indonesia_trade_lf,
peru_trade_lf,
tanzania_trade_lf,
uganda_trade_lf,
vietnam_trade_lf,
]
countries_dict = {
country_name: lazy_frame
for country_name, lazy_frame in zip(COUNTRY_NAMES, country_lazy_frames)
}
for country_name, lazy_frame in countries_dict.items():
lazy_frame = lazy_frame.with_columns(
pl.lit(country_name).alias("producing_country"),
pl.col("exporter_node_id").cast(pl.Int64).alias("exporter_node_id"),
)
countries_dict[country_name] = lazy_frame
trade_lf = pl.concat(countries_dict.values())
# Run additional pre-processing
cols_to_clean = ["exporter_label", "port_of_export_label", "importer_label"]
trade_lf = clean_strings(trade_lf, columns=cols_to_clean)
trade_lf = add_european_union_bloc(trade_lf, countries_lf)
trade_lf = add_exporter_name_and_ownership(
trade_lf, reference_traders_lf, group_parents_lf
)
trade_lf = clean_importers(trade_lf, reference_traders_lf)
trade_lf = clean_ports(trade_lf, ports_lf)
# Remove 'UNKNOWN COUNTRY' (a record from PERU)
trade_lf = trade_lf.filter(pl.col("country_of_destination") != "UNKNOWN COUNTRY")
# consolidate all, summing mass_tonnes and fob
trade_lf = trade_lf.group_by(
[
"year",
"producing_country",
"hs6",
"exporter_label",
"exporter_name",
"exporter_node_id",
"exporter_group_name",
"exporter_group_parent",
"port_of_export_label",
"port_of_export_name",
"country_of_destination",
"economic_bloc",
"importer_label",
"importer_name",
"importer_group",
]
).agg(
[
pl.sum("mass_tonnes").alias("mass_tonnes"),
pl.sum("fob").alias("fob"),
]
)
# Final select and sort
trade_lf = trade_lf.select(
[
"year",
"producing_country",
"hs6",
"mass_tonnes",
"fob",
"exporter_label",
"exporter_name",
"exporter_node_id",
"exporter_group_name",
"exporter_group_parent",
"port_of_export_label",
"port_of_export_name",
"country_of_destination",
"economic_bloc",
"importer_label",
"importer_name",
"importer_group",
]
).sort(
by=["producing_country", "hs6", "mass_tonnes"],
descending=[False, False, True],
)
# If debugging a test run
# trade_lf.collect().write_parquet(f"~/Trase/data/diet_trase_coffee_trade_consolidated_{YEAR}.parquet")
return trade_lf
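
For downstream use, here is a minimal sketch of reading the published table, assuming S3 read access to the path listed at the top of this page:

```python
import polars as pl

# Illustrative downstream read; the path comes from the header of this page and
# assumes S3 credentials are available in the environment.
lf = pl.scan_parquet(
    "s3://trase-storage/diet-trase/diet_trase_coffee_trade_consolidated_2020.parquet"
)

# Example: total exported mass per producing country
summary = (
    lf.group_by("producing_country")
    .agg(pl.sum("mass_tonnes"))
    .sort("mass_tonnes", descending=True)
    .collect()
)
print(summary)
```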