Skip to content

Diet Trase Coffee Trade Consolidated 2020

s3://trase-storage/diet-trase/diet_trase_coffee_trade_consolidated_2020.parquet

Dbt path: trase_production.main.diet_trase_coffee_trade_consolidated_2020

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/diet_trase/_schema.yml

Model file link: trase/data_pipeline/models/diet_trase/diet_trase_coffee_trade_consolidated_2020.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)

Tags: coffee, diet-trase-coffee


diet_trase_coffee_trade_consolidated_2020

Description

Brings together the coffee trade data of the countries we have data from. Runs cleaning, including: * Adding an economic_bloc field: if the destination country is part of the EU, it is set to "EUROPEAN UNION"; otherwise it uses the normal destination country name. * Including standard names of exporters, importers, ports

Countries included: * Brazil * Colombia * Cote D'Ivoire * Ethiopia * India * Indonesia * Peru * Tanzania * Uganda * Vietnam


Details

Column Type Description
year INTEGER
producing_country VARCHAR
hs6 VARCHAR
mass_tonnes DOUBLE
fob DOUBLE
exporter_label VARCHAR
exporter_name VARCHAR
exporter_node_id BIGINT
exporter_group_name VARCHAR
exporter_group_parent VARCHAR
port_of_export_label VARCHAR
port_of_export_name VARCHAR
country_of_destination VARCHAR
economic_bloc VARCHAR
importer_label VARCHAR
importer_name VARCHAR
importer_group VARCHAR

Models / Seeds

  • model.trase_duckdb.postgres_countries
  • model.trase_duckdb.postgres_traders
  • model.trase_duckdb.postgres_ports
  • model.trase_duckdb.diet_trase_coffee_trader_parents
  • model.trase_duckdb.gold_brazil_bol_coffee_2020
  • model.trase_duckdb.gold_colombia_cd_coffee_2020
  • model.trase_duckdb.gold_cote_divoire_coffee_2020
  • model.trase_duckdb.gold_ethiopia_coffee_2020
  • model.trase_duckdb.gold_india_trade_coffee_2020
  • model.trase_duckdb.gold_indonesia_bol_coffee_2020
  • model.trase_duckdb.gold_peru_cd_coffee_2020
  • model.trase_duckdb.gold_tanzania_trade_coffee_2020
  • model.trase_duckdb.gold_uganda_coffee_2020
  • model.trase_duckdb.gold_vietnam_trade_coffee_2020

No called script or script source not found.

"""
Diet Trase consolidated coffee trade
This script brings together the coffee trade data from several countries and
adds an `economic_bloc` field.
Countries included:
* Brazil
* Colombia
* Cote D'Ivoire
* Ethiopia
* India
* Indonesia
* Peru
* Tanzania
* Uganda
* Vietnam
"""

import polars as pl

# Reference year used when resolving time-bounded attributes
# (trader groups, economic bloc membership, group parents).
YEAR = 2020

# Producing countries covered by this model.
# NOTE: the order matters - `model` zips this list against the list of
# per-country trade frames, and `clean_ports` iterates over it.
COUNTRY_NAMES = [
    "BRAZIL",
    "COLOMBIA",
    "COTE D'IVOIRE",
    "ETHIOPIA",
    "INDIA",
    "INDONESIA",
    "PERU",
    "TANZANIA",
    "UGANDA",
    "VIETNAM",
]

# Columns selected from every per-country trade dataset before they are
# concatenated (see `select_relevant_columns`).
TRADE_COLS_TO_SELECT = [
    "year",
    "exporter_label",
    "exporter_node_id",
    "country_of_destination",
    "mass_tonnes",
    "fob",
    "importer_label",
    "port_of_export_label",
    "hs6",
]


def select_relevant_columns(lf: pl.LazyFrame) -> pl.LazyFrame:
    """
    Reduce a per-country trade frame to the columns listed in
    TRADE_COLS_TO_SELECT, in that order.
    """
    shared_columns = list(TRADE_COLS_TO_SELECT)
    return lf.select(shared_columns)


def clean_strings(trade_lf, columns):
    """
    Normalise the given string columns in place (same column names).

    For every column in `columns` the value is trimmed, every run of
    whitespace is collapsed into a single space, accents / diacritics are
    removed and the result is upper-cased.

    Parameters:
    - trade_lf: frame containing the columns to clean.
    - columns (list of str): names of the string columns to clean.

    Returns:
    - The frame with the cleaned columns replacing the originals.
    """
    return trade_lf.with_columns(
        [
            pl.col(col)
            .str.strip_chars()
            # Collapse EVERY run of whitespace into a single space. This must be
            # `replace_all`: `replace` only substitutes the first match, which
            # left later runs of spaces untouched.
            .str.replace_all(r"\s+", " ")
            # Convert accents and diacritics (see https://stackoverflow.com/a/77217563):
            # decompose the characters, then drop the combining marks
            .str.normalize("NFKD")
            .str.replace_all(r"\p{CombiningMark}", "")
            .str.to_uppercase()
            .alias(col)
            for col in columns
        ]
    )


def identify_active_group_name(df, year, group_column):
    """
    Adds a 'group_name' column holding the group that applies for `year`.

    `group_column` is expected to hold, per row, a list of json strings as
    available in the trader reference dataset; each json string is decoded
    into 'group', 'time_start' and 'time_end'. A group is kept when its
    'time_end' is empty or falls in a year later than `year`. When several
    groups remain for a row, the maximum 'group_name' is taken. Rows without
    any remaining group get a null 'group_name'.

    Parameters:
    - df: frame containing `group_column`.
    - year (int): year for which the group must be valid.
    - group_column (str): name of the column with the list of json strings.

    Returns:
    - The input frame with an additional 'group_name' column.
    """
    # Target types for the decoded json
    group_schema = pl.Struct(
        [
            pl.Field("group", pl.Utf8),
            pl.Field("time_start", pl.Datetime),
            pl.Field("time_end", pl.Datetime),
        ]
    )

    # The row index lets us fold the exploded rows back onto their origin
    indexed = df.with_row_index("row_index")

    decoded = (
        indexed.explode(group_column)
        # Explicit cast, as polars might identify the field as binary
        .with_columns(pl.col(group_column).cast(pl.Utf8).alias(group_column))
        .with_columns(
            group_struct=pl.col(group_column).str.json_decode(group_schema)
        )
        # Flatten the struct into plain columns
        .with_columns(
            group_name=pl.col("group_struct").struct.field("group"),
            time_start=pl.col("group_struct").struct.field("time_start"),
            time_end=pl.col("group_struct").struct.field("time_end"),
        )
    )

    # NOTE(review): only 'time_end' is checked here; 'time_start' is decoded but
    # not used, so a group starting after `year` would still qualify - confirm
    # whether that is intended.
    has_no_end = pl.col("time_end").is_null()
    ends_after_year = pl.col("time_end").dt.year() > pl.lit(year)
    active = decoded.filter(has_no_end | ends_after_year)

    # One group name per original row
    group_per_row = active.group_by("row_index").agg(
        pl.col("group_name").max().alias("group_name")
    )

    return indexed.join(group_per_row, on="row_index", how="left").drop("row_index")


def add_european_union_bloc(trade_lf, countries_lf):
    """
    Adds an 'economic_bloc' column to the trade data.

    The value is "EUROPEAN UNION" when 'country_of_destination' matches a
    country whose EUROPEAN UNION membership covers YEAR (membership started in
    YEAR or earlier, and has no end or ends in a later year). For every other
    row the 'country_of_destination' value is replicated.

    Parameters:
    - trade_lf: trade data with a 'country_of_destination' column.
    - countries_lf: countries reference with 'country_name' and
      'economic_bloc' (a list of json strings per country).

    Returns:
    - The trade data with the additional 'economic_bloc' column.
    """
    # Target types for the decoded json
    bloc_schema = pl.Struct(
        [
            pl.Field("economic_bloc", pl.Utf8),
            pl.Field("time_start", pl.Datetime),
            pl.Field("time_end", pl.Datetime),
        ]
    )
    membership = pl.col("economic_bloc_struct")

    # EUROPEAN UNION memberships with compatible start / end dates
    # (specially for filtering UK)
    is_eu = pl.col("economic_bloc_name") == pl.lit("EUROPEAN UNION")
    started_by_year = pl.col("time_start").dt.year() <= pl.lit(YEAR)
    not_ended_by_year = (pl.col("time_end").is_null()) | (
        pl.col("time_end").dt.year() > pl.lit(YEAR)
    )

    eu_members_lf = (
        countries_lf.select("country_name", "economic_bloc")
        .explode("economic_bloc")
        .with_columns(
            economic_bloc_struct=pl.col("economic_bloc").str.json_decode(bloc_schema)
        )
        # Flatten the struct into plain columns
        .with_columns(
            economic_bloc_name=membership.struct.field("economic_bloc"),
            time_start=membership.struct.field("time_start"),
            time_end=membership.struct.field("time_end"),
        )
        .filter(is_eu & started_by_year & not_ended_by_year)
        .select("country_name", "economic_bloc_name")
        .rename({"economic_bloc_name": "economic_bloc"})
    )

    tagged_lf = trade_lf.join(
        eu_members_lf,
        how="left",
        left_on="country_of_destination",
        right_on="country_name",
        validate="m:1",
    )

    # Destinations without an EU match fall back to the country name itself
    return tagged_lf.with_columns(
        economic_bloc=pl.coalesce(
            [pl.col("economic_bloc"), pl.col("country_of_destination")]
        )
    )


def add_trader_data_by_label(
    input_trade_data_lf, db_traders_lf, trader_label_colummn, country_iso2=None
):
    """
    Adds trader information to the input trade data based on trader labels.

    The distinct labels of the input are matched against the (unnested)
    'labels' of the reference traders. Exactly one reference record is kept
    per label; records whose 'trase_id' starts with `country_iso2` are
    preferred when `country_iso2` is given. The matched record is added to the
    input as 'trader_name', 'trader_node_id', 'trader_trase_id' and
    'trader_group_name' (null when the label has no match).

    Parameters:
    - input_trade_data_lf: The input trade data containing trader labels.
    - db_traders_lf: The reference trader data, with the columns 'labels',
      'name', 'trader_node_id', 'trase_id' and 'group_name'.
    - trader_label_colummn (str): The column name in the input trade data that contains trader labels.
    - country_iso2 (str, optional): The ISO2 code of the country to prioritize traders from. Defaults to None.

    Returns:
    - The input trade data with the added trader columns.
    """

    # One reference row per (trader, label)
    label_lookup_lf = db_traders_lf.select(
        "labels", "name", "trader_node_id", "trase_id", "group_name"
    ).explode("labels")

    # Work with a fixed label column name until the very end
    renamed_input_lf = input_trade_data_lf.rename(
        {trader_label_colummn: "trader_label"}
    )
    distinct_labels_lf = renamed_input_lf.select("trader_label").unique()

    candidates_lf = distinct_labels_lf.join(
        label_lookup_lf, left_on="trader_label", right_on="labels", how="left"
    )

    # Flag the candidates belonging to the prioritized country (if any)
    from_priority_country = (pl.lit(country_iso2).is_not_null()) & (
        pl.col("trase_id").str.slice(0, 2) == pl.lit(country_iso2)
    )
    candidates_lf = candidates_lf.with_columns(
        country_priority=pl.when(from_priority_country)
        .then(pl.lit(1))
        .otherwise(pl.lit(0))
    )

    # Keep a single candidate per label, prioritized ones first. The `agg`
    # yields one-element lists, hence the `explode` right after.
    # NOTE(review): among candidates with the same priority, which one is kept
    # is not controlled here - confirm that is acceptable.
    best_match_lf = (
        candidates_lf.group_by("trader_label")
        .agg([pl.all().sort_by("country_priority", descending=True).head(1)])
        .drop("country_priority")
        .explode(["name", "trader_node_id", "trase_id", "group_name"])
        .rename(
            {
                "name": "trader_name",
                "trase_id": "trader_trase_id",
                "group_name": "trader_group_name",
            }
        )
    )

    # Attach the match to every input row and restore the caller's column name
    enriched_lf = renamed_input_lf.join(
        best_match_lf,
        how="left",
        on="trader_label",
    )
    return enriched_lf.rename({"trader_label": trader_label_colummn})


def add_exporter_name_and_ownership(trade_lf, reference_traders_lf, group_parents_lf):
    """
    Takes the 'exporter_node_id', and adds the exporter_name, the exporter_group_name,
    and the parent company of the company group (if any) as 'exporter_group_parent'.

    Rows whose 'exporter_node_id' has no match in the reference traders are
    looked up a second time through their 'exporter_label'; the name, node id
    and group found that way fill the gaps.

    Parameters:
    - trade_lf: trade data with 'exporter_node_id' and 'exporter_label'.
    - reference_traders_lf: reference traders, with 'trader_node_id', 'name',
      'group_name' and the columns required by `add_trader_data_by_label`.
    - group_parents_lf: group parents, with 'db_group_name' and
      'exporter_group_parent'.

    Returns:
    - The trade data with 'exporter_name', 'exporter_group_name' and
      'exporter_group_parent' added (and 'exporter_node_id' completed where
      the label lookup found one).
    """

    # Add exporter_name and exporter_group_name from the reference traders dataset
    exporter_names_lf = (
        reference_traders_lf.select(["trader_node_id", "name", "group_name"])
        .unique()
        .rename(
            {
                "trader_node_id": "exporter_node_id",
                "name": "exporter_name",
                "group_name": "exporter_group_name",
            }
        )
    )
    trade_lf = trade_lf.join(
        exporter_names_lf,
        how="left",
        on="exporter_node_id",
        validate="m:1",
    )

    # Take traders without a match, and pass them through add_trader_data_by_label
    unmatched_labels_lf = (
        trade_lf.filter(pl.col("exporter_name").is_null())
        .select(["exporter_label"])
        .unique()
    )
    fallback_lf = add_trader_data_by_label(
        unmatched_labels_lf, reference_traders_lf, "exporter_label"
    )
    # Keep only the columns that are coalesced below. The lookup also returns
    # 'trader_trase_id'; carrying it into trade_lf leaves a stray column that
    # clashes with later trader lookups on the same frame.
    fallback_lf = fallback_lf.select(
        pl.col("exporter_label"),
        pl.col("trader_name").alias("missing_exporter_name"),
        pl.col("trader_node_id").alias("missing_exporter_node_id"),
        pl.col("trader_group_name").alias("missing_exporter_group_name"),
    )

    # Add the results back to trade_lf
    trade_lf = trade_lf.join(
        fallback_lf,
        how="left",
        on="exporter_label",
        validate="m:1",
    )

    # Coalesce the missing fields into the main columns
    trade_lf = trade_lf.with_columns(
        exporter_name=pl.coalesce(
            [pl.col("exporter_name"), pl.col("missing_exporter_name")]
        ),
        exporter_node_id=pl.coalesce(
            [pl.col("exporter_node_id"), pl.col("missing_exporter_node_id")]
        ),
        exporter_group_name=pl.coalesce(
            [pl.col("exporter_group_name"), pl.col("missing_exporter_group_name")]
        ),
    ).drop(
        [
            "missing_exporter_name",
            "missing_exporter_node_id",
            "missing_exporter_group_name",
        ]
    )

    # Add the parent company of exporter groups that have them
    trade_lf = trade_lf.join(
        group_parents_lf,
        how="left",
        left_on="exporter_group_name",
        right_on="db_group_name",
        validate="m:1",
    )

    return trade_lf


def clean_importers(trade_lf, reference_traders_lf):
    """
    Takes the 'importer_label', and cleans it against the official trader database.
    Adds the 'importer_name' and 'importer_group' fields.

    Empty labels and "to the order" style placeholders are replaced with
    'UNKNOWN' before the lookup. Importers without a match in the trader
    database (including 'UNKNOWN CUSTOMER', which is deliberately not looked
    up) get 'UNKNOWN' as name and group.

    Parameters:
    - trade_lf: trade data with an 'importer_label' column.
    - reference_traders_lf: reference traders, as required by
      `add_trader_data_by_label`.

    Returns:
    - The trade data with a cleaned 'importer_label' plus the new
      'importer_name' and 'importer_group' columns. No other lookup column
      (trader node id / trase id) is added.
    """

    # Replace NULL, 'A LA ORDEN', 'TO THE ORDER%', 'NOT DECLARED' and 'TO ORDER'
    # importer_label with 'UNKNOWN'
    label_upper = pl.col("importer_label").str.to_uppercase()
    is_placeholder = (
        pl.col("importer_label").is_null()
        | label_upper.str.contains("A LA ORDEN")
        | label_upper.str.contains("TO THE ORDER")
        | label_upper.str.contains("NOT DECLARED")
        | label_upper.str.contains("TO ORDER")
    )
    trade_lf = trade_lf.with_columns(
        importer_label=pl.when(is_placeholder)
        .then(pl.lit("UNKNOWN"))
        .otherwise(pl.col("importer_label"))
    )

    # take the distinct importer labels, excluding 'UNKNOWN CUSTOMER' as it has multiple matches
    importer_lookup_lf = (
        trade_lf.select("importer_label")
        .unique()
        .filter(pl.col("importer_label").ne("UNKNOWN CUSTOMER"))
    )

    # Get the trader data for the importer labels
    importer_lookup_lf = add_trader_data_by_label(
        importer_lookup_lf, reference_traders_lf, "importer_label"
    )
    # Only the name and group are wanted for importers: the trase id can be
    # confusing as it includes a country identifier. Projecting before the join
    # (instead of dropping 'trader_trase_id' afterwards) guarantees that the
    # importer's columns are the ones left out, even when the trade data
    # already carries a column with that name.
    importer_lookup_lf = importer_lookup_lf.select(
        pl.col("importer_label"),
        pl.col("trader_name").alias("importer_name"),
        pl.col("trader_group_name").alias("importer_group"),
    )

    trade_lf = trade_lf.join(
        importer_lookup_lf,
        how="left",
        on="importer_label",
    )

    # Replacing NULLs with 'UNKNOWN'. This also covers 'UNKNOWN CUSTOMER', which
    # was excluded from the lookup and therefore has no match.
    return trade_lf.with_columns(
        pl.col("importer_name").fill_null(pl.lit("UNKNOWN")),
        pl.col("importer_group").fill_null(pl.lit("UNKNOWN")),
    )


def clean_ports(bol_lf, ports_lf):
    """
    Creates the 'port_of_export_name' column.

    The column starts as a copy of 'port_of_export_label'. Then, for each
    country in COUNTRY_NAMES, the label is matched against the synonyms of
    that country's ports in the ports reference table (currently a view from
    postgres views.regions created through dbt model 'postgres_ports'), and
    the reference port name replaces the value wherever a synonym matches.
    Finally, empty labels / names are set to 'UNKNOWN'.

    Parameters:
    - bol_lf: trade data with a 'port_of_export_label' column.
    - ports_lf: ports reference with 'name', 'synonyms' and 'country'.

    Returns:
    - The trade data with the additional 'port_of_export_name' column.
    """

    # Database duplicates (based on synonym) for Indonesia that are removed
    # manually, keeping the ones present in Mark's port files
    indonesia_duplicates = [
        "TANJUNG EMAS",
        "MEDAN",
        "JAMBI",
        "UJUNG PANDANG / HASANUDDIN (U)",
        "BATU AMPAR",
    ]

    reference_ports_lf = ports_lf.select("name", "synonyms", "country").unique()

    bol_lf = bol_lf.with_columns(port_of_export_name=pl.col("port_of_export_label"))

    # NOTE(review): the match is on the label only, it is not restricted to the
    # rows of the country being processed, and a later country's match
    # overwrites an earlier one - confirm this is intended.
    for country in COUNTRY_NAMES:

        synonyms_lf = reference_ports_lf.filter(pl.col("country") == pl.lit(country))

        if country == "INDONESIA":
            synonyms_lf = synonyms_lf.filter(
                ~pl.col("name").is_in(indonesia_duplicates)
            )

        # One row per (port name, synonym) of this country
        synonyms_lf = (
            synonyms_lf.rename(
                {
                    "name": "matched_port_of_export",
                    "synonyms": "country_port_synonyms",
                }
            )
            .drop(["country"])
            .unique()
            .explode("country_port_synonyms")
            .unique()
        )

        # Prefer the reference name when a synonym matched, else keep the value
        bol_lf = (
            bol_lf.join(
                synonyms_lf,
                how="left",
                left_on="port_of_export_label",
                right_on="country_port_synonyms",
                validate="m:1",
            )
            .with_columns(
                port_of_export_name=pl.coalesce(
                    [pl.col("matched_port_of_export"), pl.col("port_of_export_name")]
                )
            )
            .drop("matched_port_of_export")
        )

    # Replace nulls in port_of_export_name / port_of_export_label with 'UNKNOWN'
    return bol_lf.with_columns(
        pl.col("port_of_export_name").fill_null(pl.lit("UNKNOWN")),
        pl.col("port_of_export_label").fill_null(pl.lit("UNKNOWN")),
    )


def model(dbt, cursor):
    """
    dbt entry point: builds the consolidated coffee trade table for YEAR.

    Reads the reference tables and the per-country trade models, stacks the
    trade data with a 'producing_country' column, cleans labels, adds the
    economic bloc, exporter, importer and port information, and finally sums
    'mass_tonnes' and 'fob' per combination of the remaining columns.

    Parameters:
    - dbt: dbt object, used for the configuration and to reference upstream models.
    - cursor: not used by this model.

    Returns:
    - A polars LazyFrame with the consolidated trade data.
    """
    dbt.config(materialized="external")

    # Reference data: countries (including economic bloc), official traders
    # (with the group active in YEAR as 'group_name') and ports
    countries_lf = dbt.ref("postgres_countries").pl().lazy()
    reference_traders_lf = identify_active_group_name(
        dbt.ref("postgres_traders").pl().lazy(), YEAR, "groups"
    ).drop("groups")
    ports_lf = dbt.ref("postgres_ports").pl().lazy()

    # Group parents whose validity period covers YEAR (open-ended bounds allowed)
    parent_started = (pl.col("group_parent_start_date").dt.year() <= pl.lit(YEAR)) | (
        pl.col("group_parent_start_date").is_null()
    )
    parent_not_ended = (pl.col("group_parent_end_date").dt.year() > pl.lit(YEAR)) | (
        pl.col("group_parent_end_date").is_null()
    )
    group_parents_lf = (
        dbt.ref("diet_trase_coffee_trader_parents")
        .pl()
        .lazy()
        .filter(parent_started & parent_not_ended)
        .select(["db_group_name", "group_parent"])
        .unique()
        .rename({"group_parent": "exporter_group_parent"})
    )

    # Trade data of each country, in the same order as COUNTRY_NAMES.
    # The model names are kept as literal strings inside `dbt.ref`.
    gold_trade_frames = [
        dbt.ref("gold_brazil_bol_coffee_2020").pl().lazy(),
        dbt.ref("gold_colombia_cd_coffee_2020").pl().lazy(),
        dbt.ref("gold_cote_divoire_coffee_2020").pl().lazy(),
        dbt.ref("gold_ethiopia_coffee_2020").pl().lazy(),
        dbt.ref("gold_india_trade_coffee_2020").pl().lazy(),
        dbt.ref("gold_indonesia_bol_coffee_2020").pl().lazy(),
        dbt.ref("gold_peru_cd_coffee_2020").pl().lazy(),
        dbt.ref("gold_tanzania_trade_coffee_2020").pl().lazy(),
        dbt.ref("gold_uganda_coffee_2020").pl().lazy(),
        dbt.ref("gold_vietnam_trade_coffee_2020").pl().lazy(),
    ]

    # Keep the shared columns, tag each frame with its 'producing_country' and
    # align the 'exporter_node_id' type so that the frames can be stacked
    per_country_frames = [
        select_relevant_columns(gold_frame).with_columns(
            pl.lit(country_name).alias("producing_country"),
            pl.col("exporter_node_id").cast(pl.Int64).alias("exporter_node_id"),
        )
        for country_name, gold_frame in zip(COUNTRY_NAMES, gold_trade_frames)
    ]
    trade_lf = pl.concat(per_country_frames)

    # Run additional pre-processing
    trade_lf = clean_strings(
        trade_lf,
        columns=["exporter_label", "port_of_export_label", "importer_label"],
    )
    trade_lf = add_european_union_bloc(trade_lf, countries_lf)
    trade_lf = add_exporter_name_and_ownership(
        trade_lf, reference_traders_lf, group_parents_lf
    )
    trade_lf = clean_importers(trade_lf, reference_traders_lf)
    trade_lf = clean_ports(trade_lf, ports_lf)

    # Remove 'UNKNOWN COUNTRY' (a record from PERU)
    # NOTE(review): rows with an empty 'country_of_destination' presumably do
    # not pass this comparison either - confirm that is intended.
    trade_lf = trade_lf.filter(pl.col("country_of_destination") != "UNKNOWN COUNTRY")

    # Consolidate all, summing mass_tonnes and fob
    leading_keys = ["year", "producing_country", "hs6"]
    trader_and_route_keys = [
        "exporter_label",
        "exporter_name",
        "exporter_node_id",
        "exporter_group_name",
        "exporter_group_parent",
        "port_of_export_label",
        "port_of_export_name",
        "country_of_destination",
        "economic_bloc",
        "importer_label",
        "importer_name",
        "importer_group",
    ]
    trade_lf = trade_lf.group_by(leading_keys + trader_and_route_keys).agg(
        [
            pl.col("mass_tonnes").sum().alias("mass_tonnes"),
            pl.col("fob").sum().alias("fob"),
        ]
    )

    # Final select (measures right after the leading keys) and sort
    output_columns = leading_keys + ["mass_tonnes", "fob"] + trader_and_route_keys
    trade_lf = trade_lf.select(output_columns).sort(
        by=["producing_country", "hs6", "mass_tonnes"],
        descending=[False, False, True],
    )

    # If debugging a test run
    # trade_lf.collect().write_parquet(f"~/Trase/data/diet_trase_coffee_trade_consolidated_{YEAR}.parquet")

    return trade_lf