Skip to content

DBT: Brazil Bol Dataliner Soy 2019

File location: s3://trase-storage/brazil/trade/bol/2019/BRAZIL_BOL_DATALINER_SOY_2019.csv

DBT model name: brazil_bol_dataliner_soy_2019

Explore on Metabase: Full table; summary statistics

DBT details


Description

This model was auto-generated from the .yml 'lineage' files in S3. The DBT model itself only raises an error; the script that actually created the data lives elsewhere, at trase/data/brazil/trade/bol/201X/BRAZIL_BOL_DATALINER_SOY_201X.py [permalink]. It was last run by Harry Biddle.


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.dataliner_soy_2017_2019_cleaned

Sources

  • ['trase-storage-raw', 'dataliner_soy_2017_2019_cleaned']
"""
Creates the following files:

    s3://trase-storage/brazil/trade/bol/2017/BRAZIL_BOL_DATALINER_SOY_2017.csv
    s3://trase-storage/brazil/trade/bol/2018/BRAZIL_BOL_DATALINER_SOY_2018.csv
    s3://trase-storage/brazil/trade/bol/2019/BRAZIL_BOL_DATALINER_SOY_2019.csv
"""

import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf

from trase.tools import uses_database
from trase.tools.aws.aws_helpers import get_pandas_df
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sei_pcs.pandas_utilities import full_merge

# Maps column names of the raw Dataliner CSV to the names used in the output.
# main() selects exactly these columns (in this order) and renames them, so
# any raw column that is not listed here is dropped.
COLUMNS = {
    "Period/YYYYMMDD": "date",
    "Commodity_HS_Datamar/HS6 Code": "hs6",
    "Commodity_HS_Datamar/HS8 Code": "hs8",
    "Company_Consignee/Consignee Name": "importer",
    "Company_Shipper/Shipper Name": "exporter.label",
    "Company_Shipper/Registration Number": "exporter.cnpj",
    "Company_Shipper/Country Name": "exporter.country.label",
    "Company_Shipper/State": "exporter.state.label",
    "Company_Shipper/City": "exporter.city",
    "Company_Shipper/Street": "exporter.street",
    "Company_Shipper/Zip": "exporter.zip_code",
    "Place_and_Ports/DEST_Country": "country_of_destination.label",
    "Place_and_Ports/DEST_Name": "port_of_import",
    "Place_and_Ports/POD_Country": "country_of_discharge.label",
    "Place_and_Ports/POD_Name": "port_of_discharge.label",
    "Place_and_Ports/POL_Country": "country_of_production",
    "Place_and_Ports/POL_Name": "port_of_export.label",
    "WTKG": "vol",
}
# Country labels that clean_countries() maps to "UNKNOWN COUNTRY" / trase id
# "XX" in addition to the synonyms read from the database.
UNKNOWN_COUNTRY_LABELS = ["EUROPEAN", "EUROPE MED"]
# Placeholder registration number (fourteen "X" characters); clean_cnpjs()
# classifies it as "unknown".
UNKNOWN_CNPJ = "X" * 14
# Exact-match replacements applied to "exporter.cnpj" by clean_cnpjs() before
# the values are classified.
CNPJ_FIXES = {
    # 13-character value padded to 14 characters with a leading zero.
    "5492968000449": "05492968000449",
    # "0" and the empty string are treated as a missing registration number.
    "0": UNKNOWN_CNPJ,
    "": UNKNOWN_CNPJ,
}


def validate_hs_codes_and_add_hs4(df):
    """Check that the HS codes are mutually consistent and add an ``hs4`` column.

    Args:
        df: DataFrame with string columns ``hs6`` and ``hs8``.

    Returns:
        A new DataFrame equal to ``df`` plus an ``hs4`` column holding the
        first four characters of ``hs6``. ``df`` itself is not modified.

    Raises:
        ValueError: if any ``hs6`` value differs from the first six characters
            of the ``hs8`` value in the same row, or if any ``hs8`` value is
            not exactly eight characters long.
    """
    # Explicit raises rather than ``assert``: assertions are stripped when
    # Python runs with -O, which would let inconsistent codes through silently.
    mismatched = df["hs6"] != df["hs8"].str.slice(0, 6)
    if mismatched.any():
        raise ValueError(
            f"{int(mismatched.sum())} row(s) have an hs6 code that is not the "
            "first six characters of their hs8 code"
        )

    wrong_length = df["hs8"].str.len() != 8
    if wrong_length.any():
        raise ValueError(
            f"{int(wrong_length.sum())} row(s) have an hs8 code that is not "
            "exactly 8 characters long"
        )

    return df.assign(hs4=df["hs6"].str.slice(0, 4))


@uses_database
def clean_countries(df, column, cnx=None):
    """Attach a canonical country name and Trase id to a country-label column.

    Country synonyms are read from ``views.regions`` and extended with the
    labels in ``UNKNOWN_COUNTRY_LABELS`` (mapped to "UNKNOWN COUNTRY" / "XX").
    The lookup is then merged onto ``df`` using ``"{column}.label"`` as key,
    which adds the columns ``"{column}.name"`` and ``"{column}.trase_id"``.

    Args:
        df: DataFrame containing a ``"{column}.label"`` column.
        column: Column prefix, e.g. ``"country_of_destination"``. It is
            interpolated directly into the SQL text, so it must only ever be
            a trusted, hard-coded identifier.
        cnx: Database handle exposing a ``.cnx`` connection. Presumably
            injected by ``@uses_database`` when omitted; verify against that
            decorator.

    Returns:
        The result of ``full_merge`` (a left, many-to-one merge of ``df`` with
        the country lookup).
    """
    df_country_labels = pd.read_sql(
        f"""
        select distinct
            name as "{column}.name",
            unnest(synonyms) as "{column}.label",
            coalesce(trase_id, 'XX') AS "{column}.trase_id"
        from views.regions where level = 1 and length(trase_id) = 2
        """,
        cnx.cnx,
    )
    df_unknown_labels = pd.DataFrame(
        [
            {
                f"{column}.name": "UNKNOWN COUNTRY",
                f"{column}.trase_id": "XX",
                f"{column}.label": label,
            }
            for label in UNKNOWN_COUNTRY_LABELS
        ]
    )
    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported
    # equivalent and also works on older pandas versions.
    df_country_labels = pd.concat(
        [df_country_labels, df_unknown_labels],
        ignore_index=True,
        verify_integrity=True,
    )
    return full_merge(
        df,
        df_country_labels,
        on=f"{column}.label",
        validate="many_to_one",
        how="left",
    )


@uses_database
def clean_ports(df, cnx=None):
    """Add the canonical port name for each port-of-export label.

    Every synonym of a Brazilian port in ``views.regions`` is read into a
    lookup table, which is merged onto ``df`` using "port_of_export.label" as
    key. This adds the column "port_of_export.name".

    Args:
        df: DataFrame containing a "port_of_export.label" column.
        cnx: Database handle exposing a ``.cnx`` connection. Presumably
            injected by ``@uses_database`` when omitted; verify against that
            decorator.

    Returns:
        The result of ``full_merge`` (a left, many-to-one merge of ``df`` with
        the port lookup).
    """
    synonym_query = """
        select distinct
            name as "port_of_export.name",
            unnest(synonyms) as "port_of_export.label"
        from views.regions where region_type = 'PORT' and country = 'BRAZIL'
        """
    port_lookup = pd.read_sql(synonym_query, cnx.cnx)

    return full_merge(
        df,
        port_lookup,
        how="left",
        on="port_of_export.label",
        validate="many_to_one",
    )


def clean_cnpjs(df):
    """Apply known fixes to exporter registration numbers and classify them.

    Works on a copy of ``df``. Values of "exporter.cnpj" that exactly match a
    key of ``CNPJ_FIXES`` are replaced by the mapped value. A new column
    "exporter.type" then records one of:

    * "unknown" - the value equals ``UNKNOWN_CNPJ``
    * "cnpj"    - 14 characters long and accepted by ``stdnum.br.cnpj``
    * "cpf"     - 11 characters long and accepted by ``stdnum.br.cpf``
    * "invalid" - anything else

    Args:
        df: DataFrame with a string column "exporter.cnpj".

    Returns:
        The modified copy; the input DataFrame is left untouched.
    """

    def classify(registration):
        # Guard clauses, checked in order: placeholder, CNPJ, CPF, fallback.
        if registration == UNKNOWN_CNPJ:
            return "unknown"
        if len(registration) == 14 and stdnum.br.cnpj.is_valid(registration):
            return "cnpj"
        if len(registration) == 11 and stdnum.br.cpf.is_valid(registration):
            return "cpf"
        return "invalid"

    cleaned = df.copy()
    fixed = cleaned["exporter.cnpj"].apply(lambda raw: CNPJ_FIXES.get(raw, raw))
    cleaned["exporter.cnpj"] = fixed
    cleaned["exporter.type"] = fixed.apply(classify)
    return cleaned


def main():
    """Clean the combined 2017-2019 Dataliner soy file and write one CSV per year.

    Steps:
        1. Load the raw semicolon-separated CSV with every column as a string
           and without converting any value to NaN.
        2. Keep and rename the columns listed in ``COLUMNS``.
        3. Split the YYYYMMDD "date" column into integer "year", "month" and
           "day" columns.
        4. Check that every row has "BRAZIL" as country of production.
        5. Validate HS codes, then clean countries, ports and CNPJs.
        6. Pass each year's rows to ``write_csv_for_upload``.

    Raises:
        ValueError: if any row has a "country_of_production" other than
            "BRAZIL".
    """
    df = get_pandas_df(
        "brazil/trade/bol/2017/originals/DATALINER_SOY_2017_2019_cleaned.csv",
        sep=";",
        encoding="utf8",
        keep_default_na=False,
        dtype=str,
    )

    # Select the mapped columns explicitly (in COLUMNS order) before renaming;
    # a raw column missing from the file raises a KeyError here.
    df = df[list(COLUMNS)].rename(columns=COLUMNS, errors="raise")

    date = df.pop("date")
    df["year"] = date.str.slice(0, 4).astype(int)
    df["month"] = date.str.slice(4, 6).astype(int)
    df["day"] = date.str.slice(6, 8).astype(int)

    # Explicit raise rather than ``assert``: assertions are stripped when
    # Python runs with -O, which would silently skip this sanity check.
    not_brazil = df["country_of_production"] != "BRAZIL"
    if not_brazil.any():
        raise ValueError(
            f"{int(not_brazil.sum())} row(s) have a country_of_production "
            "other than 'BRAZIL'"
        )

    df = validate_hs_codes_and_add_hs4(df)
    df = clean_countries(df, "country_of_destination")
    df = clean_countries(df, "country_of_discharge")
    df = clean_ports(df)
    df = clean_cnpjs(df)

    # NOTE(review): write_csv_for_upload presumably prints the upload command
    # for each file, which this header line introduces; confirm in that helper.
    print("Now run:")
    for year in df["year"].unique():
        write_csv_for_upload(
            df[df["year"] == year],
            f"brazil/trade/bol/{year}/BRAZIL_BOL_DATALINER_SOY_{year}.csv",
        )


if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    """Placeholder dbt Python model that always raises ``NotImplementedError``.

    The function references the upstream raw source and then raises without
    producing any data; ``cursor`` is not used.

    Raises:
        NotImplementedError: always.
    """
    # Presumably this call is what records the dependency on the raw source in
    # dbt's lineage graph; confirm against the dbt Python model documentation.
    dbt.source("trase-storage-raw", "dataliner_soy_2017_2019_cleaned")

    raise NotImplementedError()
    # Unreachable at runtime. NOTE(review): this trailing return is probably
    # kept because dbt's parser expects a Python model to end by returning a
    # single dataframe; confirm before removing it.
    return pd.DataFrame({"hello": ["world"]})