
Diet Trase Coffee Trade Padded 2020

s3://trase-storage/diet-trase/diet_trase_coffee_trade_padded_2020.parquet

Dbt path: trase_production.main.diet_trase_coffee_trade_padded_2020

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/diet_trase/_schema.yml

Model file link: trase/data_pipeline/models/diet_trase/diet_trase_coffee_trade_padded_2020.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: coffee, diet-trase, padded, 2020


diet_trase_coffee_trade_padded_2020

Description

Diet Trase Coffee 2020 COMTRADE padding

Explore in Metabase here.

Adds missing trade data based on COMTRADE values and adds the 'mass_tonnes_raw_equivalent' column (mass_tonnes multiplied by the coffee equivalence factor of the record's hs6 code).

Depending on the completeness of the trade data, each record falls into one of the following padding levels:
a) No padding. Marked as false in the field padded, with an empty padded_type.
b) For all trade in selected hs6 codes of a producing country. Marked country_hs6_pad in the field padded_type.
c) For selected hs6 code/country of destination pairs of a producing country. Marked partial_pad_hs6_prod_and_dest_countries in the field padded_type.
d) For all trade data of a producing country. Marked all_country_data in the field padded_type.

a) No padding. Countries with no padding:
* Brazil
* Colombia
* Côte d’Ivoire
* Ethiopia
* India
* Indonesia
* Peru

b) Countries to pad full hs6 codes
Vietnam:
* pad 210111, 210112 [, 090190?]

c) Countries to pad by hs6 code/country of destination
Tanzania exports:
* 090111 - pad exports to Uganda
* 090190 - pad exports to Uganda, Australia, Rwanda
Uganda exports of 090111, pad exports to:
* ITALY
* SUDAN
* GERMANY
* INDIA
* SPAIN
* UNITED STATES
* BELGIUM
* MOROCCO
* RUSSIAN FEDERATION
* ISRAEL
* SLOVENIA
* PORTUGAL
* CHINA (MAINLAND)
* SWEDEN
* GREECE
* POLAND
* SOUTH KOREA
* NETHERLANDS
* ROMANIA
* FINLAND
* SOUTH AFRICA
* MEXICO
* CROATIA
* ALGERIA

d) Countries to pad all trade data
Producing countries found in 'world/production/FAO/20_12_2024_release/coffee_prod_2020.csv' (58 countries), EXCEPT for the ones we have trade data for (10 countries)


Details

Column Type Description
year INTEGER
producing_country VARCHAR
hs6 VARCHAR
mass_tonnes DOUBLE
mass_tonnes_raw_equivalent DOUBLE
fob DOUBLE
exporter_label VARCHAR
exporter_name VARCHAR
exporter_node_id BIGINT
exporter_group_name VARCHAR
exporter_group_parent VARCHAR
port_of_export_label VARCHAR
port_of_export_name VARCHAR
country_of_destination VARCHAR
economic_bloc VARCHAR
importer_label VARCHAR
importer_name VARCHAR
importer_group VARCHAR
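padded BOOLEAN Whether the record was padded with values taken from COMTRADE.
padded_type VARCHAR Type of padding applied to the record: country_hs6_pad, partial_pad_hs6_prod_and_dest_countries or all_country_data; an empty string for records that are not padded.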

Models / Seeds

  • source.trase_duckdb.source_world.original_fao_coffee_production_2020
  • model.trase_duckdb.diet_trase_coffee_trade_consolidated_2020
  • model.trase_duckdb.comtrade_exports_year_exporter_hs6_importer
  • model.trase_duckdb.postgres_countries
  • model.trase_duckdb.postgres_commodities

Sources

  • ['source_world', 'original_fao_coffee_production_2020']

No called script found, or the script source was not found.

"""
Diet Trase COMTRADE padding

This script fills incomplete data for selected coffee producing countries based on COMTRADE data.

Depending on the completeness of trade data, this is done at the following levels:
a) For all trade hs6 codes of a producing country
b) For hs6 code/country of destination of a producing country
c) For all trade data of a producing country

a) Countries to pad full hs6 codes
Vietnam:
* pad 210111, 210112 [, 090190?]

b) Countries to pad hs6 code/country of destination
Tanzania exports:
* 090111 - pad exports to Uganda (10 t)
* 090190 - pad exports to Uganda 3449.43, Australia 19.46, Rwanda 194.65
Uganda exports: of 090111, pad exports to:
* ITALY (ITA)
* SUDAN (SDN)
* GERMANY (DEU)
* INDIA (IND)
* SPAIN (ESP)
* UNITED STATES (USA)
* BELGIUM (BEL)
* MOROCCO (MAR)
* RUSSIAN FEDERATION (RUS)
* ISRAEL (ISR)
* SLOVENIA (SVN)
* PORTUGAL (PRT)
* CHINA (MAINLAND) (CHN)
* SWEDEN (SWE)
* GREECE (GRC)
* POLAND (POL)
* SOUTH KOREA (KOR)
* NETHERLANDS (NLD)
* ROMANIA (ROU)
* FINLAND (FIN)
* SOUTH AFRICA (ZAF)
* MEXICO (MEX)
* CROATIA (HRV)
* ALGERIA (DZA)

c) Countries to pad all trade data
Producing countries found in 'world/production/FAO/20_12_2024_release/coffee_prod_2020.csv' (58 countries),
EXCEPT for the ones we have trade data for (10 countries)

"""

import polars as pl

YEAR = 2020

BOL_COUNTRIES_ISO3 = {
    "BRAZIL": "BRA",
    "COLOMBIA": "COL",
    "COTE D'IVOIRE": "CIV",
    "ETHIOPIA": "ETH",
    "INDIA": "IND",
    "INDONESIA": "IDN",
    "PERU": "PER",
    "TANZANIA": "TZA",
    "UGANDA": "UGA",
    "VIETNAM": "VNM",
}

COFFEE_HS6_CODES = [
    "090111",  # Coffee, not roasted, not decaffeinated
    "090112",  # Coffee, not roasted, decaffeinated
    "090121",  # Coffee, roasted, not decaffeinated
    "090122",  # Coffee, roasted, decaffeinated
    "090190",  # Coffee husks and skins; coffee substitutes containing coffee
    "210111",  # Extracts, essences and concentrates of coffee
    "210112",  # Preparations with a basis of coffee extracts, essences or concentrates, or of coffee
]

# countries to fully pad hs6 codes (Vietnam, for codes 210111, 210112)
COUNTRY_HS6_PAD_DICT = {"VNM": ["210111", "210112"]}

NODE_ID_FOR_UNKNOWN = 15221616

# countries to partially pad based on hs6 code/country of destination (Tanzania, Uganda)
PARTIAL_PAD_COUNTRY_HS6_PAD_DICT = {
    "TZA": {"090111": ["UGA"], "090190": ["UGA", "AUS", "RWA"]},
    "UGA": {
        "090111": [
            "ITA",
            "SDN",
            "DEU",
            "IND",
            "ESP",
            "USA",
            "BEL",
            "MAR",
            "RUS",
            "ISR",
            "SVN",
            "PRT",
            "CHN",
            "SWE",
            "GRC",
            "POL",
            "KOR",
            "NLD",
            "ROU",
            "FIN",
            "ZAF",
            "MEX",
            "HRV",
            "DZA",
        ]
    },
}


def add_european_union_bloc(trade_lf, countries_lf):
    """
    Adds an 'economic_bloc' column set to "EUROPEAN UNION" for destination countries that
    were EU members in YEAR; for all other countries it copies the 'country_of_destination' value.
    """
    economic_blocs_lf = countries_lf.select("country_name", "economic_bloc")
    economic_blocs_lf = economic_blocs_lf.explode("economic_bloc")

    # Turn the json string into a struct with the appropriate types
    dtype = pl.Struct(
        [
            pl.Field("economic_bloc", pl.Utf8),
            pl.Field("time_start", pl.Datetime),
            pl.Field("time_end", pl.Datetime),
        ]
    )
    economic_blocs_lf = economic_blocs_lf.with_columns(
        economic_bloc_struct=pl.col("economic_bloc").str.json_decode(dtype)
    )

    # Flatten the struct into new fields
    economic_blocs_lf = economic_blocs_lf.with_columns(
        economic_bloc_name=pl.col("economic_bloc_struct").struct.field("economic_bloc"),
        time_start=pl.col("economic_bloc_struct").struct.field("time_start"),
        time_end=pl.col("economic_bloc_struct").struct.field("time_end"),
    )

    # Filter for EUROPEAN UNION memberships active in YEAR (especially to exclude the UK, which left in 2020)
    economic_blocs_lf = (
        economic_blocs_lf.filter(
            pl.col("economic_bloc_name") == pl.lit("EUROPEAN UNION")
        )
        .filter(pl.col("time_start").dt.year() <= pl.lit(YEAR))
        .filter(
            (pl.col("time_end").is_null())
            | (pl.col("time_end").dt.year() > pl.lit(YEAR))
        )
    )
    economic_blocs_lf = economic_blocs_lf.select("country_name", "economic_bloc_name")
    economic_blocs_lf = economic_blocs_lf.rename(
        {"economic_bloc_name": "economic_bloc"}
    )

    trade_lf = trade_lf.join(
        economic_blocs_lf,
        how="left",
        left_on="country_of_destination",
        right_on="country_name",
        validate="m:1",
    )

    # When the country is not part of the EUROPEAN UNION, just use the 'country_of_destination'
    trade_lf = trade_lf.with_columns(
        economic_bloc=pl.when(pl.col("economic_bloc").is_null())
        .then(pl.col("country_of_destination"))
        .otherwise(pl.col("economic_bloc"))
    )

    return trade_lf


def pad_hs6_countries(comtrade_lf, countries_lf, country_hs6_dict):
    """
    Returns a lazyframe with the data of the countries/hs6 codes to fully pad, which
    would then be appended to the bol_data. This assumes that the countries don't
    have any trade data for the hs6 codes.
    Receives a lazyframe with the comtrade data and a dictionary with the iso country
    codes as keys and a list with hs6 codes to pad as values.
    """
    countries_lf = countries_lf.select("iso_alpha_3", "country_name").unique()
    # Create separate lists for countries and hs6 codes
    countries_to_pad = []
    hs6_codes_to_pad = []
    for country, codes in country_hs6_dict.items():
        for code in codes:
            countries_to_pad.append(country)
            hs6_codes_to_pad.append(code)

    # Create a lazyframe from the lists
    country_hs6_lf = pl.DataFrame(
        {"country_of_export_iso": countries_to_pad, "commodity_code": hs6_codes_to_pad}
    ).lazy()

    # Perform a semi join with the comtrade data
    padded_country_hs6_lf = comtrade_lf.join(
        country_hs6_lf, on=["country_of_export_iso", "commodity_code"], how="semi"
    )

    padded_country_hs6_lf = padded_country_hs6_lf.rename({"commodity_code": "hs6"})

    padded_country_hs6_lf = padded_country_hs6_lf.with_columns(
        mass_tonnes=pl.when(
            (pl.col("net_weight_kg").is_not_null()) & (pl.col("net_weight_kg") > 0)
        )
        .then(pl.col("net_weight_kg") / 1000)
        .otherwise(pl.col("alternative_quantity") / 1000)
    ).drop("net_weight_kg", "alternative_quantity")

    # Add producing_country and country_of_destination names based on the iso3 codes
    padded_country_hs6_lf = (
        padded_country_hs6_lf.join(
            countries_lf,
            left_on="country_of_export_iso",
            right_on="iso_alpha_3",
            how="left",
            validate="m:1",
        )
        .rename({"country_name": "producing_country"})
        .drop("country_of_export_iso")
    )
    padded_country_hs6_lf = (
        padded_country_hs6_lf.join(
            countries_lf,
            left_on="country_of_import_iso",
            right_on="iso_alpha_3",
            how="left",
            validate="m:1",
        )
        .rename({"country_name": "country_of_destination"})
        .drop("country_of_import_iso")
    )

    # Add year; exporters and importers as 'UNKNOWN'; and type of pad
    padded_country_hs6_lf = padded_country_hs6_lf.with_columns(
        year=pl.lit(YEAR),
        exporter_label=pl.lit("UNKNOWN"),
        exporter_name=pl.lit("UNKNOWN"),
        exporter_node_id=pl.lit(NODE_ID_FOR_UNKNOWN),
        exporter_group_name=pl.lit("UNKNOWN"),
        exporter_group_parent=pl.lit(None),
        port_of_export_label=pl.lit("UNKNOWN"),
        port_of_export_name=pl.lit("UNKNOWN"),
        importer_label=pl.lit("UNKNOWN"),
        importer_name=pl.lit("UNKNOWN"),
        importer_group=pl.lit("UNKNOWN"),
        padded=pl.lit(True),
        padded_type=pl.lit("country_hs6_pad"),
    )

    return padded_country_hs6_lf


def partial_pad_hs6_countries(
    bol_lf, comtrade_lf, countries_lf, partial_pad_country_hs6_pad_dict
):
    """
    Returns a lazyframe with the padded data of the origin/destination countries/hs6 codes,
    filling the missing weight and fob values with the COMTRADE value minus the
    aggregated BOL value.
    """
    countries_lf = countries_lf.select("iso_alpha_3", "country_name").unique()

    # Create a lazyframe from the dictionary
    prod_countries_to_pad = []
    hs6_codes_to_pad = []
    dest_countries_to_pad = []
    for country, hs6_dict in partial_pad_country_hs6_pad_dict.items():
        for hs6, dest_countries in hs6_dict.items():
            for dest_country in dest_countries:
                prod_countries_to_pad.append(country)
                hs6_codes_to_pad.append(hs6)
                dest_countries_to_pad.append(dest_country)
    partial_pad_country_hs6_pad_lf = pl.DataFrame(
        {
            "producing_country_iso": prod_countries_to_pad,
            "hs6": hs6_codes_to_pad,
            "country_of_destination_iso": dest_countries_to_pad,
        }
    ).lazy()

    # Filter comtrade based on the partial pad selection
    comtrade_lf = comtrade_lf.join(
        partial_pad_country_hs6_pad_lf,
        left_on=["country_of_export_iso", "commodity_code", "country_of_import_iso"],
        right_on=["producing_country_iso", "hs6", "country_of_destination_iso"],
        how="semi",
    ).rename(
        {
            "commodity_code": "hs6",
            "net_weight_kg": "comtrade_net_weight_kg",
            "alternative_quantity": "comtrade_alternative_quantity",
            "fob": "comtrade_fob",
        }
    )

    # Create comtrade_mass_tonnes
    comtrade_lf = comtrade_lf.with_columns(
        comtrade_mass_tonnes=pl.when(
            (pl.col("comtrade_net_weight_kg").is_not_null())
            & (pl.col("comtrade_net_weight_kg") > 0)
        )
        .then(pl.col("comtrade_net_weight_kg") / 1000)
        .otherwise(pl.col("comtrade_alternative_quantity") / 1000)
    ).drop("comtrade_net_weight_kg", "comtrade_alternative_quantity")

    # Take relevant information from bol_lf
    # aggregating mass_tonnes and fob based on producing_country, hs6 and country_of_destination
    aggregated_bol_lf = (
        bol_lf.select(
            "producing_country", "hs6", "country_of_destination", "mass_tonnes", "fob"
        )
        .group_by(["producing_country", "hs6", "country_of_destination"])
        .agg(mass_tonnes=pl.col("mass_tonnes").sum(), fob=pl.col("fob").sum())
        .rename({"mass_tonnes": "bol_mass_tonnes", "fob": "bol_fob"})
    )

    # Add the country iso3 codes to the aggregated BOL data. The full country names are added again later
    aggregated_bol_lf = (
        aggregated_bol_lf.join(
            countries_lf,
            left_on="producing_country",
            right_on="country_name",
            how="left",
            validate="m:1",
        )
        .rename({"iso_alpha_3": "producing_country_iso"})
        .drop("producing_country")
    )
    aggregated_bol_lf = (
        aggregated_bol_lf.join(
            countries_lf,
            left_on="country_of_destination",
            right_on="country_name",
            how="left",
            validate="m:1",
        )
        .rename({"iso_alpha_3": "country_of_destination_iso"})
        .drop("country_of_destination")
    )

    # Add the bol_lf data to the comtrade_lf data
    comtrade_lf = comtrade_lf.join(
        aggregated_bol_lf,
        left_on=["country_of_export_iso", "hs6", "country_of_import_iso"],
        right_on=["producing_country_iso", "hs6", "country_of_destination_iso"],
        how="left",
        validate="m:1",
    )

    # Create mass_tonnes and fob, based on comtrade - bol
    comtrade_lf = comtrade_lf.with_columns(
        fob=pl.col("comtrade_fob") - pl.coalesce("bol_fob", 0),
        mass_tonnes=pl.col("comtrade_mass_tonnes") - pl.coalesce("bol_mass_tonnes", 0),
    ).drop("comtrade_mass_tonnes", "comtrade_fob", "bol_mass_tonnes", "bol_fob")

    # Add producing_country and country_of_destination
    comtrade_lf = (
        comtrade_lf.join(
            countries_lf,
            left_on="country_of_export_iso",
            right_on="iso_alpha_3",
            how="left",
            validate="m:1",
        )
        .rename({"country_name": "producing_country"})
        .drop("country_of_export_iso")
    )
    comtrade_lf = (
        comtrade_lf.join(
            countries_lf,
            left_on="country_of_import_iso",
            right_on="iso_alpha_3",
            how="left",
            validate="m:1",
        )
        .rename({"country_name": "country_of_destination"})
        .drop("country_of_import_iso")
    )

    # Add year; exporters and importers as 'UNKNOWN'; and type of pad
    comtrade_lf = comtrade_lf.with_columns(
        year=pl.lit(YEAR),
        exporter_label=pl.lit("UNKNOWN"),
        exporter_name=pl.lit("UNKNOWN"),
        exporter_node_id=pl.lit(NODE_ID_FOR_UNKNOWN),
        exporter_group_name=pl.lit("UNKNOWN"),
        exporter_group_parent=pl.lit(None),
        port_of_export_label=pl.lit("UNKNOWN"),
        port_of_export_name=pl.lit("UNKNOWN"),
        importer_label=pl.lit("UNKNOWN"),
        importer_name=pl.lit("UNKNOWN"),
        importer_group=pl.lit("UNKNOWN"),
        padded=pl.lit(True),
        padded_type=pl.lit("partial_pad_hs6_prod_and_dest_countries"),
    )

    return comtrade_lf


def pad_all_trade_data(no_trade_data_countries_lf, comtrade_lf, countries_lf):
    """
    Receives
    - A lazyframe with the iso3 of the countries to pad
    - A lazyframe with the comtrade data
    - A lazyframe with the countries iso and name data
    Returns a lazyframe with the padded data.
    """
    countries_lf = countries_lf.select("iso_alpha_3", "country_name").unique()

    padded_countries_lf = comtrade_lf.join(
        no_trade_data_countries_lf,
        left_on="country_of_export_iso",
        right_on="iso_alpha_3",
        how="inner",
    ).select(
        "country_of_export_iso",
        "country_of_import_iso",
        "commodity_code",
        "net_weight_kg",
        "alternative_quantity",
        "fob",
    )

    padded_countries_lf = padded_countries_lf.rename({"commodity_code": "hs6"})
    # In a couple of cases, the weight is in alternative_quantity
    padded_countries_lf = padded_countries_lf.with_columns(
        mass_tonnes=pl.when(
            (pl.col("net_weight_kg").is_not_null()) & (pl.col("net_weight_kg") > 0)
        )
        .then(pl.col("net_weight_kg") / 1000)
        .otherwise(pl.col("alternative_quantity") / 1000)
    ).drop("net_weight_kg", "alternative_quantity")

    padded_countries_lf = (
        padded_countries_lf.join(
            countries_lf,
            left_on="country_of_export_iso",
            right_on="iso_alpha_3",
            how="left",
        )
        .rename({"country_name": "producing_country"})
        .drop("country_of_export_iso")
    )

    padded_countries_lf = (
        padded_countries_lf.join(
            countries_lf,
            left_on="country_of_import_iso",
            right_on="iso_alpha_3",
            how="left",
        )
        .rename({"country_name": "country_of_destination"})
        .drop("country_of_import_iso")
    )

    # Add year; exporters and importers as 'UNKNOWN', and type of pad
    padded_countries_lf = padded_countries_lf.with_columns(
        year=pl.lit(YEAR),
        exporter_label=pl.lit("UNKNOWN"),
        exporter_name=pl.lit("UNKNOWN"),
        exporter_node_id=pl.lit(NODE_ID_FOR_UNKNOWN),
        exporter_group_name=pl.lit("UNKNOWN"),
        exporter_group_parent=pl.lit(None),
        port_of_export_label=pl.lit("UNKNOWN"),
        port_of_export_name=pl.lit("UNKNOWN"),
        importer_label=pl.lit("UNKNOWN"),
        importer_name=pl.lit("UNKNOWN"),
        importer_group=pl.lit("UNKNOWN"),
        padded=pl.lit(True),
        padded_type=pl.lit("all_country_data"),
    )

    return padded_countries_lf


def model(dbt, cursor):
    dbt.config(materialized="external")

    # get consolidated trade data for which we have vendor data
    bol_lf = dbt.ref("diet_trase_coffee_trade_consolidated_2020").pl().lazy()
    # get comtrade data
    comtrade_lf = dbt.ref("comtrade_exports_year_exporter_hs6_importer").pl().lazy()
    comtrade_lf = comtrade_lf.filter(pl.col("year") == YEAR).select(
        "country_of_export_iso",
        "country_of_import_iso",
        "commodity_code",
        "net_weight_kg",
        "alternative_quantity",
        "fob",
    )
    comtrade_lf = comtrade_lf.filter(
        pl.col("commodity_code").is_in(COFFEE_HS6_CODES)
    ).unique()

    # get countries iso data
    # drop "GAZA STRIP (PALESTINE)" and keep "OCCUPIED PALESTINIAN TERRITORY", as it's
    # more standard in trade, and we need to avoid a duplicated 'PSE' iso3 and 299 fao_code
    countries_lf = dbt.ref("postgres_countries").pl().lazy()
    countries_lf = (
        countries_lf.select("iso_alpha_3", "fao_code", "country_name", "economic_bloc")
        .filter(
            (pl.col("iso_alpha_3").is_not_null())
            & (pl.col("country_name") != "GAZA STRIP (PALESTINE)")
        )
        .unique()
    )

    # iso3 of FAO's producing countries
    fao_producing_countries_lf = (
        dbt.source("source_world", "original_fao_coffee_production_2020").pl().lazy()
    )
    fao_producing_countries_lf = (
        fao_producing_countries_lf.select("AreaCode")
        .with_columns(pl.col("AreaCode").cast(pl.Int32))
        .unique()
        .rename({"AreaCode": "fao_code"})
    )
    fao_producing_countries_lf = (
        fao_producing_countries_lf.join(
            countries_lf, left_on="fao_code", right_on="fao_code", how="inner"
        )
        .select("iso_alpha_3")
        .unique()
    )

    # Get commodity equivalence factors
    commodities_lf = dbt.ref("postgres_commodities").pl().lazy()
    commodities_lf = (
        commodities_lf.filter(
            (pl.col("commodity") == "COFFEE") & (pl.col("location") == "WORLD")
        )
        .select("hs_code", "eq_factor")
        .unique()
    )

    # filter comtrade for producing countries
    comtrade_lf = comtrade_lf.join(
        fao_producing_countries_lf,
        left_on="country_of_export_iso",
        right_on="iso_alpha_3",
        how="semi",
    ).unique()

    # filter comtrade for valid importer countries (it includes some additional codes not used)
    comtrade_lf = comtrade_lf.join(
        countries_lf,
        left_on="country_of_import_iso",
        right_on="iso_alpha_3",
        how="semi",
    ).unique()

    # producing countries for which we have no trade data
    no_trade_data_countries_lf = fao_producing_countries_lf.filter(
        ~pl.col("iso_alpha_3").is_in(BOL_COUNTRIES_ISO3.values())
    )

    # Generate the records for countries with no trade data
    padded_countries_lf = pad_all_trade_data(
        no_trade_data_countries_lf, comtrade_lf, countries_lf
    )

    # Generate the records for countries missing hs6 codes (Vietnam, for codes 210111, 210112)
    padded_countries_hs6_codes = pad_hs6_countries(
        comtrade_lf, countries_lf, COUNTRY_HS6_PAD_DICT
    )

    # Generate the records for countries missing hs6 codes and country of destination (Tanzania, Uganda)
    padded_countries_partial_pad = partial_pad_hs6_countries(
        bol_lf, comtrade_lf, countries_lf, PARTIAL_PAD_COUNTRY_HS6_PAD_DICT
    )

    # Combine all padded data
    padded_countries_lf = pl.concat(
        [padded_countries_lf, padded_countries_hs6_codes, padded_countries_partial_pad],
    )

    # Add the economic bloc for the padded countries
    padded_countries_lf = add_european_union_bloc(padded_countries_lf, countries_lf)

    padded_countries_lf = padded_countries_lf.select(
        "year",
        "producing_country",
        "hs6",
        "mass_tonnes",
        "fob",
        "exporter_label",
        "exporter_name",
        "exporter_node_id",
        "exporter_group_name",
        "exporter_group_parent",
        "port_of_export_label",
        "port_of_export_name",
        "country_of_destination",
        "economic_bloc",
        "importer_label",
        "importer_name",
        "importer_group",
        "padded",
        "padded_type",
    )
    padded_countries_lf = padded_countries_lf.with_columns(
        pl.col("exporter_node_id").cast(pl.Int64).alias("exporter_node_id"),
    )

    # Add the padded and padded_type columns to the bol_lf
    bol_lf = bol_lf.with_columns(padded=pl.lit(False), padded_type=pl.lit(""))

    # Concatenate bol with padded
    bol_lf = pl.concat([bol_lf, padded_countries_lf])

    # Add mass_tonnes_raw_equivalent
    bol_lf = (
        bol_lf.join(
            commodities_lf,
            left_on="hs6",
            right_on="hs_code",
            how="left",
            validate="m:1",
        )
        .with_columns(
            mass_tonnes_raw_equivalent=pl.col("mass_tonnes") * pl.col("eq_factor")
        )
        .drop("eq_factor")
    )

    bol_lf = bol_lf.select(
        "year",
        "producing_country",
        "hs6",
        "mass_tonnes",
        "mass_tonnes_raw_equivalent",
        "fob",
        "exporter_label",
        "exporter_name",
        "exporter_node_id",
        "exporter_group_name",
        "exporter_group_parent",
        "port_of_export_label",
        "port_of_export_name",
        "country_of_destination",
        "economic_bloc",
        "importer_label",
        "importer_name",
        "importer_group",
        "padded",
        "padded_type",
    )

    # If debugging a test run
    # bol_lf.collect().write_parquet(f"~/Trase/data/diet_trase_coffee_trade_padded_{YEAR}.parquet")

    return bol_lf