
CD Combined 2015

s3://trase-storage/brazil/trade/cd/combined/CD_COMBINED_2015.csv

Dbt path: trase_production.main_brazil.cd_combined_2015

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/trade/cd/combined/_schema.yml

Model file link: trase/data_pipeline/models/brazil/trade/cd/combined/cd_combined_2015.py

Calls script: trase/data/brazil/trade/cd/combined/CD_COMBINED_201X.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: mock_model, brazil, cd, combined, trade


cd_combined_2015

Description

This model was auto-generated from the .yml 'lineage' files in S3. The dbt model itself only raises an error; the script that actually created the data lives at trase/data/brazil/trade/cd/combined/CD_COMBINED_201X.py [permalink]. It was last run by Harry Biddle.
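
As a quick sanity check, the published table can be read straight from S3. A minimal sketch, assuming pandas with s3fs-backed access to the trase-storage bucket and that write_csv_for_upload produced a standard comma-separated file; the commodity column is added by the build script below.

import pandas as pd

# Quick look at the published output (assumes s3fs is installed and AWS
# credentials for the trase-storage bucket are configured).
df = pd.read_csv(
    "s3://trase-storage/brazil/trade/cd/combined/CD_COMBINED_2015.csv",
    dtype=str,
)
print(df.groupby("commodity").size())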


Details

Column | Type | Description

(no columns documented)

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.cd_combined_chicken_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_copper_ore_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_cocoa_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_tobacco_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_palm_oil_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_iron_ore_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_soy_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_corn_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_niobium_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_pork_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_cotton_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_coffee_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_beef_2015
  • source.trase_duckdb.trase-storage-raw.cd_combined_aluminium_2015

Sources

  • ['trase-storage-raw', 'cd_combined_chicken_2015']
  • ['trase-storage-raw', 'cd_combined_copper_ore_2015']
  • ['trase-storage-raw', 'cd_combined_cocoa_2015']
  • ['trase-storage-raw', 'cd_combined_tobacco_2015']
  • ['trase-storage-raw', 'cd_combined_palm_oil_2015']
  • ['trase-storage-raw', 'cd_combined_iron_ore_2015']
  • ['trase-storage-raw', 'cd_combined_soy_2015']
  • ['trase-storage-raw', 'cd_combined_corn_2015']
  • ['trase-storage-raw', 'cd_combined_niobium_2015']
  • ['trase-storage-raw', 'cd_combined_pork_2015']
  • ['trase-storage-raw', 'cd_combined_cotton_2015']
  • ['trase-storage-raw', 'cd_combined_coffee_2015']
  • ['trase-storage-raw', 'cd_combined_beef_2015']
  • ['trase-storage-raw', 'cd_combined_aluminium_2015']

Calls script (trase/data/brazil/trade/cd/combined/CD_COMBINED_201X.py):

import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf

from trase.tools.aws.aws_helpers import get_pandas_df
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION
from trase.tools.pcs.connect import uses_database

SOURCES = [
    ("ALUMINIUM", "brazil/aluminium/trade/cd/combined/CD_COMBINED_ALUMINIUM_2015.csv"),
    ("BEEF", "brazil/beef/trade/cd/combined/CD_COMBINED_BEEF_2015.csv"),
    ("CHICKEN", "brazil/chicken/trade/cd/combined/CD_COMBINED_CHICKEN_2015.csv"),
    ("COCOA", "brazil/cocoa/trade/cd/combined/CD_COMBINED_COCOA_2015.csv"),
    ("COFFEE", "brazil/coffee/trade/cd/combined/CD_COMBINED_COFFEE_2015.csv"),
    ("COPPER_ORE", "brazil/copper/trade/cd/combined/CD_COMBINED_COPPER_ORE_2015.csv"),
    ("CORN", "brazil/corn/trade/cd/combined/CD_COMBINED_CORN_2015.csv"),
    ("COTTON", "brazil/cotton/trade/cd/combined/CD_COMBINED_COTTON_2015.csv"),
    ("IRON_ORE", "brazil/iron_ore/trade/cd/combined/CD_COMBINED_IRON_ORE_2015.csv"),
    ("NIOBIUM", "brazil/niobium/trade/cd/combined/CD_COMBINED_NIOBIUM_2015.csv"),
    ("PALM_OIL", "brazil/palm_oil/trade/cd/combined/CD_COMBINED_PALM_OIL_2015.csv"),
    ("PORK", "brazil/pork/trade/cd/combined/CD_COMBINED_PORK_2015.csv"),
    ("SOY", "brazil/soy/trade/cd/combined/CD_COMBINED_SOY_2015.csv"),
    ("TOBACCO", "brazil/tobacco/trade/cd/combined/CD_COMBINED_TOBACCO_2015.csv"),
]


def main():
    # This copy of the CD_COMBINED_201X template is pinned to the 2015 inputs listed in SOURCES.
    year = 2015
    S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.clear()
    df = pd.concat(
        get_pandas_df(key, sep=";", dtype=str, keep_default_na=False).assign(
            commodity=commodity
        )
        for commodity, key in SOURCES
    )
    df = df.rename(
        columns={
            "country": "country_of_destination.label",
            "state": "state.label",
            "port": "port_of_export.label",
            "geocode": "municipality.geocode",
            "exporter": "exporter.label",
            "cnpj": "exporter.cnpj",
        },
        errors="raise",
    )
    df["municipality.trase_id"] = "BR-" + df["municipality.geocode"]

    df = clean_countries(df)
    df = clean_ports(df)
    df = clean_states(df)
    df = clean_cnpjs(df)
    df = add_derived_columns(df)
    write_csv_for_upload(df, f"brazil/trade/cd/combined/CD_COMBINED_{year}.csv")


def clean_cnpjs(df):
    df = df.copy()
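
    # CNPJ company tax IDs are 14 digits and CPF personal tax IDs are 11 digits;
    # left-pad with zeros before validating with python-stdnum.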

    cnpj = df["exporter.cnpj"].str.rjust(14, "0")
    valid_cnpj = cnpj.apply(stdnum.br.cnpj.is_valid)

    cpf = df["exporter.cnpj"].str.rjust(11, "0")
    valid_cpf = cpf.apply(stdnum.br.cpf.is_valid)

    # manually override codes that validate as both a CNPJ and a CPF, forcing them to be treated as CPFs
    valid_cnpj[cnpj == "00000523682891"] = False
    valid_cnpj[cnpj == "00004925661000"] = False
    valid_cnpj[cnpj == "00000408796901"] = False
    valid_cnpj[cnpj == "00000297458108"] = False

    # every tax code should be a CNPJ or a CPF, but not both
    assert not any(valid_cpf & valid_cnpj)
    assert all(valid_cpf | valid_cnpj)

    df["exporter.cnpj"] = np.where(valid_cnpj, cnpj, cpf)
    df["exporter.type"] = np.where(valid_cnpj, "cnpj", "cpf")

    return df


def assert_none_missing(df, column):
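    # pop() returns the merge indicator and removes it from the caller's frame,
    # so the temporary "_merge" column never reaches downstream code.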
    missing = df[df.pop("_merge") != "both"][column].drop_duplicates()
    assert missing.empty, f"Not all {column} found:\n{missing}"


def clean_ports(df):
    """We clean POL (Port of Lading) which corresponds to Trase's port of export"""
    df = pd.merge(
        df,
        get_port_labels(),
        on="port_of_export.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )
    assert_none_missing(df, "port_of_export.label")
    return df


def clean_countries(df):
    df = pd.merge(
        df,
        get_country_labels(),
        on="country_of_destination.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )
    assert_none_missing(df, "country_of_destination.label")
    return df


def clean_states(df) -> pd.DataFrame:
    df = pd.merge(
        df,
        get_states(),
        on="state.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )
    assert_none_missing(df, "state.label")
    return df


@uses_database
def get_country_labels(cnx=None):
    df = pd.read_sql(
        """
        select distinct 
            name as "country_of_destination.name",
            unnest(synonyms) as "country_of_destination.label",
            coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
        from views.regions where level = 1 and length(trase_id) >= 2
        """,
        cnx.cnx,
    )
    # Add a fallback synonym so "EUROPEAN" destinations map to an unknown country.
    # DataFrame.append was removed in pandas 2.0, so concatenate the extra row instead.
    extra_row = pd.DataFrame(
        [
            {
                "country_of_destination.name": "UNKNOWN COUNTRY",
                "country_of_destination.trase_id": "XX",
                "country_of_destination.label": "EUROPEAN",
            }
        ]
    )
    df = pd.concat([df, extra_row], ignore_index=True)
    return df


@uses_database
def get_port_labels(cnx=None):
    return pd.read_sql(
        """
        select distinct 
            name as "port_of_export.name",
            unnest(synonyms) as "port_of_export.label"
        from views.regions where region_type = 'PORT' and country = 'BRAZIL'
        """,
        cnx.cnx,
    )


@uses_database
def get_states(cnx=None) -> pd.DataFrame:
    return pd.read_sql(
        """
        select distinct 
               name as "state.name",
               unnest(synonyms) as "state.label",
               trase_id AS "state.trase_id"
          from views.regions 
         where country = 'BRAZIL' 
           and region_type = 'STATE' 
           and trase_id is not null
        """,
        cnx.cnx,
    )


def add_derived_columns(df):
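    # NCM codes are 8-digit Mercosur trade classifications built on the HS system,
    # so HS8 equals the zero-padded NCM and HS6/HS4 are its prefixes.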
    df = df.assign(ncm=df["ncm"].str.rjust(8, "0"))
    assert all(df["ncm"].str.len() == 8)
    df = df.assign(hs8=df["ncm"])
    df = df.assign(hs6=df["hs8"].str.slice(0, 6))
    df = df.assign(hs4=df["hs8"].str.slice(0, 4))
    return df


if __name__ == "__main__":
    main()

Model file (trase/data_pipeline/models/brazil/trade/cd/combined/cd_combined_2015.py):

import pandas as pd


def model(dbt, cursor):
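    # Declare every raw upstream table so dbt links this mock model to its
    # inputs in the lineage graph, even though the model itself is never built.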
    dbt.source("trase-storage-raw", "cd_combined_chicken_2015")
    dbt.source("trase-storage-raw", "cd_combined_copper_ore_2015")
    dbt.source("trase-storage-raw", "cd_combined_cocoa_2015")
    dbt.source("trase-storage-raw", "cd_combined_tobacco_2015")
    dbt.source("trase-storage-raw", "cd_combined_palm_oil_2015")
    dbt.source("trase-storage-raw", "cd_combined_iron_ore_2015")
    dbt.source("trase-storage-raw", "cd_combined_soy_2015")
    dbt.source("trase-storage-raw", "cd_combined_corn_2015")
    dbt.source("trase-storage-raw", "cd_combined_niobium_2015")
    dbt.source("trase-storage-raw", "cd_combined_pork_2015")
    dbt.source("trase-storage-raw", "cd_combined_cotton_2015")
    dbt.source("trase-storage-raw", "cd_combined_coffee_2015")
    dbt.source("trase-storage-raw", "cd_combined_beef_2015")
    dbt.source("trase-storage-raw", "cd_combined_aluminium_2015")

    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})