Skip to content

Pre Gold Cote Divoire Coffee 2020

s3://trase-storage/cote_divoire/coffee/trade/bol/pre_gold_cote_divoire_coffee-2020.parquet

Dbt path: trase_production.main_cote_divoire_coffee.pre_gold_cote_divoire_coffee_2020

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/cote_divoire/coffee/trade/_schema_cote_divoire_coffee.yml

Model file link: trase/data_pipeline/models/cote_divoire/coffee/trade/pre_gold_cote_divoire_coffee_2020.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph — at the bottom right — tests, and downstream dependencies)

Tags: pre_gold, cote_divoire, coffee, trade, 2020, diet-trase-coffee


pre_gold_cote_divoire_coffee_2020

Description

Loads the silver table, filters to coffee-related HS codes (0901*, 2101*), normalizes destination country names against the official synonym list, and joins exporter group information from the traders table.


Details

Column Type Description
year INTEGER
exporter_label VARCHAR
country_of_destination VARCHAR
mass_tonnes DOUBLE
fob DOUBLE
importer_label VARCHAR
port_of_export_label VARCHAR
hs6 VARCHAR
DATE TIMESTAMP WITH TIME ZONE
DIRECTION VARCHAR
EXPORTER_CODE VARCHAR
EXPORTER VARCHAR
EXPORTER_ADDRESS VARCHAR
BUYER VARCHAR
BUYER_ADDRESS VARCHAR
PROVENANCE_COUNTRY VARCHAR
COUNTRY_DESTINATION VARCHAR
ORIGIN_COUNTRY VARCHAR
HS_CODE_DESCRIPTION VARCHAR
UNIT_CODE VARCHAR
UNIT VARCHAR
NUMBER_OF_PRODUCTS DOUBLE
QUANTITY DOUBLE
NET_WEIGHT_KG DOUBLE
GROSS_WEIGHT_KG DOUBLE
CIF_VALUE_IN_WEST_AFRICAN_CFA_FRANC DOUBLE
CIF_VALUE_IN_USD DOUBLE
PORT_CODE VARCHAR
PORT_NAME VARCHAR
TRANSPORT_MODE_CODE VARCHAR
TRANSPORT_MODE VARCHAR
MONTH VARCHAR
Chapter VARCHAR
Heading VARCHAR
HS_CODE VARCHAR
trader_node_id BIGINT
exporter_group_name VARCHAR

Models / Seeds

  • model.trase_duckdb.silver_cote_divoire_coffee_2020
  • model.trase_duckdb.postgres_countries
  • model.trase_duckdb.postgres_traders

No called script or script source not found.

# pre_gold_cote_divoire_coffee_2020
import polars as pl


def model(dbt, session):
    """Build the pre-gold Cote d'Ivoire coffee trade table for 2020.

    Steps:
      1. Select coffee-related rows (HS codes 0901* and 2101*) from the 2020
         silver table, duplicating key fields under gold-style column names.
      2. Normalize destination country names via the official synonym list
         and drop records going to 'UNKNOWN COUNTRY'.
      3. Attach exporter group names by joining against the traders table.

    Args:
        dbt: dbt Python-model runtime object (provides ``config`` and ``ref``).
        session: DuckDB session used to execute raw SQL.

    Returns:
        A materialized (collected) Polars DataFrame of cleaned trade records.
    """
    dbt.config(materialized="external")

    # The return value is deliberately discarded: the dbt.ref() call is what
    # registers the lineage dependency on the silver model. The SQL below
    # addresses the table by its schema-qualified name directly.
    dbt.ref("silver_cote_divoire_coffee_2020")

    # Load relevant data and register it in a Polars lazy frame.
    # Start with the fields that will be used in the gold file, even if
    # duplicating them.
    # NOTE(review): `fob` is populated from CIF_VALUE_IN_USD — CIF and FOB are
    # different Incoterms valuations; confirm this mapping is intended.
    exports_df = (
        session.execute(
            """
        SELECT
            YEAR::INT AS year,
            UPPER(EXPORTER) AS exporter_label,
            COUNTRY_DESTINATION AS country_of_destination,
            NET_WEIGHT_KG/1000 AS mass_tonnes,
            CIF_VALUE_IN_USD AS fob,
            UPPER(BUYER) AS importer_label,
            UPPER(PORT_NAME) AS port_of_export_label,
            HS_CODE AS hs6,
            DATE,
            DIRECTION,
            EXPORTER_CODE,
            EXPORTER,
            EXPORTER_ADDRESS,
            BUYER,
            BUYER_ADDRESS,
            PROVENANCE_COUNTRY,
            COUNTRY_DESTINATION,
            ORIGIN_COUNTRY,
            HS_CODE_DESCRIPTION,
            UNIT_CODE,
            UNIT,
            NUMBER_OF_PRODUCTS,
            QUANTITY,
            NET_WEIGHT_KG,
            GROSS_WEIGHT_KG,
            CIF_VALUE_IN_WEST_AFRICAN_CFA_FRANC,
            CIF_VALUE_IN_USD,
            PORT_CODE,
            PORT_NAME,
            TRANSPORT_MODE_CODE,
            TRANSPORT_MODE,
            MONTH,
            Chapter,
            Heading,
            HS_CODE
        FROM main_cote_divoire_coffee.silver_cote_divoire_coffee_2020
        WHERE
            (hs6 LIKE '0901%' OR hs6 LIKE '2101%')
        """
        )
        .pl()
        .lazy()
    )

    ### Clean country names

    # Build a synonym -> official country name lookup from the reference table.
    countries_df = dbt.ref("postgres_countries").pl()
    exploded_df = countries_df.explode("synonyms")
    synonym_country_dict = dict(
        zip(exploded_df["synonyms"].to_list(), exploded_df["country_name"].to_list())
    )

    # Add synonyms missing from the reference table (mostly French spellings).
    # NOTE(review): "GUINEE EQUATORIALE" (Equatorial Guinea) is mapped to
    # "GUINEA" — confirm this is intended and not a typo for "EQUATORIAL GUINEA".
    synonym_country_dict.update(
        {
            "CAMEROUN": "CAMEROON",
            "EQUATEUR": "ECUADOR",
            "GAMBIE": "GAMBIA",
            "GUINEE": "GUINEA",
            "GUINEE EQUATORIALE": "GUINEA",
            "GUINEE-BISSAU": "GUINEA-BISSAU",
            "MAURICE, ILE": "MAURITIUS",
            "SLOVENIE": "SLOVENIA",
        }
    )

    # Replace synonyms with official names. replace_strict raises if a value
    # has no mapping, which doubles as a guard against new, unmapped spellings.
    # (The .alias() calls in the original were redundant: replace_strict and
    # strip_chars both keep the source column's name.)
    exports_df = exports_df.with_columns(
        pl.col("country_of_destination").replace_strict(synonym_country_dict),
        pl.col("exporter_label").str.strip_chars(),
    )

    # Remove records going to 'UNKNOWN COUNTRY'
    exports_df = exports_df.filter(
        pl.col("country_of_destination") != "UNKNOWN COUNTRY"
    )

    ### Get exporter groups - this could be moved to a function
    db_trader_data = dbt.ref("postgres_traders").pl().lazy()

    # Filter the trader data to only include exporters present in the exports
    # data. This improves performance a tiny bit, and makes the many-to-one
    # join below easier to validate.
    exporters = exports_df.select(pl.col("exporter_label")).unique().collect()
    exporter_list = exporters["exporter_label"].to_list()
    existing_trader_info = db_trader_data.select(
        [
            pl.col("name"),
            pl.col("trader_node_id"),
            pl.col("trase_id"),
            pl.col("labels"),
            pl.col("groups"),
        ]
    ).filter(
        # Keep only traders whose label list intersects the exporter list.
        pl.col("labels")
        .list.eval(pl.element().is_in(exporter_list))
        .list.any()
    )

    # Unnest the filtered trader info so it can be joined with the exports data
    # on a scalar label column.
    unnested_trader_info = existing_trader_info.explode("labels").rename(
        {"labels": "label"}
    )

    exports_joined_with_trader_info = exports_df.join(
        unnested_trader_info,
        left_on="exporter_label",
        right_on="label",
        how="left",
        validate="m:1",  # many exports rows may share one trader label
    )

    # Keep the joined `groups` under its gold name and drop the helper columns
    # brought in by the join.
    exports_df = exports_joined_with_trader_info.with_columns(
        pl.col("groups").alias("exporter_group_name")
    ).drop(["name", "trase_id", "groups"])

    # Extract 'group' from the first JSON object in 'exporter_group_name'.
    exports_df = exports_df.with_columns(
        pl.col("exporter_group_name")
        .list.first()
        .str.json_path_match("$.group")
        .alias("exporter_group_name")
    )

    return exports_df.collect()