Silver Brazil Bol Coffee 2020
s3://trase-storage/brazil/coffee/trade/2020/silver/silver_brazil_bol_coffee_2020.parquet
Dbt path: trase_production.main_brazil_coffee.silver_brazil_bol_coffee_2020
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/coffee/trade/_schema_brazil_coffee.yml
Model file link: trase/data_pipeline/models/brazil/coffee/trade/silver_brazil_bol_coffee_2020.py
Calls script: trase/data/brazil/coffee/trade/y2020/brazil_coffee_bol_2020.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: silver, brazil, coffee, trade, comtrade, 2020, diet-trase-coffee
silver_brazil_bol_coffee_2020
Description
This script loads the original BOL data for Brazil in 2020, filters it for coffee-related HS codes, and then finds the exporter information in the database based on the exporter tax number and exporter label. The country names in the original BOL data were already cleaned, so there was no need to process them.
Calls the Python script: trase/data/brazil/coffee/trade/y2020/brazil_coffee_bol_2020.py
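As a rough sketch of the HS-code filtering step described above (illustrative only; the actual filter lives in the pipeline script linked above), coffee falls under HS heading 0901, so the BOL records can be narrowed down by HS6 prefix:

```python
import pandas as pd


def filter_coffee_rows(bol_data: pd.DataFrame) -> pd.DataFrame:
    """Keep only coffee-related BOL records (illustrative helper, not part of the pipeline)."""
    # Coffee, whether or not roasted or decaffeinated, sits under HS heading 0901,
    # so coffee HS6 codes all start with "0901"
    return bol_data[bol_data["HS6"].astype(str).str.startswith("0901")].copy()
```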
Details
| Column | Type | Description |
|---|---|---|
| HS6 | VARCHAR | |
| COUNTRY_OF_ORIGIN_LABEL | VARCHAR | |
| EXPORTER_CNPJ | VARCHAR | |
| EXPORTER_LABEL | VARCHAR | |
| PORT_OF_EXPORT_LABEL | VARCHAR | |
| PORT_OF_IMPORT_LABEL | VARCHAR | |
| COUNTRY_OF_DESTINATION_LABEL | VARCHAR | |
| IMPORTER_CITY | VARCHAR | |
| IMPORTER_LABEL | VARCHAR | |
| IMPORTER_COUNTRY_LABEL | VARCHAR | |
| IMPORTER_CODE | VARCHAR | |
| VOL | DOUBLE | |
| HS4 | VARCHAR | |
| HS5 | VARCHAR | |
| YEAR | BIGINT | |
| MONTH | BIGINT | |
| DAY | BIGINT | |
| EXPORTER_STATE_NAME | VARCHAR | |
| EXPORTER_STATE_TRASE_ID | VARCHAR | |
| EXPORTER_MUNICIPALITY_NAME | VARCHAR | |
| EXPORTER_MUNICIPALITY_TRASE_ID | VARCHAR | |
| COUNTRY_OF_DESTINATION_NAME | VARCHAR | |
| COUNTRY_OF_DESTINATION_TRASE_ID | VARCHAR | |
| PORT_OF_EXPORT_NAME | VARCHAR | |
| EXPORTER_TYPE | VARCHAR | |
| exporter_group_name | VARCHAR | |
| exporter_group_id | BIGINT | |
| exporter_trader_id | BIGINT | |
| exporter_trase_id | VARCHAR | |
| COUNTRY_OF_DESTINATION_ISO | VARCHAR | |
Models / Seeds
model.trase_duckdb.bronze_brazil_bol_coffee_2020
"""
This script loads the bronze BOL coffee data for Brazil in 2020 and
finds the exporter information in the database based on the exporter tax number and exporter label.
The country names in the original BOL data were already cleaned, so there was no need to process them.
The script is structured so it can be called directly or through dbt.
"""
from psycopg2 import sql
import pycountry
from trase.tools import get_country_id
from trase.tools import get_label_trader_id
from trase.tools import get_trader_group_id
from trase.tools.pandasdb.find import find_traders_and_groups_by_label
from trase.tools.pandasdb.find import find_traders_and_groups_by_trase_id
def get_exporter_data_from_database(bol_data):
"""Gets exporter existing information from the database based on EXPORTER_LABEL and EXPORTER_CNPJ"""
# Find the trader information - first the ones with tax number
country_id = get_country_id("BRAZIL")
# Build a trial trase_id by prefixing 'BR-TRADER-' to the first 8 characters of the exporter tax number (CNPJ)
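# e.g. a hypothetical 14-digit CNPJ "12345678000190" yields the trial id "BR-TRADER-12345678"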
bol_data["trial_exporter_trase_id"] = bol_data["EXPORTER_CNPJ"].apply(
lambda x: "BR-TRADER-" + x[:8] if x is not None and len(x) >= 8 else None
)
# Get exporter related data from the database, based on trase_id
# Initialize the columns where data related to exporters will be
exporter_columns = [
"exporter_group_name",
"exporter_group_id",
"exporter_trader_id",
"count_trase_id",
]
for col in exporter_columns:
if col not in bol_data.columns:
bol_data[col] = None
# Search for the exporter information of records with a CNPJ
df_with_cnpj = bol_data[bol_data["trial_exporter_trase_id"].notnull()].copy()
df_with_cnpj[
[
"exporter_group_name",
"exporter_group_id",
"exporter_trader_id",
"count_trase_id",
]
] = find_traders_and_groups_by_trase_id(
df_with_cnpj.rename(
columns={"trial_exporter_trase_id": "trase_id"}, errors="raise"
),
returning=["group_name", "group_id", "trader_id", "count"],
year=sql.Literal(2020),
on_extra_columns="ignore",
)
# Update the original DataFrame with the new exporter data
bol_data.update(df_with_cnpj)
# For the remaining records, get the exporter data based on label
# Initialize the additional exporter columns
additional_exporter_columns = ["exporter_trase_id", "count_label"]
for col in additional_exporter_columns:
if col not in bol_data.columns:
bol_data[col] = None
# Take the rows where a match was not found against the database using the tax number
df_without_trase_id = bol_data[bol_data["count_trase_id"] != 1].copy()
df_without_trase_id[
[
"EXPORTER_LABEL",
"exporter_trader_id",
"exporter_group_name",
"exporter_group_id",
"exporter_trase_id",
"count_label",
]
] = find_traders_and_groups_by_label(
df_without_trase_id.rename(
columns={"EXPORTER_LABEL": "trader_label"}, errors="raise"
),
returning=[
"trader_name",
"trader_id",
"group_name",
"group_id",
"trase_id",
"count",
],
country_id=sql.Literal(country_id),
year=sql.Literal(2020),
on_extra_columns="ignore",
)
# Update the original DataFrame
bol_data.update(df_without_trase_id)
# The 'find_traders_and_groups' functions sometimes return label ids instead of
# trader or group ids. Resolve them using 'get_label_trader_id' and 'get_trader_group_id'
# Create mappings for label_trader_id and trader_group_id
unique_trader_ids = bol_data["exporter_trader_id"].dropna().unique()
trader_id_map = {tid: get_label_trader_id(tid) for tid in unique_trader_ids}
group_id_map = {
tid: get_trader_group_id(trader_id_map[tid]) for tid in unique_trader_ids
}
# Replace exporter_group_id first, while exporter_trader_id still holds the
# original ids that key group_id_map, then replace exporter_trader_id itself
bol_data["exporter_group_id"] = bol_data["exporter_trader_id"].map(group_id_map)
bol_data["exporter_trader_id"] = bol_data["exporter_trader_id"].map(trader_id_map)
# Make sure all the exporters have been found
bol_data["count"] = bol_data["count_trase_id"].fillna(0) + bol_data[
"count_label"
].fillna(0)
assert bol_data["count"].eq(1).all(), "Not all exporters have been found"
# Fill 'exporter_trase_id' from 'trial_exporter_trase_id' where it is still empty
bol_data["exporter_trase_id"] = bol_data["exporter_trase_id"].fillna(
bol_data["trial_exporter_trase_id"]
)
# Drop the no longer needed columns
bol_data.drop(
columns=["trial_exporter_trase_id", "count_trase_id", "count_label", "count"],
inplace=True,
)
# Replace placeholder CNPJ values of '0' with 'X'
bol_data.loc[bol_data["EXPORTER_CNPJ"] == "0", "EXPORTER_CNPJ"] = "X"
# Re-cast the integer columns: pandas upcasts them to float during the update so it can represent NaNs
bol_data["DAY"] = bol_data["DAY"].astype(int)
bol_data["MONTH"] = bol_data["MONTH"].astype(int)
bol_data["YEAR"] = bol_data["YEAR"].astype(int)
bol_data["exporter_group_id"] = bol_data["exporter_group_id"].astype(int)
bol_data["exporter_trader_id"] = bol_data["exporter_trader_id"].astype(int)
# Add the country of destination ISO alpha-3 code, based on the alpha-2 code stored in COUNTRY_OF_DESTINATION_TRASE_ID
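# e.g. an alpha-2 trase_id of "DE" maps to the ISO alpha-3 code "DEU" (illustrative value)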
bol_data["COUNTRY_OF_DESTINATION_ISO"] = bol_data[
"COUNTRY_OF_DESTINATION_TRASE_ID"
].apply(
lambda x: pycountry.countries.get(alpha_2=x).alpha_3 if x is not None else None
)
return bol_data
from trase.data.brazil.coffee.trade.y2020.brazil_coffee_bol_2020 import (
get_exporter_data_from_database,
)
def model(dbt, cursor):
dbt.config(materialized="external")
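# dbt-duckdb's "external" materialisation writes the model output to an external parquet file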
# Load the clean bol data (doesn't include FOB)
bol_data = dbt.ref("bronze_brazil_bol_coffee_2020").df()
bol_data = get_exporter_data_from_database(bol_data)
return bol_data
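The docstring above notes that the script can be called directly as well as through dbt. A minimal sketch of a direct call, assuming the bronze output has been downloaded locally as a parquet file (the file names below are illustrative) and database credentials are configured:

```python
import pandas as pd

from trase.data.brazil.coffee.trade.y2020.brazil_coffee_bol_2020 import (
    get_exporter_data_from_database,
)

# Illustrative local copy of the bronze model output; in the pipeline it is read via dbt.ref()
bronze = pd.read_parquet("bronze_brazil_bol_coffee_2020.parquet")
silver = get_exporter_data_from_database(bronze)
silver.to_parquet("silver_brazil_bol_coffee_2020.parquet", index=False)
```

Through dbt, the model is built with dbt run --select silver_brazil_bol_coffee_2020, which runs the same function via the model() entry point above.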