Pre Gold Cote Divoire Coffee 2020
s3://trase-storage/cote_divoire/coffee/trade/bol/pre_gold_cote_divoire_coffee-2020.parquet
Dbt path: trase_production.main_cote_divoire_coffee.pre_gold_cote_divoire_coffee_2020
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/cote_divoire/coffee/trade/_schema_cote_divoire_coffee.yml
Model file link: trase/data_pipeline/models/cote_divoire/coffee/trade/pre_gold_cote_divoire_coffee_2020.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: pre_gold, cote_divoire, coffee, trade, 2020, diet-trase-coffee
pre_gold_cote_divoire_coffee_2020
Description
Loads the silver table, filters it to coffee HS codes (0901/2101), normalizes destination country names and exporter labels, and joins exporter group information from the traders reference table.
Details
| Column | Type | Description |
|---|---|---|
| year | INTEGER | |
| exporter_label | VARCHAR | |
| country_of_destination | VARCHAR | |
| mass_tonnes | DOUBLE | |
| fob | DOUBLE | |
| importer_label | VARCHAR | |
| port_of_export_label | VARCHAR | |
| hs6 | VARCHAR | |
| DATE | TIMESTAMP WITH TIME ZONE | |
| DIRECTION | VARCHAR | |
| EXPORTER_CODE | VARCHAR | |
| EXPORTER | VARCHAR | |
| EXPORTER_ADDRESS | VARCHAR | |
| BUYER | VARCHAR | |
| BUYER_ADDRESS | VARCHAR | |
| PROVENANCE_COUNTRY | VARCHAR | |
| COUNTRY_DESTINATION | VARCHAR | |
| ORIGIN_COUNTRY | VARCHAR | |
| HS_CODE_DESCRIPTION | VARCHAR | |
| UNIT_CODE | VARCHAR | |
| UNIT | VARCHAR | |
| NUMBER_OF_PRODUCTS | DOUBLE | |
| QUANTITY | DOUBLE | |
| NET_WEIGHT_KG | DOUBLE | |
| GROSS_WEIGHT_KG | DOUBLE | |
| CIF_VALUE_IN_WEST_AFRICAN_CFA_FRANC | DOUBLE | |
| CIF_VALUE_IN_USD | DOUBLE | |
| PORT_CODE | VARCHAR | |
| PORT_NAME | VARCHAR | |
| TRANSPORT_MODE_CODE | VARCHAR | |
| TRANSPORT_MODE | VARCHAR | |
| MONTH | VARCHAR | |
| Chapter | VARCHAR | |
| Heading | VARCHAR | |
| HS_CODE | VARCHAR | |
| trader_node_id | BIGINT | |
| exporter_group_name | VARCHAR | |
Models / Seeds
model.trase_duckdb.silver_cote_divoire_coffee_2020, model.trase_duckdb.postgres_countries, model.trase_duckdb.postgres_traders
No called script or script source not found.
# pre_gold_cote_divoire_coffee_2020
import polars as pl
def model(dbt, session):
    """Build the 2020 pre-gold Cote d'Ivoire coffee trade table.

    Reads the silver bill-of-lading table, keeps only coffee-related HS
    codes (0901* and 2101*), normalizes destination country names and
    exporter labels, and attaches exporter group information from the
    traders reference table.

    Args:
        dbt: dbt Python-model context (provides ``config`` and ``ref``).
        session: DuckDB session used to run the extraction SQL.

    Returns:
        A materialized polars DataFrame with the silver columns plus the
        renamed/derived trade fields and ``trader_node_id`` /
        ``exporter_group_name``.
    """
    dbt.config(materialized="external")
    # The query below addresses the silver table by its schema-qualified
    # name; this ref() call presumably exists to register the dbt
    # dependency/lineage even though its return value is unused.
    silver_trade_data = dbt.ref("silver_cote_divoire_coffee_2020")
    # Load relevant data and register it in a Polars Lazy Dataframe
    # Start by the fields that will be used in the gold file, even if duplicating them
    exports_df = (
        session.execute(
            f"""
            SELECT
                YEAR::INT AS year,
                UPPER(EXPORTER) AS exporter_label,
                COUNTRY_DESTINATION AS country_of_destination,
                NET_WEIGHT_KG/1000 AS mass_tonnes,
                -- NOTE(review): the CIF value is exposed under the name
                -- `fob` — confirm this aliasing is intentional.
                CIF_VALUE_IN_USD AS fob,
                UPPER(BUYER) AS importer_label,
                UPPER(PORT_NAME) AS port_of_export_label,
                HS_CODE AS hs6,
                DATE,
                DIRECTION,
                EXPORTER_CODE,
                EXPORTER,
                EXPORTER_ADDRESS,
                BUYER,
                BUYER_ADDRESS,
                PROVENANCE_COUNTRY,
                COUNTRY_DESTINATION,
                ORIGIN_COUNTRY,
                HS_CODE_DESCRIPTION,
                UNIT_CODE,
                UNIT,
                NUMBER_OF_PRODUCTS,
                QUANTITY,
                NET_WEIGHT_KG,
                GROSS_WEIGHT_KG,
                CIF_VALUE_IN_WEST_AFRICAN_CFA_FRANC,
                CIF_VALUE_IN_USD,
                PORT_CODE,
                PORT_NAME,
                TRANSPORT_MODE_CODE,
                TRANSPORT_MODE,
                MONTH,
                Chapter,
                Heading,
                HS_CODE
            FROM main_cote_divoire_coffee.silver_cote_divoire_coffee_2020
            WHERE
                -- Keep coffee (0901) and coffee extracts/preparations (2101).
                (hs6 LIKE '0901%' OR hs6 LIKE '2101%')
            """
        )
        .pl()
        .lazy()
    )
    ### Clean country names
    # Build a dictionary with official country synonyms
    # (one entry per synonym -> canonical country_name).
    countries_df = dbt.ref("postgres_countries").pl()
    exploded_df = countries_df.explode("synonyms")
    synonym_country_dict = dict(
        zip(exploded_df["synonyms"].to_list(), exploded_df["country_name"].to_list())
    )
    # Add missing country synonyms (French spellings seen in this dataset
    # that are absent from the reference table).
    synonym_country_dict.update(
        {
            "CAMEROUN": "CAMEROON",
            "EQUATEUR": "ECUADOR",
            "GAMBIE": "GAMBIA",
            "GUINEE": "GUINEA",
            # NOTE(review): "GUINEE EQUATORIALE" (Equatorial Guinea) is
            # mapped to "GUINEA" — confirm this is not meant to be
            # "EQUATORIAL GUINEA".
            "GUINEE EQUATORIALE": "GUINEA",
            "GUINEE-BISSAU": "GUINEA-BISSAU",
            "MAURICE, ILE": "MAURITIUS",
            "SLOVENIE": "SLOVENIA",
        }
    )
    # Replace country synonyms with the official country names.
    # replace_strict has no `default`, so any destination value missing
    # from the mapping will raise at collect time (fail fast on new
    # synonyms rather than passing them through silently).
    exports_df = exports_df.with_columns(
        pl.col("country_of_destination")
        .replace_strict(synonym_country_dict)
        .alias("country_of_destination"),
        pl.col("exporter_label").str.strip_chars().alias("exporter_label"),
    )
    # Remove records going to 'UNKNOWN COUNTRY'
    exports_df = exports_df.filter(
        pl.col("country_of_destination") != "UNKNOWN COUNTRY"
    )
    ### Get exporter groups - this could be moved to a function
    db_trader_data = dbt.ref("postgres_traders").pl().lazy()
    # Filter the trader data to only include exporters from the exports data
    # This improves performance a tiny bit, and makes it easier to have many to one join down below
    exporters = exports_df.select(pl.col("exporter_label")).unique().collect()
    exporter_list = exporters["exporter_label"].to_list()
    existing_trader_info = db_trader_data.select(
        [
            pl.col("name"),
            pl.col("trader_node_id"),
            pl.col("trase_id"),
            pl.col("labels"),
            pl.col("groups"),
        ]
    ).filter(
        # Ensure we're working only with lists that match the exporter list
        # (keep a trader row if ANY of its labels appears in the exports).
        pl.col("labels")
        .list.eval(pl.element().is_in(exporter_list))
        .list.any()
    )
    # Unnested filtered trader info so that we can join it with the exports data
    unnested_trader_info = existing_trader_info.explode("labels").rename(
        {"labels": "label"}
    )
    # validate="m:1" asserts each exporter label matches at most one
    # trader row; collect will fail if a label maps to several traders.
    exports_joined_with_trader_info = exports_df.join(
        unnested_trader_info,
        left_on="exporter_label",
        right_on="label",
        how="left",
        validate="m:1",  # many to one
    )
    exports_df = exports_joined_with_trader_info.select(
        [pl.col("*"), pl.col("groups").alias("exporter_group_name")]
    )
    # Drop join helper columns; keep trader_node_id and exporter_group_name.
    exports_df = exports_df.drop(["name", "trase_id", "groups"])
    # Extract 'group' from the first JSON object in 'exporter_group'
    # (groups is a list of JSON strings; only the first entry is used —
    # presumably each trader has a single current group. TODO confirm.)
    exports_df = exports_df.with_columns(
        pl.col("exporter_group_name")
        .list.first()
        .str.json_path_match("$.group")
        .alias("exporter_group_name")
    )
    return exports_df.collect()