Skip to content

DBT: Brazil Datamyne Cd 2017

File location: s3://trase-storage/brazil/trade/cd/datamyne/2017/BRAZIL_DATAMYNE_CD_2017.csv

DBT model name: brazil_datamyne_cd_2017

Explore on Metabase: Full table; summary statistics

DBT details


Description

This model was auto-generated from the .yml 'lineage' files in S3. The DBT model itself only raises an error; the script that actually created the data lives elsewhere, at trase/data/brazil/trade/cd/datamyne/2017/BRAZIL_DATAMYNE_CD_2017.py [permalink]. It was last run by Harry Biddle.


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.2017_1
  • source.trase_duckdb.trase-storage-raw.2017_5
  • source.trase_duckdb.trase-storage-raw.2017_3
  • source.trase_duckdb.trase-storage-raw.2017_4
  • source.trase_duckdb.trase-storage-raw.2017_8
  • source.trase_duckdb.trase-storage-raw.2017_2
  • source.trase_duckdb.trase-storage-raw.2017_6
  • source.trase_duckdb.trase-storage-raw.2017_7

Sources

  • ['trase-storage-raw', '2017_1']
  • ['trase-storage-raw', '2017_5']
  • ['trase-storage-raw', '2017_3']
  • ['trase-storage-raw', '2017_4']
  • ['trase-storage-raw', '2017_8']
  • ['trase-storage-raw', '2017_2']
  • ['trase-storage-raw', '2017_6']
  • ['trase-storage-raw', '2017_7']
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf

from trase.tools import uses_database
from trase.tools.aws.aws_helpers import get_pandas_df
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sei_pcs.pandas_utilities import concat, full_merge


def main():
    """Build BRAZIL_DATAMYNE_CD_2017.csv from the eight original spreadsheets.

    Reads each original workbook, stacks them, renames the columns to the
    dotted naming scheme used below, runs every cleaning step in order and
    hands the result to ``write_csv_for_upload``.
    """
    original_paths = [
        "brazil/trade/cd/datamyne/2017/originals/2017_1.xls",
        "brazil/trade/cd/datamyne/2017/originals/2017_2.xls",
        "brazil/trade/cd/datamyne/2017/originals/2017_3.xlsx",
        "brazil/trade/cd/datamyne/2017/originals/2017_4.xlsx",
        "brazil/trade/cd/datamyne/2017/originals/2017_5.xlsx",
        "brazil/trade/cd/datamyne/2017/originals/2017_6.xlsx",
        "brazil/trade/cd/datamyne/2017/originals/2017_7.xls",
        "brazil/trade/cd/datamyne/2017/originals/2017_8.xlsx",
    ]

    sheets = []
    for path in original_paths:
        sheet = get_pandas_df(
            path, xlsx=True, skiprows=10, dtype=str, usecols="A:L"
        )
        # The first 10 rows are skipped on read and the last 9 rows are dropped
        # here - presumably export header/footer boilerplate; TODO confirm.
        sheets.append(sheet[:-9])
    df = concat(sheets)

    # alter column names; errors="raise" makes a missing source column fatal
    new_column_names = {
        "Country of Destination": "country_of_destination.label",
        "Date (Month)": "date",
        "Exporter CNJP": "exporter.cnpj",
        "Exporter Municipality": "exporter.municipality.label",
        "State / Department of the Exporter": "exporter.state.label",
        "Exporter Name": "exporter.label",
        "FOB Value (US$)": "fob",
        "Net Weight": "vol",
        "Port of Departure": "port_of_export.label",
        "Product HS": "hs8",
    }
    df = df.rename(columns=new_column_names, errors="raise")

    # each step takes the frame and returns an enriched frame
    cleaning_steps = (
        add_derived_convenience_columns,
        clean_countries,
        clean_ports,
        clean_cnpjs,
        clean_municipalities,
        clean_states,
    )
    for step in cleaning_steps:
        df = step(df)

    output_path = "brazil/trade/cd/datamyne/2017/BRAZIL_DATAMYNE_CD_2017.csv"
    write_csv_for_upload(df, output_path)


def add_derived_convenience_columns(df):
    """Return a copy of ``df`` with ``hs4``, ``hs6``, ``year`` and ``month`` added.

    ``hs4``/``hs6`` are the leading 4 and 6 characters of ``hs8``; ``year`` and
    ``month`` are characters 0-3 and 4-5 of ``date`` converted to ``int``.
    Asserts that every ``hs8`` value is exactly eight characters long.
    """
    hs_code = df["hs8"]
    assert all(hs_code.str.len() == 8)

    period = df["date"]
    return df.assign(
        hs4=hs_code.str.slice(0, 4),
        hs6=hs_code.str.slice(0, 6),
        year=period.str.slice(0, 4).astype(int),
        month=period.str.slice(4, 6).astype(int),
    )


@uses_database
def clean_countries(df, cnx=None):
    """Merge country name and Trase id onto ``df`` via the destination label.

    The label lookup is read from ``views.regions`` (one row per synonym) and
    extended with a hand-written row that maps the label "EUROPEAN" to
    "UNKNOWN COUNTRY" / "XX".

    :param df: frame with a ``country_of_destination.label`` column.
    :param cnx: database handle; ``cnx.cnx`` is passed to ``pd.read_sql``.
        Presumably injected by ``@uses_database`` - confirm.
    :return: the result of ``full_merge`` (left, many-to-one) on the label.
    """
    df_country_labels = pd.read_sql(
        """
        select distinct
            name as "country_of_destination.name",
            unnest(synonyms) as "country_of_destination.label",
            coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
        from views.regions where level = 1 and length(trase_id) = 2
        """,
        cnx.cnx,
    )

    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported
    # way to add the extra row and behaves the same on older versions.
    df_extra_label = pd.DataFrame(
        [
            {
                "country_of_destination.name": "UNKNOWN COUNTRY",
                "country_of_destination.trase_id": "XX",
                "country_of_destination.label": "EUROPEAN",
            }
        ]
    )
    df_country_labels = pd.concat(
        [df_country_labels, df_extra_label], ignore_index=True
    )

    return full_merge(
        df,
        df_country_labels,
        on="country_of_destination.label",
        validate="many_to_one",
        how="left",
    )


@uses_database
def clean_ports(df, cnx=None):
    """Merge the port name onto ``df`` via ``port_of_export.label``.

    The lookup comes from ``views.regions`` rows with region type 'PORT' and
    country 'BRAZIL', one row per synonym.

    :param df: frame with a ``port_of_export.label`` column.
    :param cnx: database handle; ``cnx.cnx`` is passed to ``pd.read_sql``.
    :return: the result of ``full_merge`` (left, many-to-one) on the label.
    """
    query = """
        select distinct
            name as "port_of_export.name",
            unnest(synonyms) as "port_of_export.label"
        from views.regions where region_type = 'PORT' and country = 'BRAZIL'
        """
    port_lookup = pd.read_sql(query, cnx.cnx)

    merged = full_merge(
        df,
        port_lookup,
        on="port_of_export.label",
        validate="many_to_one",
        how="left",
    )
    return merged


def clean_cnpjs(df):
    """Return a copy of ``df`` with an ``exporter.type`` column added.

    Each ``exporter.cnpj`` value is classified by length: 14 characters is
    labelled "cnpj", 11 characters is labelled "cpf". Every value is also
    checked with the matching ``stdnum`` validator.

    :param df: frame with an ``exporter.cnpj`` column of strings; it may be
        empty, in which case an empty ``exporter.type`` column is added.
    :return: a copy of ``df``; the input frame is not modified.
    :raises ValueError: if a value is neither 11 nor 14 characters long.
    :raises AssertionError: if any value fails its validator.
    """
    df = df.copy()

    def validate(cnpj):
        if len(cnpj) == 14:
            return "cnpj", stdnum.br.cnpj.is_valid(cnpj)
        elif len(cnpj) == 11:
            return "cpf", stdnum.br.cpf.is_valid(cnpj)
        else:
            raise ValueError(cnpj)

    numbers = list(df["exporter.cnpj"])
    results = [validate(number) for number in numbers]

    # Build the column from a list rather than unpacking zip(*...): the
    # unpacking form fails with "not enough values to unpack" on an empty frame.
    df["exporter.type"] = [kind for kind, _ in results]

    invalid = [number for number, (_, ok) in zip(numbers, results) if not ok]
    if invalid:
        # Raised explicitly (instead of a bare ``assert``) so the check still
        # runs under ``python -O`` and names the offending values, while
        # keeping the AssertionError type existing callers may rely on.
        raise AssertionError(
            f"{len(invalid)} invalid CNPJ/CPF number(s), e.g. {invalid[:10]}"
        )
    return df


@uses_database
def clean_municipalities(df, cnx=None):
    """Merge exporter municipality name and Trase id onto ``df`` by CNPJ.

    Given the CNPJs and their municipality labels, build a lookup of CNPJ to
    municipality name / Trase id. Since we do not have the state information
    (we can't trust the state column in the original data!) this is only done
    for labels that unambiguously identify a municipality; a label that
    matches more than one municipality contributes no lookup row at all.

    :param df: frame with ``exporter.cnpj`` and ``exporter.municipality.label``.
    :param cnx: database handle; ``cnx.cnx`` is passed to ``pd.read_sql``.
    :return: the result of ``full_merge`` (left, many-to-one) on
        ``exporter.cnpj``.
    """
    # first get a list of CNPJs with their municipality label
    df_lookup = df[["exporter.cnpj", "exporter.municipality.label"]].drop_duplicates()

    # add an index so that we can keep track of ambiguous municipality labels later
    df_lookup = df_lookup.assign(id=range(len(df_lookup)))

    # merge in municipalities (one row per synonym)
    df_municipalities = pd.read_sql(
        """
        select distinct
            name as "exporter.municipality.name",
            unnest(synonyms) as "exporter.municipality.label",
            trase_id as "exporter.municipality.trase_id"
        from views.regions
        where country = 'BRAZIL' and region_type = 'MUNICIPALITY'
        """,
        cnx.cnx,
    )

    df_lookup = pd.merge(
        df_lookup, df_municipalities, on=["exporter.municipality.label"]
    )

    # drop labels - now we only have name and trase id
    df_lookup = df_lookup.drop(columns="exporter.municipality.label", errors="raise")
    df_lookup = df_lookup.drop_duplicates()

    # Ignore any municipalities for which the label was ambiguous. An id that
    # still appears more than once matched several distinct municipalities;
    # keep=False drops *all* of those rows. The default keep="first" would
    # silently retain one arbitrary match.
    is_ambiguous = df_lookup.pop("id").duplicated(keep=False)
    df_lookup = df_lookup[~is_ambiguous]

    # With "id" gone, two different labels of the same CNPJ that resolved to
    # the same municipality are now identical rows. Collapse them so the
    # lookup has one row per CNPJ where possible (the merge below requests
    # many_to_one validation).
    df_lookup = df_lookup.drop_duplicates()

    # apply this lookup to the original data
    return full_merge(
        df,
        df_lookup,
        on="exporter.cnpj",
        validate="many_to_one",
        how="left",
    )


@uses_database
def clean_states(df, cnx=None):
    """Merge exporter state name and Trase id onto ``df`` via the state label.

    The lookup comes from ``views.regions`` rows with region type 'STATE' and
    country 'BRAZIL', one row per synonym.

    :param df: frame with an ``exporter.state.label`` column.
    :param cnx: database handle; ``cnx.cnx`` is passed to ``pd.read_sql``.
    :return: the result of ``full_merge`` (left, many-to-one) on the label.
    """
    query = """
        select distinct 
            name as "exporter.state.name",
            unnest(synonyms) as "exporter.state.label", 
            trase_id as "exporter.state.trase_id"
        from views.regions where country = 'BRAZIL' and region_type = 'STATE'
        """
    state_lookup = pd.read_sql(query, cnx.cnx)

    merged = full_merge(
        df,
        state_lookup,
        on="exporter.state.label",
        validate="many_to_one",
        how="left",
    )
    return merged


# Run the full ingestion only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    """Placeholder dbt Python model for ``brazil_datamyne_cd_2017``.

    Declares the eight raw 2017 sources and then always raises
    ``NotImplementedError``; it produces no data itself.
    """
    # Each call names one raw source in "trase-storage-raw".
    # NOTE(review): presumably these are literal calls so that dbt can pick up
    # the lineage when parsing the model - confirm before refactoring into a loop.
    dbt.source("trase-storage-raw", "2017_1")
    dbt.source("trase-storage-raw", "2017_5")
    dbt.source("trase-storage-raw", "2017_3")
    dbt.source("trase-storage-raw", "2017_4")
    dbt.source("trase-storage-raw", "2017_8")
    dbt.source("trase-storage-raw", "2017_2")
    dbt.source("trase-storage-raw", "2017_6")
    dbt.source("trase-storage-raw", "2017_7")

    # The model is never meant to be executed.
    raise NotImplementedError()
    # Unreachable. NOTE(review): presumably kept so that dbt's Python-model
    # validation finds a trailing ``return`` - confirm before removing.
    return pd.DataFrame({"hello": ["world"]})