Skip to content

DBT: Brazil Bol 2015

File location: s3://trase-storage/brazil/trade/bol/2015/BRAZIL_BOL_2015.csv

DBT model name: brazil_bol_2015

Explore on Metabase: Full table; summary statistics

DBT details


Description

This model was auto-generated from the .yml 'lineage' files in S3. The DBT model itself only raises an error; the script that actually created the data lives elsewhere, at trase/data/brazil/trade/bol/2015/BRAZIL_BOL_2015.py [permalink]. It was last run by Harry Biddle.


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.piers_2015

Sources

  • ['trase-storage-raw', 'piers_2015']
import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf

from trase.tools import uses_database
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sei_pcs.pandas_utilities import full_merge

# Maps column headers of the PIERS spreadsheet (keys) to the column names used
# in the output (values). main() uses this mapping both to select the columns
# to keep and to rename them.
COLUMNS = dict(
    HS8_Code="hs8",
    POL_Name="port_of_export.label",
    YYYYMM="date",
    DEST_Country="country_of_destination.label",
    Shipper="exporter.label",
    Shipper_Type="shipper_type",
    Shipper_City="exporter.municipality.label",
    Shipper_State="exporter.state.label",
    Shipper_Zip="exporter.zip",
    Shipper_Country_Name="exporter.country.label",
    Shipper_Registration_Number="exporter.cnpj",
    WTMT="vol",  # is this right?
)


def clean_hs_codes(df):
    """Return a copy of *df* with zero-padded HS8 codes plus HS6 and HS4 prefixes.

    The "hs8" column is converted to strings and left-padded with "0" to eight
    characters. An AssertionError is raised if any padded value is not exactly
    eight characters long (i.e. the input was longer than eight). The "hs6" and
    "hs4" columns hold the first six and first four characters of the padded
    code respectively.
    """
    padded = df["hs8"].astype(str).str.pad(8, side="left", fillchar="0")
    assert (padded.str.len() == 8).all()

    derived = {
        "hs8": padded,
        "hs6": padded.str[:6],
        "hs4": padded.str[:4],
    }
    return df.assign(**derived)


def clean_cnpjs(df):
    """Return a copy of *df* with exporter registration numbers normalised.

    Each "exporter.cnpj" value is converted to a string and tested two ways:
    left-padded with zeros to 14 characters as a CNPJ, and to 11 characters as
    a CPF, using the python-stdnum validators. An AssertionError is raised if
    any value validates as both.

    A new "exporter.type" column is set to "cnpj", "cpf" or "unknown", and
    "exporter.cnpj" is replaced by the zero-padded form when it validated
    (otherwise the unpadded string is kept).
    """
    raw = df["exporter.cnpj"].astype(str)

    as_cnpj = raw.str.pad(14, side="left", fillchar="0")
    as_cpf = raw.str.pad(11, side="left", fillchar="0")
    is_cnpj = as_cnpj.apply(stdnum.br.cnpj.is_valid)
    is_cpf = as_cpf.apply(stdnum.br.cpf.is_valid)

    # The two interpretations must be mutually exclusive.
    assert not any(is_cnpj & is_cpf)

    result = df.copy()
    result["exporter.cnpj"] = raw

    result["exporter.type"] = "unknown"
    result.loc[is_cnpj, "exporter.type"] = "cnpj"
    result.loc[is_cpf, "exporter.type"] = "cpf"

    # CPF takes precedence over CNPJ, which takes precedence over the raw text.
    result["exporter.cnpj"] = np.where(
        is_cpf, as_cpf, np.where(is_cnpj, as_cnpj, raw)
    )

    return result


@uses_database
def get_port_labels(cnx=None):
    """Fetch the synonym-to-name lookup for Brazilian ports.

    Queries views.regions for rows with region_type 'PORT' and country
    'BRAZIL', expanding each row's synonyms into one row per synonym.

    :param cnx: object whose ``.cnx`` attribute is passed to pandas as the
        connection; presumably supplied by ``@uses_database`` when omitted
        (TODO confirm).
    :return: DataFrame with columns "port_of_export.name" and
        "port_of_export.label".
    """
    query = """
        select distinct 
            name as "port_of_export.name",
            unnest(synonyms) as "port_of_export.label"
        from views.regions where region_type = 'PORT' and country = 'BRAZIL'
        """
    return pd.read_sql(query, cnx.cnx)


def clean_ports(df):
    """Join *df* to the port lookup on "port_of_export.label".

    The lookup comes from get_port_labels(); the join is a left, many-to-one
    merge performed by full_merge, so each label may match at most one lookup
    row.
    """
    port_lookup = get_port_labels()
    return full_merge(
        df,
        port_lookup,
        how="left",
        on="port_of_export.label",
        validate="many_to_one",
    )


@uses_database
def get_country_labels(cnx=None):
    """Fetch the synonym-to-name lookup for destination countries.

    Queries views.regions for rows with level 1 and a two-character trase_id,
    expanding each row's synonyms into one row per synonym.

    :param cnx: object whose ``.cnx`` attribute is passed to pandas as the
        connection; presumably supplied by ``@uses_database`` when omitted
        (TODO confirm).
    :return: DataFrame with columns "country_of_destination.name",
        "country_of_destination.label" and "country_of_destination.trase_id".
    """
    # NOTE(review): the coalesce to 'XX' looks redundant alongside the
    # length(trase_id) = 2 filter, which already excludes null ids - confirm.
    query = """
        select distinct 
            name as "country_of_destination.name",
            unnest(synonyms) as "country_of_destination.label",
            coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
        from views.regions where level = 1 and length(trase_id) = 2
        """
    return pd.read_sql(query, cnx.cnx)


def clean_countries(df):
    """Join *df* to the country lookup on "country_of_destination.label".

    The lookup comes from get_country_labels(); the join is a left,
    many-to-one merge performed by full_merge, so each label may match at most
    one lookup row.
    """
    country_lookup = get_country_labels()
    return full_merge(
        df,
        country_lookup,
        how="left",
        on="country_of_destination.label",
        validate="many_to_one",
    )


def main():
    """Build the cleaned Brazil 2015 bill-of-lading CSV from the PIERS workbook.

    Reads the original spreadsheet, keeps and renames the columns listed in
    COLUMNS, splits the YYYYMM date into integer "year" and "month" columns,
    runs the HS-code, country, port and CNPJ cleaning steps, and hands the
    result to write_csv_for_upload.
    """
    df = get_pandas_df_once(
        "brazil/trade/bol/2015/originals/PIERS_2015.xlsx",
        xlsx=True,
        keep_default_na=False,
    )

    # Select with an explicit list of the mapping's keys: indexing a DataFrame
    # with a dict was deprecated in pandas 1.5 and raises TypeError from 2.0.
    df = df[list(COLUMNS)].rename(columns=COLUMNS, errors="raise")

    # "date" holds YYYYMM; the first four characters are the year and the
    # next two the month.
    date = df.pop("date").astype(str)
    df["year"] = date.str.slice(0, 4).astype(int)
    df["month"] = date.str.slice(4, 6).astype(int)

    df = clean_hs_codes(df)
    df = clean_countries(df)
    df = clean_ports(df)
    df = clean_cnpjs(df)

    write_csv_for_upload(df, "brazil/trade/bol/2015/BRAZIL_BOL_2015.csv")


if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    """Placeholder dbt Python model that always raises NotImplementedError.

    The model only references the "piers_2015" source in "trase-storage-raw";
    it does not produce any data itself.

    :raises NotImplementedError: always.
    """
    # Reference the upstream source - presumably so that dbt records it as a
    # dependency of this model (TODO confirm).
    dbt.source("trase-storage-raw", "piers_2015")

    raise NotImplementedError()
    # NOTE(review): unreachable. Presumably kept because dbt's Python-model
    # parser expects the function to end with a return of a single dataframe -
    # confirm before removing.
    return pd.DataFrame({"hello": ["world"]})