DBT: Brazil Bol 2019

File location: s3://trase-storage/brazil/trade/bol/2019/BRAZIL_BOL_2019.csv

DBT model name: brazil_bol_2019

Explore on Metabase: Full table; summary statistics

DBT details


Description

This model was auto-generated from the .yml 'lineage' files in S3. The DBT model itself only raises an error; the actual data was created by the script at trase/data/brazil/trade/bol/2019/BRAZIL_BOL_2019.py [permalink], last run by Harry Biddle.


Details

Column                             Type      Description
date                               BIGINT
hs4                                VARCHAR
hs6                                VARCHAR
hs8                                VARCHAR
country_of_origin.label            VARCHAR
exporter.cnpj                      VARCHAR
exporter.label                     VARCHAR
port_of_export.label               VARCHAR
port_of_import.label               VARCHAR
country_of_destination.label       VARCHAR
importer.city                      VARCHAR
importer.label                     VARCHAR
importer.country.label             VARCHAR
vessel.label                       VARCHAR
vessel.id                          BIGINT
vol                                BIGINT
hs5                                VARCHAR
year                               BIGINT
month                              BIGINT
day                                BIGINT
country_of_destination.name        VARCHAR
country_of_destination.trase_id    VARCHAR
port_of_export.name                VARCHAR
exporter.type                      VARCHAR
exporter.trase_id                  VARCHAR
exporter.trader_id                 BIGINT
exporter.group                     VARCHAR
exporter.name                      VARCHAR
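
For a quick look outside Metabase, the table can be read with the same S3 helper the script below uses. This is a sketch only: it assumes the published CSV is semicolon-separated like the raw input, and that get_pandas_df_once resolves paths relative to the trase-storage bucket, as it does in the script.

from trase.tools.aws.aws_helpers_cached import get_pandas_df_once

# load the published table; the column names match the list above
df = get_pandas_df_once(
    "brazil/trade/bol/2019/BRAZIL_BOL_2019.csv",
    sep=";",  # assumption: same separator as the raw input file
    dtype=str,
    keep_default_na=False,
)
print(df[["date", "hs6", "exporter.name", "vol"]].head())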

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.lc_originals_brazil_bol_2019
  • model.trase_duckdb.hs2017

Sources

  • trase-storage-raw.lc_originals_brazil_bol_2019

Script
import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf
from psycopg2 import sql

from trase.tools import (
    find_label,
    get_country_id,
    get_label_trader_id,
    get_node_name,
    get_trader_group_id,
)
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.pandasdb.find import (
    find_default_name_by_node_id,
    find_traders_and_groups_by_label,
    find_traders_and_groups_by_trase_id,
)
from trase.tools.pcs.connect import uses_database

# CPFs (individual taxpayer IDs) that would otherwise also pass CNPJ validation
KNOWN_CPFS = ["00331627949", "00523682891", "00297458108", "00216760895"]
YEAR = 2019


def select_and_rename_columns(df):
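    """Select the raw Datamar columns we need and rename them to dotted Trase names."""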
    columns = {
        "Period/YYYYMMDD": "date",
        "Commodity_HS_Datamar/HS4 Code": "hs4",
        "Commodity_HS_Datamar/HS6 Code": "hs6",
        "Commodity_HS/HS8 Code": "hs8",
        "Place_and_Ports/POL_Country": "country_of_origin.label",
        # exporter
        "Company_Shipper/Registration Number": "exporter.cnpj",
        "Company_Shipper/Shipper Name": "exporter.label",
        # ports, country
        "Place_and_Ports/POL_Name": "port_of_export.label",
        "Place_and_Ports/POD_Name": "port_of_import.label",
        "Place_and_Ports/DEST_Country": "country_of_destination.label",
        # importer
        "Company_Consignee/City": "importer.city",
        "Company_Consignee/Consignee Name": "importer.label",
        "Company_Consignee/Country": "importer.country.label",
        # vessel
        "Vessel/Vessel Name": "vessel.label",
        "Vessel/IMO": "vessel.id",
        # volume
        "WTKG": "vol",
    }
    return df[list(columns)].rename(columns=columns, errors="raise")


def clean_hs_codes(df):
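    """Normalise HS codes: re-mark truncated codes with trailing X's, derive hs5,
    and validate every level against the authoritative HS2017 list."""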
    # download an authoritative list of HS codes
    df_hscodes = get_pandas_df_once(
        "world/metadata/codes/hs/HS2017.csv", sep=";", dtype=str, keep_default_na=False
    )

    # fix some specific codes
    df = df.replace(
        {
            "hs6": {
                # HS6 codes that are actually HS5
                "010220": "01022X",
                "160230": "16023X",
                "160240": "16024X",
                "090110": "09011X",
                "020710": "02071X",
                "020320": "02032X",
                "110810": "11081X",
                # HS6 codes that are actually HS4
                "010200": "0102XX",
                "020200": "0202XX",
                "020300": "0203XX",
                "020600": "0206XX",
                "120100": "1201XX",
                "020700": "0207XX",
                "090100": "0901XX",
                "100500": "1005XX",
                "110300": "1103XX",
                "110400": "1104XX",
                "110800": "1108XX",
                "120700": "1207XX",
                "140400": "1404XX",
                "150700": "1507XX",
                "151100": "1511XX",
                "151200": "1512XX",
                "160200": "1602XX",
                "180300": "1803XX",
                "230600": "2306XX",
                "240100": "2401XX",
                "260100": "2601XX",
            },
        }
    )

    # the HS8 code "00330000" seems to be unknown
    df = df.assign(hs8=df["hs8"].mask(df["hs8"] == "00330000", df["hs6"] + "XX"))

    # validate that codes are hierarchical and the right length
    assert all(df["hs8"].str.len() == 8)
    df["hs5"] = df["hs6"].str.slice(0, 5)
    assert all(df["hs4"] == df["hs8"].str.slice(0, 4))
    assert all(df["hs4"] == df["hs6"].str.slice(0, 4))

    # validate that all HS4 codes exist
    df_hs4 = df_hscodes[df_hscodes["type"] == "hs4"]["code"].rename("hs4")
    d = pd.merge(df, df_hs4, how="left", validate="many_to_one", indicator=True)
    assert all(d["_merge"] == "both")

    # validate that all HS5 codes exist
    df_hs5 = (
        df_hscodes[df_hscodes["type"] == "hs6"]["code"]
        .str.slice(0, 5)
        .rename("hs5")
        .drop_duplicates()
    )
    d = pd.merge(df, df_hs5, how="left", validate="many_to_one", indicator=True)
    assert all(d["hs5"].str.endswith("X") | (d["_merge"] == "both"))

    # validate that all HS6 codes exist
    df_hs6 = df_hscodes[df_hscodes["type"] == "hs6"]["code"].rename("hs6")
    d = pd.merge(df, df_hs6, how="left", validate="many_to_one", indicator=True)
    assert all(d["hs6"].str.endswith("X") | (d["_merge"] == "both"))

    return df


def assert_full_merge(df, column):
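    """Assert that a merge made with indicator=True matched every row, listing the
    unmatched values of `column` on failure."""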
    missing = df[df.pop("_merge") != "both"][column].drop_duplicates()
    assert missing.empty, f"Not all {column} found:\n{missing}"


def clean_cnpjs(df):
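    """Validate exporter registration numbers, classify them as CNPJ (company) or
    CPF (individual), and zero-pad them to their canonical lengths."""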
    # manually add missing CPF for GUILHERME AUGUSTIN
    a = df["exporter.label"] == "GUILHERME AUGUSTIN"
    b = df["exporter.cnpj"].astype(int) == 0
    df = df.copy()
    df.loc[a & b, "exporter.cnpj"] = "38853329149"

    # validate all CNPJs & CPFs
    cnpj = df["exporter.cnpj"].str.rjust(14, "0")
    cnpj_valid = cnpj.apply(stdnum.br.cnpj.is_valid)

    cpf = df["exporter.cnpj"].str.rjust(11, "0")
    cpf_valid = cpf.apply(stdnum.br.cpf.is_valid)
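    # these numbers pass both the CNPJ and the CPF check; we know they are CPFs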
    cnpj_valid[cpf.isin(KNOWN_CPFS)] = False

    assert not any(cnpj_valid & cpf_valid)

    df = df.copy()
    df["exporter.type"] = "unknown"
    df.loc[cnpj_valid, "exporter.type"] = "cnpj"
    df.loc[cpf_valid, "exporter.type"] = "cpf"

    df["exporter.cnpj"] = np.where(cnpj_valid, cnpj, df["exporter.cnpj"])
    df["exporter.cnpj"] = np.where(cpf_valid, cpf, df["exporter.cnpj"])

    return df


@uses_database
def get_port_labels(cnx=None):
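    """Map Brazilian port synonyms to their canonical names in views.regions."""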
    return pd.read_sql(
        """
        select distinct 
            name as "port_of_export.name",
            unnest(synonyms) as "port_of_export.label"
        from views.regions where region_type = 'PORT' and country = 'BRAZIL'
        """,
        cnx.cnx,
    )


def clean_ports(df):
    df = pd.merge(
        df,
        get_port_labels(),
        on="port_of_export.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )
    assert_full_merge(df, "port_of_export.label")
    return df


@uses_database
def get_country_labels(cnx=None):
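    """Map destination-country synonyms to canonical names and two-letter trase_ids."""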
    return pd.read_sql(
        """
        select distinct 
            name as "country_of_destination.name",
            unnest(synonyms) as "country_of_destination.label",
            coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
        from views.regions where level = 1 and length(trase_id) = 2
        """,
        cnx.cnx,
    )


def clean_countries(df):
    df = pd.merge(
        df,
        get_country_labels(),
        on="country_of_destination.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )
    assert_full_merge(df, "country_of_destination.label")
    return df


@uses_database
def clean_exporters_and_add_group(df, cur=None, cnx=None):
    """
    This function adds two columns:

        exporter.name - the default name of the exporter from the database
        exporter.group - the group name from the database

    It does this using the following algorithm:

     1. Construct a Trase ID from exporter.cnpj and use this to perform a lookup in the
        database
     2. If a unique name + group cannot be found through that method, use exporter.label
        to perform a lookup among trader labels in the database

    TODO: try to do this more concisely / in fewer lines of code
    """
    trase_ids = "BR-TRADER-" + df["exporter.cnpj"].str.slice(0, 8)
    trase_ids = trase_ids.replace({"BR-TRADER-00000000": None})
    df = df.assign(**{"exporter.trase_id": trase_ids})
    df_exporters = df[["exporter.label", "exporter.trase_id"]].drop_duplicates()

    # clean exporter names using trase id
    df_exporters[["exporter.trader_id", "exporter.group", "count"]] = (
        find_traders_and_groups_by_trase_id(
            df_exporters.rename(columns={"exporter.trase_id": "trase_id"})[
                ["trase_id"]
            ],
            returning=["trader_id", "group_name", "count"],
            year=sql.Literal(YEAR),
            cur=cur,
            cnx=cnx,
        )
    )
    counts = df_exporters.pop("count")
    assert all(counts.isin([0, 1]))
    not_found_by_trase_id = counts == 0
    print(
        f"{sum(~not_found_by_trase_id)} exporters were found by Trase ID and "
        f"{sum(not_found_by_trase_id)} were not"
    )
    df_found_by_trase_id = df_exporters[~not_found_by_trase_id]
    df_missing = df_exporters[not_found_by_trase_id].copy()

    # if not found by Trase ID, then look by name
    labels = df_missing["exporter.label"].drop_duplicates()
    df_labels = pd.DataFrame(labels)
    df_labels[["exporter.trader_id", "exporter.group", "count"]] = (
        find_traders_and_groups_by_label(
            df_labels.rename(columns={"exporter.label": "trader_label"}),
            returning=["trader_id", "group_name", "count"],
            year=sql.Literal(YEAR),
        )
    )

    # special case for UNKNOWN CUSTOMER
    is_unknown = (df_labels["count"] != 1) & (
        df_labels["exporter.label"] == "UNKNOWN CUSTOMER"
    )
    if any(is_unknown):
        brazil_id = get_country_id("BRAZIL", cur=cur)
        label_id = find_label("UNKNOWN CUSTOMER", cur=cur)
        trader_id = get_label_trader_id(label_id, brazil_id)
        group_id = get_trader_group_id(trader_id, cur=cur)
        group_name = get_node_name(group_id, cur=cur)
        df_labels.loc[is_unknown, "exporter.trader_id"] = trader_id
        df_labels.loc[is_unknown, "exporter.group"] = group_name
        df_labels.loc[is_unknown, "count"] = 1

    # we should have found one unique node for every exporter
    bad = df_labels["count"] != 1
    if any(bad):
        raise ValueError(f"Missing some exporters:\n{df_labels[bad]}")

    # merge exporters found by trase id back into results
    right = df_found_by_trase_id[
        ["exporter.trase_id", "exporter.trader_id", "exporter.group"]
    ].drop_duplicates()
    df1 = pd.merge(
        df,
        right,
        on=["exporter.trase_id"],
        how="left",
        validate="many_to_one",
        indicator=True,
    )
    merge = df1.pop("_merge")
    df_solved1 = df1[merge == "both"]

    # merge exporters found by label back into results
    df_unsolved = df1[merge != "both"]
    df_unsolved = df_unsolved.drop(
        columns=["exporter.trader_id", "exporter.group"], errors="raise"
    )

    right = df_labels[
        ["exporter.label", "exporter.trader_id", "exporter.group"]
    ].drop_duplicates()
    df_solved2 = pd.merge(
        df_unsolved,
        right,
        on=["exporter.label"],
        how="left",
        validate="many_to_one",
        indicator=True,
    )
    merge = df_solved2.pop("_merge")
    assert all(merge == "both")

    # combine the two
    expected_columns = list(set(df.columns) | {"exporter.trader_id", "exporter.group"})
    assert sorted(df_solved2.columns) == sorted(expected_columns)
    assert sorted(df_solved1.columns) == sorted(expected_columns)
    df_final = pd.concat([df_solved1, df_solved2]).reset_index(drop=True)

    # guarantee that we didn't change the original data
    a = df.sort_values(list(df.columns)).reset_index(drop=True)
    b = df_final[df.columns].sort_values(list(df.columns)).reset_index(drop=True)
    b.columns.name = a.columns.name  # match the columns' Index .name metadata, which assert_frame_equal checks
    pd.testing.assert_frame_equal(a, b)

    # add exporter names
    df_final = df_final.astype({"exporter.trader_id": int})
    df_final[["exporter.name"]] = find_default_name_by_node_id(
        df_final[["exporter.trader_id"]].rename(
            columns={"exporter.trader_id": "node_id"}
        ),
        returning=["name"],
        cnx=cnx,
        cur=cur,
    )

    return df_final


def main():
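    """Read the raw 2019 bill-of-lading file, clean it, and write the result for
    upload to trase-storage."""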
    df = get_pandas_df_once(
        "brazil/trade/bol/2019/originals/Brazil_Bol_2019.csv",
        sep=";",
        dtype=str,
        keep_default_na=False,
    )

    df = select_and_rename_columns(df)
    df = clean_hs_codes(df)

    df = df.assign(
        year=df["date"].str.slice(0, 4).astype(int),
        month=df["date"].str.slice(4, 6).astype(int),
        day=df["date"].str.slice(6, 8).astype(int),
    )

    df = clean_countries(df)
    df = clean_ports(df)
    df = clean_cnpjs(df)
    df = clean_exporters_and_add_group(df)

    write_csv_for_upload(df, "brazil/trade/bol/2019/BRAZIL_BOL_2019.csv")


if __name__ == "__main__":
    main()
DBT model

import pandas as pd


def model(dbt, cursor):
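    # referencing the model and source here registers them in DBT's lineage graph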
    dbt.ref("hs2017")
    dbt.source("trase-storage-raw", "lc_originals_brazil_bol_2019")

    # the real table is produced by the standalone script above, so running
    # this model through DBT is deliberately an error
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})  # unreachable; keeps the shape of a DBT Python model, which must return a DataFrame