Diet Trase Coffee Trade Padded 2020
s3://trase-storage/diet-trase/diet_trase_coffee_trade_padded_2020.parquet
Dbt path: trase_production.main.diet_trase_coffee_trade_padded_2020
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/diet_trase/_schema.yml
Model file link: trase/data_pipeline/models/diet_trase/diet_trase_coffee_trade_padded_2020.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: coffee, diet-trase, padded, 2020
diet_trase_coffee_trade_padded_2020
Description
Diet Trase Coffee 2020 COMTRADE padding
Explore in Metabase here.
Adds missing trade data based on COMTRADE values and adds the 'mass_tonnes_raw_equivalent' column.
Depending on the completeness of trade data, padding is applied at the following levels (a short query sketch using the resulting flags follows the country lists below):
a) No padding. Marked as false in the field padded.
b) For all trade in selected hs6 codes of a producing country. Marked as country_hs6_pad in the field padded_type.
c) For hs6 code/country of destination of a producing country. Marked as partial_pad_hs6_prod_and_dest_countries in the field padded_type.
d) For all trade data of a producing country. Marked as all_country_data in the field padded_type.
a) No padding. Countries with no padding:
* Brazil
* Colombia
* Côte d’Ivoire
* Ethiopia
* India
* Indonesia
* Peru
b) Countries to pad full hs6 codes
Vietnam:
* pad 210111, 210112 [, 090190?]
c) Countries to pad hs6 code/country of destination
Tanzania exports:
* 090111 - pad exports to Uganda
* 090190 - pad exports to Uganda, Australia, Rwanda
Uganda exports: for 090111, pad exports to:
* ITALY
* SUDAN
* GERMANY
* INDIA
* SPAIN
* UNITED STATES
* BELGIUM
* MOROCCO
* RUSSIAN FEDERATION
* ISRAEL
* SLOVENIA
* PORTUGAL
* CHINA (MAINLAND)
* SWEDEN
* GREECE
* POLAND
* SOUTH KOREA
* NETHERLANDS
* ROMANIA
* FINLAND
* SOUTH AFRICA
* MEXICO
* CROATIA
* ALGERIA
d) Countries to pad all trade data
Producing countries found in 'world/production/FAO/20_12_2024_release/coffee_prod_2020.csv' (58 countries), EXCEPT for the ones we have trading data for (10 countries).
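As a quick illustration of how the padded and padded_type flags can be used, here is a minimal sketch (not part of the model itself) that summarises the traded mass contributed by each padding level. It assumes read access to the parquet output listed in the header, or a local copy of that file:

```python
import polars as pl

# Path of the published model output (from the header above); a local copy also works.
PARQUET_PATH = "s3://trase-storage/diet-trase/diet_trase_coffee_trade_padded_2020.parquet"

padding_summary = (
    pl.scan_parquet(PARQUET_PATH)
    .group_by("padded", "padded_type")
    .agg(
        records=pl.len(),
        mass_tonnes=pl.col("mass_tonnes").sum(),
        mass_tonnes_raw_equivalent=pl.col("mass_tonnes_raw_equivalent").sum(),
    )
    .sort("padded", "padded_type")
    .collect()
)
print(padding_summary)
```

Records with padded = false come from the consolidated bill-of-lading data; the three padded_type values correspond to levels b), c) and d) above.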
Details
| Column | Type | Description |
|---|---|---|
| year | INTEGER | |
| producing_country | VARCHAR | |
| hs6 | VARCHAR | |
| mass_tonnes | DOUBLE | |
| mass_tonnes_raw_equivalent | DOUBLE | |
| fob | DOUBLE | |
| exporter_label | VARCHAR | |
| exporter_name | VARCHAR | |
| exporter_node_id | BIGINT | |
| exporter_group_name | VARCHAR | |
| exporter_group_parent | VARCHAR | |
| port_of_export_label | VARCHAR | |
| port_of_export_name | VARCHAR | |
| country_of_destination | VARCHAR | |
| economic_bloc | VARCHAR | |
| importer_label | VARCHAR | |
| importer_name | VARCHAR | |
| importer_group | VARCHAR | |
| padded | BOOLEAN | Boolean indicating whether the record is padded or not, by taking values from COMTRADE. |
| padded_type | VARCHAR | Type of padding applied to the record. |
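As a sanity check against the table above, the following sketch compares the documented column names with the file's actual schema. It assumes a local copy of the parquet file with the name shown below (a hypothetical path; the S3 path from the header also works if credentials and an S3-capable filesystem are configured):

```python
import polars as pl

# Columns documented in the table above.
DOCUMENTED_COLUMNS = [
    "year", "producing_country", "hs6", "mass_tonnes", "mass_tonnes_raw_equivalent",
    "fob", "exporter_label", "exporter_name", "exporter_node_id", "exporter_group_name",
    "exporter_group_parent", "port_of_export_label", "port_of_export_name",
    "country_of_destination", "economic_bloc", "importer_label", "importer_name",
    "importer_group", "padded", "padded_type",
]

# Hypothetical local copy of the model output.
schema = pl.read_parquet_schema("diet_trase_coffee_trade_padded_2020.parquet")

missing = [c for c in DOCUMENTED_COLUMNS if c not in schema]
unexpected = [c for c in schema if c not in DOCUMENTED_COLUMNS]
print("missing columns:", missing)
print("unexpected columns:", unexpected)
print(schema)  # actual dtypes, for comparison with the documented types
```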
Models / Seeds
source.trase_duckdb.source_world.original_fao_coffee_production_2020
model.trase_duckdb.diet_trase_coffee_trade_consolidated_2020
model.trase_duckdb.comtrade_exports_year_exporter_hs6_importer
model.trase_duckdb.postgres_countries
model.trase_duckdb.postgres_commodities
Sources
['source_world', 'original_fao_coffee_production_2020']
No called script, or script source not found.
"""
Diet Trase COMTRADE padding
This script fills incomplete data for selected coffee producing countries based on COMTRADE data.
Depending on the completeness of trade data, this is done at the following levels:
a) For all trade hs6 codes of a producing country
b) For hs6 code/country of destination of a producing country
c) For all trade data of a producing country
a) Countries to pad full hs6 codes
Vietnam:
* pad 210111, 210112 [, 090190?]
b) Countries to pad hs6 code/country of destination
Tanzania exports:
* 090111 - pad exports to Uganda (10 t)
* 090190 - pad exports to Uganda 3449-43, Australia 19.46, Rwanda 194.65
Uganda exports: of 090111, pad exports to:
* ITALY (ITA)
* SUDAN (SDN)
* GERMANY (DEU)
* INDIA (IND)
* SPAIN (ESP)
* UNITED STATES (USA)
* BELGIUM (BEL)
* MOROCCO (MAR)
* RUSSIAN FEDERATION (RUS)
* ISRAEL (ISR)
* SLOVENIA (SVN)
* PORTUGAL (PRT)
* CHINA (MAINLAND) (CHN)
* SWEDEN (SWE)
* GREECE (GRC)
* POLAND (POL)
* SOUTH KOREA (KOR)
* NETHERLANDS (NLD)
* ROMANIA (ROU)
* FINLAND (FIN)
* SOUTH AFRICA (ZAF)
* MEXICO (MEX)
* CROATIA (HRV)
* ALGERIA (DZA)
c) Countries to pad all trade data
Producing countries found in 'world/production/FAO/20_12_2024_release/coffee_prod_2020.csv' (58 countries),
EXCEPT for the ones we have trading data of (10 countries)
"""
import polars as pl
YEAR = 2020
BOL_COUNTRIES_ISO3 = {
"BRAZIL": "BRA",
"COLOMBIA": "COL",
"COTE D'IVOIRE": "CIV",
"ETHIOPIA": "ETH",
"INDIA": "IND",
"INDONESIA": "IDN",
"PERU": "PER",
"TANZANIA": "TZA",
"UGANDA": "UGA",
"VIETNAM": "VNM",
}
COFFEE_HS6_CODES = [
"090111", # Coffee, not roasted, not decaffeinated
"090112", # Coffee, not roasted, decaffeinated
"090121", # Coffee, roasted, not decaffeinated
"090122", # Coffee, roasted, decaffeinated
"090190", # Coffee, roasted, not decaffeinated
"210111", # Extracts, essences and concentrates, coffee
"210112", # Coffee extracts, essences and concentrates
]
# countries to fully pad hs6 codes (Vietnam, for codes 210111, 210112)
COUNTRY_HS6_PAD_DICT = {"VNM": ["210111", "210112"]}
NODE_ID_FOR_UNKNOWN = 15221616
# countries to partially pad based on hs6 code/country of destination (Tanzania, Uganda)
PARTIAL_PAD_COUNTRY_HS6_PAD_DICT = {
"TZA": {"090111": ["UGA"], "090190": ["UGA", "AUS", "RWA"]},
"UGA": {
"090111": [
"ITA",
"SDN",
"DEU",
"IND",
"ESP",
"USA",
"BEL",
"MAR",
"RUS",
"ISR",
"SVN",
"PRT",
"CHN",
"SWE",
"GRC",
"POL",
"KOR",
"NLD",
"ROU",
"FIN",
"ZAF",
"MEX",
"HRV",
"DZA",
]
},
}
def add_european_union_bloc(trade_lf, countries_lf):
"""
    Adds an 'economic_bloc' column set to "EUROPEAN UNION" for destination countries that were
    EU members in the model year. For countries that were not members, the 'country_of_destination'
    value is used instead.
"""
economic_blocs_lf = countries_lf.select("country_name", "economic_bloc")
economic_blocs_lf = economic_blocs_lf.explode("economic_bloc")
# Turn the json string into a struct with the appropriate types
dtype = pl.Struct(
[
pl.Field("economic_bloc", pl.Utf8),
pl.Field("time_start", pl.Datetime),
pl.Field("time_end", pl.Datetime),
]
)
economic_blocs_lf = economic_blocs_lf.with_columns(
economic_bloc_struct=pl.col("economic_bloc").str.json_decode(dtype)
)
# Flatten the struct into new fields
economic_blocs_lf = economic_blocs_lf.with_columns(
economic_bloc_name=pl.col("economic_bloc_struct").struct.field("economic_bloc"),
time_start=pl.col("economic_bloc_struct").struct.field("time_start"),
time_end=pl.col("economic_bloc_struct").struct.field("time_end"),
)
    # Filter for EUROPEAN UNION membership with start/end dates covering the model year (notably to exclude the UK)
economic_blocs_lf = (
economic_blocs_lf.filter(
pl.col("economic_bloc_name") == pl.lit("EUROPEAN UNION")
)
.filter(pl.col("time_start").dt.year() <= pl.lit(YEAR))
.filter(
(pl.col("time_end").is_null())
| (pl.col("time_end").dt.year() > pl.lit(YEAR))
)
)
economic_blocs_lf = economic_blocs_lf.select("country_name", "economic_bloc_name")
economic_blocs_lf = economic_blocs_lf.rename(
{"economic_bloc_name": "economic_bloc"}
)
trade_lf = trade_lf.join(
economic_blocs_lf,
how="left",
left_on="country_of_destination",
right_on="country_name",
validate="m:1",
)
# When the country is not part of EUROPEAN UNION, just use the 'country_of_destination'
trade_lf = trade_lf.with_columns(
        economic_bloc=pl.when(pl.col("economic_bloc").is_null())
.then(pl.col("country_of_destination"))
.otherwise(pl.col("economic_bloc"))
)
return trade_lf
def pad_hs6_countries(comtrade_lf, countries_lf, country_hs6_dict):
"""
Returns a lazyframe with the data of the countries/hs6 codes to fully pad, which
would then be appended to the bol_data. This assumes that the countries don't
have any trade data for the hs6 codes.
Receives a lazyframe with the comtrade data and a dictionary with the iso country
codes as keys and a list with hs6 codes to pad as values.
"""
countries_lf = countries_lf.select("iso_alpha_3", "country_name").unique()
# Create separate lists for countries and hs6 codes
countries_to_pad = []
hs6_codes_to_pad = []
for country, codes in country_hs6_dict.items():
for code in codes:
countries_to_pad.append(country)
hs6_codes_to_pad.append(code)
# Create a lazyframe from the lists
country_hs6_lf = pl.DataFrame(
{"country_of_export_iso": countries_to_pad, "commodity_code": hs6_codes_to_pad}
).lazy()
# Perform a semi join with the comtrade data
padded_country_hs6_lf = comtrade_lf.join(
country_hs6_lf, on=["country_of_export_iso", "commodity_code"], how="semi"
)
padded_country_hs6_lf = padded_country_hs6_lf.rename({"commodity_code": "hs6"})
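    # Weight is normally reported in net_weight_kg; fall back to alternative_quantity when it is missing or zero (converted from kg to tonnes)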
padded_country_hs6_lf = padded_country_hs6_lf.with_columns(
mass_tonnes=pl.when(
(pl.col("net_weight_kg").is_not_null()) & (pl.col("net_weight_kg") > 0)
)
.then(pl.col("net_weight_kg") / 1000)
.otherwise(pl.col("alternative_quantity") / 1000)
).drop("net_weight_kg", "alternative_quantity")
# Add countries_of_export and country_of_import based on the iso3
padded_country_hs6_lf = (
padded_country_hs6_lf.join(
countries_lf,
left_on="country_of_export_iso",
right_on="iso_alpha_3",
how="left",
validate="m:1",
)
.rename({"country_name": "producing_country"})
.drop("country_of_export_iso")
)
padded_country_hs6_lf = (
padded_country_hs6_lf.join(
countries_lf,
left_on="country_of_import_iso",
right_on="iso_alpha_3",
how="left",
validate="m:1",
)
.rename({"country_name": "country_of_destination"})
.drop("country_of_import_iso")
)
# Add year; exporters and importers as 'UNKNOWN'; and type of pad
padded_country_hs6_lf = padded_country_hs6_lf.with_columns(
year=pl.lit(YEAR),
exporter_label=pl.lit("UNKNOWN"),
exporter_name=pl.lit("UNKNOWN"),
exporter_node_id=pl.lit(NODE_ID_FOR_UNKNOWN),
exporter_group_name=pl.lit("UNKNOWN"),
exporter_group_parent=pl.lit(None),
port_of_export_label=pl.lit("UNKNOWN"),
port_of_export_name=pl.lit("UNKNOWN"),
importer_label=pl.lit("UNKNOWN"),
importer_name=pl.lit("UNKNOWN"),
importer_group=pl.lit("UNKNOWN"),
padded=pl.lit(True),
padded_type=pl.lit("country_hs6_pad"),
)
return padded_country_hs6_lf
def partial_pad_hs6_countries(
bol_lf, comtrade_lf, countries_lf, partial_pad_country_hs6_pad_dict
):
"""
Returns a lazyframe with the padded data of the origin/destination countries/hs6 codes,
filling the missing weight and fob values based on the comtrade data.
"""
countries_lf = countries_lf.select("iso_alpha_3", "country_name").unique()
# Create a lazyframe from the dictionary
prod_countries_to_pad = []
hs6_codes_to_pad = []
dest_countries_to_pad = []
for country, hs6_dict in partial_pad_country_hs6_pad_dict.items():
for hs6, dest_countries in hs6_dict.items():
for dest_country in dest_countries:
prod_countries_to_pad.append(country)
hs6_codes_to_pad.append(hs6)
dest_countries_to_pad.append(dest_country)
partial_pad_country_hs6_pad_lf = pl.DataFrame(
{
"producing_country_iso": prod_countries_to_pad,
"hs6": hs6_codes_to_pad,
"country_of_destination_iso": dest_countries_to_pad,
}
).lazy()
# Filter comtrade based on the partial pad selection
comtrade_lf = comtrade_lf.join(
partial_pad_country_hs6_pad_lf,
left_on=["country_of_export_iso", "commodity_code", "country_of_import_iso"],
right_on=["producing_country_iso", "hs6", "country_of_destination_iso"],
how="semi",
).rename(
{
"commodity_code": "hs6",
"net_weight_kg": "comtrade_net_weight_kg",
"alternative_quantity": "comtrade_alternative_quantity",
"fob": "comtrade_fob",
}
)
# Create comtrade_mass_tonnes
comtrade_lf = comtrade_lf.with_columns(
comtrade_mass_tonnes=pl.when(
(pl.col("comtrade_net_weight_kg").is_not_null())
& (pl.col("comtrade_net_weight_kg") > 0)
)
.then(pl.col("comtrade_net_weight_kg") / 1000)
.otherwise(pl.col("comtrade_alternative_quantity") / 1000)
).drop("comtrade_net_weight_kg", "comtrade_alternative_quantity")
# Take relevant information from bol_lf
# aggregating mass_tonnes and fob based on producing_country, hs6 and country_of_destination
aggregated_bol_lf = (
bol_lf.select(
"producing_country", "hs6", "country_of_destination", "mass_tonnes", "fob"
)
.group_by(["producing_country", "hs6", "country_of_destination"])
.agg(mass_tonnes=pl.col("mass_tonnes").sum(), fob=pl.col("fob").sum())
.rename({"mass_tonnes": "bol_mass_tonnes", "fob": "bol_fob"})
)
# Add the country iso's to bol_lf. The full country names will be added again later
aggregated_bol_lf = (
aggregated_bol_lf.join(
countries_lf,
left_on="producing_country",
right_on="country_name",
how="left",
validate="m:1",
)
.rename({"iso_alpha_3": "producing_country_iso"})
.drop("producing_country")
)
aggregated_bol_lf = (
aggregated_bol_lf.join(
countries_lf,
left_on="country_of_destination",
right_on="country_name",
how="left",
validate="m:1",
)
.rename({"iso_alpha_3": "country_of_destination_iso"})
.drop("country_of_destination")
)
# Add the bol_lf data to the comtrade_lf data
comtrade_lf = comtrade_lf.join(
aggregated_bol_lf,
left_on=["country_of_export_iso", "hs6", "country_of_import_iso"],
right_on=["producing_country_iso", "hs6", "country_of_destination_iso"],
how="left",
validate="m:1",
)
# Create mass_tonnes and fob, based on comtrade - bol
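    # i.e. pad only the remainder of the COMTRADE totals not already covered by existing BoL records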
comtrade_lf = comtrade_lf.with_columns(
fob=pl.col("comtrade_fob") - pl.coalesce("bol_fob", 0),
mass_tonnes=pl.col("comtrade_mass_tonnes") - pl.coalesce("bol_mass_tonnes", 0),
).drop("comtrade_mass_tonnes", "comtrade_fob", "bol_mass_tonnes", "bol_fob")
# Add producing_country and country_of_destination
comtrade_lf = (
comtrade_lf.join(
countries_lf,
left_on="country_of_export_iso",
right_on="iso_alpha_3",
how="left",
validate="m:1",
)
.rename({"country_name": "producing_country"})
.drop("country_of_export_iso")
)
comtrade_lf = (
comtrade_lf.join(
countries_lf,
left_on="country_of_import_iso",
right_on="iso_alpha_3",
how="left",
validate="m:1",
)
.rename({"country_name": "country_of_destination"})
.drop("country_of_import_iso")
)
# Add year; exporters and importers as 'UNKNOWN'; and type of pad
comtrade_lf = comtrade_lf.with_columns(
year=pl.lit(YEAR),
exporter_label=pl.lit("UNKNOWN"),
exporter_name=pl.lit("UNKNOWN"),
exporter_node_id=pl.lit(NODE_ID_FOR_UNKNOWN),
exporter_group_name=pl.lit("UNKNOWN"),
exporter_group_parent=pl.lit(None),
port_of_export_label=pl.lit("UNKNOWN"),
port_of_export_name=pl.lit("UNKNOWN"),
importer_label=pl.lit("UNKNOWN"),
importer_name=pl.lit("UNKNOWN"),
importer_group=pl.lit("UNKNOWN"),
padded=pl.lit(True),
padded_type=pl.lit("partial_pad_hs6_prod_and_dest_countries"),
)
return comtrade_lf
def pad_all_trade_data(no_trade_data_countries_lf, comtrade_lf, countries_lf):
"""
Receives
- A lazyframe with the iso3 of the countries to pad
- A lazyframe with the comtrade data
- A lazyframe with the countries iso and name data
Returns a lazyframe with the padded data.
"""
countries_lf = countries_lf.select("iso_alpha_3", "country_name").unique()
padded_countries_lf = comtrade_lf.join(
no_trade_data_countries_lf,
left_on="country_of_export_iso",
right_on="iso_alpha_3",
how="inner",
).select(
"country_of_export_iso",
"country_of_import_iso",
"commodity_code",
"net_weight_kg",
"alternative_quantity",
"fob",
)
padded_countries_lf = padded_countries_lf.rename({"commodity_code": "hs6"})
# In a couple of cases, the weight is in alternative_quantity
padded_countries_lf = padded_countries_lf.with_columns(
mass_tonnes=pl.when(
(pl.col("net_weight_kg").is_not_null()) & (pl.col("net_weight_kg") > 0)
)
.then(pl.col("net_weight_kg") / 1000)
.otherwise(pl.col("alternative_quantity") / 1000)
).drop("net_weight_kg", "alternative_quantity")
padded_countries_lf = (
padded_countries_lf.join(
countries_lf,
left_on="country_of_export_iso",
right_on="iso_alpha_3",
how="left",
)
.rename({"country_name": "producing_country"})
.drop("country_of_export_iso")
)
padded_countries_lf = (
padded_countries_lf.join(
countries_lf,
left_on="country_of_import_iso",
right_on="iso_alpha_3",
how="left",
)
.rename({"country_name": "country_of_destination"})
.drop("country_of_import_iso")
)
# Add year; exporters and importers as 'UNKNOWN', and type of pad
padded_countries_lf = padded_countries_lf.with_columns(
year=pl.lit(YEAR),
exporter_label=pl.lit("UNKNOWN"),
exporter_name=pl.lit("UNKNOWN"),
exporter_node_id=pl.lit(NODE_ID_FOR_UNKNOWN),
exporter_group_name=pl.lit("UNKNOWN"),
exporter_group_parent=pl.lit(None),
port_of_export_label=pl.lit("UNKNOWN"),
port_of_export_name=pl.lit("UNKNOWN"),
importer_label=pl.lit("UNKNOWN"),
importer_name=pl.lit("UNKNOWN"),
importer_group=pl.lit("UNKNOWN"),
padded=pl.lit(True),
padded_type=pl.lit("all_country_data"),
)
return padded_countries_lf
def model(dbt, cursor):
dbt.config(materialized="external")
# get consolidated trade data for which we have vendor data
bol_lf = dbt.ref("diet_trase_coffee_trade_consolidated_2020").pl().lazy()
# get comtrade data
comtrade_lf = dbt.ref("comtrade_exports_year_exporter_hs6_importer").pl().lazy()
comtrade_lf = comtrade_lf.filter(pl.col("year") == YEAR).select(
"country_of_export_iso",
"country_of_import_iso",
"commodity_code",
"net_weight_kg",
"alternative_quantity",
"fob",
)
comtrade_lf = comtrade_lf.filter(
pl.col("commodity_code").is_in(COFFEE_HS6_CODES)
).unique()
# get countries iso data
# drop "GAZA STRIP (PALESTINE)" and keep "OCCUPIED PALESTINIAN TERRITORY" as its
# more standard in trade, and we need to avoid a duplicated 'PSE' iso3, and 299 fao_code
countries_lf = dbt.ref("postgres_countries").pl().lazy()
countries_lf = (
countries_lf.select("iso_alpha_3", "fao_code", "country_name", "economic_bloc")
.filter(
(pl.col("iso_alpha_3").is_not_null())
& (pl.col("country_name") != "GAZA STRIP (PALESTINE)")
)
.unique()
)
# iso3 of FAO's producing countries
fao_producing_countries_lf = (
dbt.source("source_world", "original_fao_coffee_production_2020").pl().lazy()
)
fao_producing_countries_lf = (
fao_producing_countries_lf.select("AreaCode")
.with_columns(pl.col("AreaCode").cast(pl.Int32))
.unique()
.rename({"AreaCode": "fao_code"})
)
fao_producing_countries_lf = (
fao_producing_countries_lf.join(
countries_lf, left_on="fao_code", right_on="fao_code", how="inner"
)
.select("iso_alpha_3")
.unique()
)
# Get commodity equivalence factors
commodities_lf = dbt.ref("postgres_commodities").pl().lazy()
commodities_lf = (
commodities_lf.filter(
(pl.col("commodity") == "COFFEE") & (pl.col("location") == "WORLD")
)
.select("hs_code", "eq_factor")
.unique()
)
# filter comtrade for producing countries
comtrade_lf = comtrade_lf.join(
fao_producing_countries_lf,
left_on="country_of_export_iso",
right_on="iso_alpha_3",
how="semi",
).unique()
    # filter comtrade for valid importer countries (COMTRADE includes some additional partner codes that are not used)
comtrade_lf = comtrade_lf.join(
countries_lf,
left_on="country_of_import_iso",
right_on="iso_alpha_3",
how="semi",
).unique()
# producing countries for which we have no trade data
no_trade_data_countries_lf = fao_producing_countries_lf.filter(
~pl.col("iso_alpha_3").is_in(BOL_COUNTRIES_ISO3.values())
)
# Generate the records for countries with no trade data
padded_countries_lf = pad_all_trade_data(
no_trade_data_countries_lf, comtrade_lf, countries_lf
)
# Generate the records for countries missing hs6 codes (Vietnam, for codes 210111, 210112)
padded_countries_hs6_codes = pad_hs6_countries(
comtrade_lf, countries_lf, COUNTRY_HS6_PAD_DICT
)
# Generate the records for countries missing hs6 codes and country of destination (Tanzania, Uganda)
padded_countries_partial_pad = partial_pad_hs6_countries(
bol_lf, comtrade_lf, countries_lf, PARTIAL_PAD_COUNTRY_HS6_PAD_DICT
)
# Combine all padded data
padded_countries_lf = pl.concat(
[padded_countries_lf, padded_countries_hs6_codes, padded_countries_partial_pad],
)
# Add the economic bloc for the padded countries
padded_countries_lf = add_european_union_bloc(padded_countries_lf, countries_lf)
padded_countries_lf = padded_countries_lf.select(
"year",
"producing_country",
"hs6",
"mass_tonnes",
"fob",
"exporter_label",
"exporter_name",
"exporter_node_id",
"exporter_group_name",
"exporter_group_parent",
"port_of_export_label",
"port_of_export_name",
"country_of_destination",
"economic_bloc",
"importer_label",
"importer_name",
"importer_group",
"padded",
"padded_type",
)
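    # cast exporter_node_id to Int64 so its dtype matches the BIGINT column in the consolidated BoL data before concatenating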
padded_countries_lf = padded_countries_lf.with_columns(
pl.col("exporter_node_id").cast(pl.Int64).alias("exporter_node_id"),
)
# Add the padded and padded_type columns to the bol_lf
bol_lf = bol_lf.with_columns(padded=pl.lit(False), padded_type=pl.lit(""))
# Concatenate bol with padded
bol_lf = pl.concat([bol_lf, padded_countries_lf])
# Add mass_tonnes_raw_equivalent
bol_lf = (
bol_lf.join(
commodities_lf,
left_on="hs6",
right_on="hs_code",
how="left",
validate="m:1",
)
.with_columns(
mass_tonnes_raw_equivalent=pl.col("mass_tonnes") * pl.col("eq_factor")
)
.drop("eq_factor")
)
bol_lf = bol_lf.select(
"year",
"producing_country",
"hs6",
"mass_tonnes",
"mass_tonnes_raw_equivalent",
"fob",
"exporter_label",
"exporter_name",
"exporter_node_id",
"exporter_group_name",
"exporter_group_parent",
"port_of_export_label",
"port_of_export_name",
"country_of_destination",
"economic_bloc",
"importer_label",
"importer_name",
"importer_group",
"padded",
"padded_type",
)
# If debugging a test run
# bol_lf.collect().write_parquet(f"~/Trase/data/diet_trase_coffee_trade_padded_{YEAR}.parquet")
return bol_lf