Skip to content

Silver Brazil Bol Coffee 2020

s3://trase-storage/brazil/coffee/trade/2020/silver/silver_brazil_bol_coffee_2020.parquet

Dbt path: trase_production.main_brazil_coffee.silver_brazil_bol_coffee_2020

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/coffee/trade/_schema_brazil_coffee.yml

Model file link: trase/data_pipeline/models/brazil/coffee/trade/silver_brazil_bol_coffee_2020.py

Calls script: trase/data/brazil/coffee/trade/y2020/brazil_coffee_bol_2020.py

Dbt test runs & lineage: Test results ยท Lineage

Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)

Tags: silver, brazil, coffee, trade, comtrade, 2020, diet-trase-coffee


silver_brazil_bol_coffee_2020

Description

This script loads the original BOL data for Brazil in 2020, filters it for coffee-related HS codes, and then finds the exporter information in the database based on the exporter tax number and exporter label. The country names from the original BOL data where already cleaned so there was no need to process them.

Calls the Python script: trase/data/brazil/coffee/trade/y2020/brazil_coffee_bol_2020.py


Details

Column Type Description
HS6 VARCHAR
COUNTRY_OF_ORIGIN_LABEL VARCHAR
EXPORTER_CNPJ VARCHAR
EXPORTER_LABEL VARCHAR
PORT_OF_EXPORT_LABEL VARCHAR
PORT_OF_IMPORT_LABEL VARCHAR
COUNTRY_OF_DESTINATION_LABEL VARCHAR
IMPORTER_CITY VARCHAR
IMPORTER_LABEL VARCHAR
IMPORTER_COUNTRY_LABEL VARCHAR
IMPORTER_CODE VARCHAR
VOL DOUBLE
HS4 VARCHAR
HS5 VARCHAR
YEAR BIGINT
MONTH BIGINT
DAY BIGINT
EXPORTER_STATE_NAME VARCHAR
EXPORTER_STATE_TRASE_ID VARCHAR
EXPORTER_MUNICIPALITY_NAME VARCHAR
EXPORTER_MUNICIPALITY_TRASE_ID VARCHAR
COUNTRY_OF_DESTINATION_NAME VARCHAR
COUNTRY_OF_DESTINATION_TRASE_ID VARCHAR
PORT_OF_EXPORT_NAME VARCHAR
EXPORTER_TYPE VARCHAR
exporter_group_name VARCHAR
exporter_group_id BIGINT
exporter_trader_id BIGINT
exporter_trase_id VARCHAR
COUNTRY_OF_DESTINATION_ISO VARCHAR

Models / Seeds

  • model.trase_duckdb.bronze_brazil_bol_coffee_2020
"""
This script loads the bronze BOL coffee data for Brazil in 2020,
finds the exporter information in the database based on the exporter tax number and exporter label. The country
names from the original BOL data where already cleaned so there was no need to process them.

The script is structured so it can be called directly or through dbt.
"""

from psycopg2 import sql
import pycountry

from trase.tools import get_country_id
from trase.tools import get_label_trader_id
from trase.tools import get_trader_group_id
from trase.tools.pandasdb.find import find_traders_and_groups_by_label
from trase.tools.pandasdb.find import find_traders_and_groups_by_trase_id


def get_exporter_data_from_database(bol_data):
    """Gets exporter existing information from the database based on EXPORTER_LABEL and EXPORTER_CNPJ"""
    # Find the trader information - first the ones with tax number
    country_id = get_country_id("BRAZIL")

    # Concatenate 'BR-' to the first 8 characters of the exporter tax number
    bol_data["trial_exporter_trase_id"] = bol_data["EXPORTER_CNPJ"].apply(
        lambda x: "BR-TRADER-" + x[:8] if x is not None and len(x) >= 8 else None
    )

    # Get exporter related data from the database, based on trase_id

    # Initialize the columns where data related to exporters will be
    exporter_columns = [
        "exporter_group_name",
        "exporter_group_id",
        "exporter_trader_id",
        "count_trase_id",
    ]
    for col in exporter_columns:
        if col not in bol_data.columns:
            bol_data[col] = None

    # Search for the exporter information of records with a CNPJ
    df_with_cnpj = bol_data[bol_data["trial_exporter_trase_id"].notnull()].copy()
    df_with_cnpj[
        [
            "exporter_group_name",
            "exporter_group_id",
            "exporter_trader_id",
            "count_trase_id",
        ]
    ] = find_traders_and_groups_by_trase_id(
        df_with_cnpj.rename(
            columns={"trial_exporter_trase_id": "trase_id"}, errors="raise"
        ),
        returning=["group_name", "group_id", "trader_id", "count"],
        year=sql.Literal(2020),
        on_extra_columns="ignore",
    )

    # Update the original DataFrame with the new exporter data
    bol_data.update(df_with_cnpj)

    # For the remaining records, get the exporter data based on label

    # Initialize the additional exporter columns
    additional_exporter_columns = ["exporter_trase_id", "count_label"]
    for col in additional_exporter_columns:
        if col not in bol_data.columns:
            bol_data[col] = None

    # Take the rows where a match was not found against the database using the tax number
    df_without_trase_id = bol_data[bol_data["count_trase_id"] != 1].copy()
    df_without_trase_id[
        [
            "EXPORTER_LABEL",
            "exporter_trader_id",
            "exporter_group_name",
            "exporter_group_id",
            "exporter_trase_id",
            "count_label",
        ]
    ] = find_traders_and_groups_by_label(
        df_without_trase_id.rename(
            columns={"EXPORTER_LABEL": "trader_label"}, errors="raise"
        ),
        returning=[
            "trader_name",
            "trader_id",
            "group_name",
            "group_id",
            "trase_id",
            "count",
        ],
        country_id=sql.Literal(country_id),
        year=sql.Literal(2020),
        on_extra_columns="ignore",
    )

    # Update the original DataFrame
    bol_data.update(df_without_trase_id)

    # The 'find_traders_and_groups' functions are sometimes returning the label id's instead
    # of the trader or group id's. Update these values using 'get_label_trader_id' and 'get_trader_group_id'

    # Create mappings for label_trader_id and trader_group_id
    unique_trader_ids = bol_data["exporter_trader_id"].dropna().unique()
    trader_id_map = {tid: get_label_trader_id(tid) for tid in unique_trader_ids}
    group_id_map = {
        tid: get_trader_group_id(trader_id_map[tid]) for tid in unique_trader_ids
    }

    # Replace exporter_trader_id and exporter_group_id using the mappings
    bol_data["exporter_trader_id"] = bol_data["exporter_trader_id"].map(trader_id_map)
    bol_data["exporter_group_id"] = bol_data["exporter_trader_id"].map(group_id_map)

    # Make sure all the exporters have been found
    bol_data["count"] = bol_data["count_trase_id"].fillna(0) + bol_data[
        "count_label"
    ].fillna(0)

    assert bol_data["count"].eq(1).all(), "Not all exporters have been found"

    # Bring to 'exporter_trase_id' the values from 'trial_exporter_trase_id' where 'exporter_trase_id' is empty
    bol_data["exporter_trase_id"] = bol_data["exporter_trase_id"].fillna(
        bol_data["trial_exporter_trase_id"]
    )

    # Drop the no longer needed columns
    bol_data.drop(
        columns=["trial_exporter_trase_id", "count_trase_id", "count_label", "count"],
        inplace=True,
    )

    # Clean exporter data columns with empty values
    bol_data.loc[bol_data["EXPORTER_CNPJ"] == "0", "EXPORTER_CNPJ"] = "X"

    # Cast again the int columns as they get casted to float during the update (python does this to allow NaNs if present)
    bol_data["DAY"] = bol_data["DAY"].astype(int)
    bol_data["MONTH"] = bol_data["MONTH"].astype(int)
    bol_data["YEAR"] = bol_data["YEAR"].astype(int)
    bol_data["exporter_group_id"] = bol_data["exporter_group_id"].astype(int)
    bol_data["exporter_trader_id"] = bol_data["exporter_trader_id"].astype(int)

    # Add the Country of Destination ISO code, based on the 2 letter from the COUNTRY_OF_DESTINATION_TRASE_ID
    bol_data["COUNTRY_OF_DESTINATION_ISO"] = bol_data[
        "COUNTRY_OF_DESTINATION_TRASE_ID"
    ].apply(
        lambda x: pycountry.countries.get(alpha_2=x).alpha_3 if x is not None else None
    )

    return bol_data
from trase.data.brazil.coffee.trade.y2020.brazil_coffee_bol_2020 import (
    get_exporter_data_from_database,
)


def model(dbt, cursor):
    dbt.config(materialized="external")

    # Load the clean bol data (doesn't include FOB)
    bol_data = dbt.ref("bronze_brazil_bol_coffee_2020").df()

    bol_data = get_exporter_data_from_database(bol_data)

    return bol_data