Cd Combined 2015
s3://trase-storage/brazil/trade/cd/combined/CD_COMBINED_2015.csv
Dbt path: trase_production.main_brazil.cd_combined_2015
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/trade/cd/combined/_schema.yml
Model file link: trase/data_pipeline/models/brazil/trade/cd/combined/cd_combined_2015.py
Calls script: trase/data/brazil/trade/cd/combined/CD_COMBINED_201X.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: mock_model, brazil, cd, combined, trade
cd_combined_2015
Description
This model was auto-generated based on .yml 'lineage' files in S3. The dbt model just raises an error; the actual script that created the data lives elsewhere. The script is located at trase/data/brazil/trade/cd/combined/CD_COMBINED_201X.py [permalink]. It was last run by Harry Biddle.
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.cd_combined_chicken_2015, source.trase_duckdb.trase-storage-raw.cd_combined_copper_ore_2015, source.trase_duckdb.trase-storage-raw.cd_combined_cocoa_2015, source.trase_duckdb.trase-storage-raw.cd_combined_tobacco_2015, source.trase_duckdb.trase-storage-raw.cd_combined_palm_oil_2015, source.trase_duckdb.trase-storage-raw.cd_combined_iron_ore_2015, source.trase_duckdb.trase-storage-raw.cd_combined_soy_2015, source.trase_duckdb.trase-storage-raw.cd_combined_corn_2015, source.trase_duckdb.trase-storage-raw.cd_combined_niobium_2015, source.trase_duckdb.trase-storage-raw.cd_combined_pork_2015, source.trase_duckdb.trase-storage-raw.cd_combined_cotton_2015, source.trase_duckdb.trase-storage-raw.cd_combined_coffee_2015, source.trase_duckdb.trase-storage-raw.cd_combined_beef_2015, source.trase_duckdb.trase-storage-raw.cd_combined_aluminium_2015
Sources
['trase-storage-raw', 'cd_combined_chicken_2015'], ['trase-storage-raw', 'cd_combined_copper_ore_2015'], ['trase-storage-raw', 'cd_combined_cocoa_2015'], ['trase-storage-raw', 'cd_combined_tobacco_2015'], ['trase-storage-raw', 'cd_combined_palm_oil_2015'], ['trase-storage-raw', 'cd_combined_iron_ore_2015'], ['trase-storage-raw', 'cd_combined_soy_2015'], ['trase-storage-raw', 'cd_combined_corn_2015'], ['trase-storage-raw', 'cd_combined_niobium_2015'], ['trase-storage-raw', 'cd_combined_pork_2015'], ['trase-storage-raw', 'cd_combined_cotton_2015'], ['trase-storage-raw', 'cd_combined_coffee_2015'], ['trase-storage-raw', 'cd_combined_beef_2015'], ['trase-storage-raw', 'cd_combined_aluminium_2015']
import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf
from trase.tools.aws.aws_helpers import get_pandas_df
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION
from trase.tools.pcs.connect import uses_database
# Input files for the 2015 combined table: (commodity label, object key).
# main() reads each key with get_pandas_df and tags its rows with the label
# in a "commodity" column.
# NOTE(review): keys look relative to the trase-storage bucket — confirm.
# COPPER_ORE is stored under brazil/copper/, not brazil/copper_ore/.
SOURCES = [
    ("ALUMINIUM", "brazil/aluminium/trade/cd/combined/CD_COMBINED_ALUMINIUM_2015.csv"),
    ("BEEF", "brazil/beef/trade/cd/combined/CD_COMBINED_BEEF_2015.csv"),
    ("CHICKEN", "brazil/chicken/trade/cd/combined/CD_COMBINED_CHICKEN_2015.csv"),
    ("COCOA", "brazil/cocoa/trade/cd/combined/CD_COMBINED_COCOA_2015.csv"),
    ("COFFEE", "brazil/coffee/trade/cd/combined/CD_COMBINED_COFFEE_2015.csv"),
    ("COPPER_ORE", "brazil/copper/trade/cd/combined/CD_COMBINED_COPPER_ORE_2015.csv"),
    ("CORN", "brazil/corn/trade/cd/combined/CD_COMBINED_CORN_2015.csv"),
    ("COTTON", "brazil/cotton/trade/cd/combined/CD_COMBINED_COTTON_2015.csv"),
    ("IRON_ORE", "brazil/iron_ore/trade/cd/combined/CD_COMBINED_IRON_ORE_2015.csv"),
    ("NIOBIUM", "brazil/niobium/trade/cd/combined/CD_COMBINED_NIOBIUM_2015.csv"),
    ("PALM_OIL", "brazil/palm_oil/trade/cd/combined/CD_COMBINED_PALM_OIL_2015.csv"),
    ("PORK", "brazil/pork/trade/cd/combined/CD_COMBINED_PORK_2015.csv"),
    ("SOY", "brazil/soy/trade/cd/combined/CD_COMBINED_SOY_2015.csv"),
    ("TOBACCO", "brazil/tobacco/trade/cd/combined/CD_COMBINED_TOBACCO_2015.csv"),
]
def main():
    """Build the combined 2015 Brazil CD trade table and write it for upload.

    Reads every file in ``SOURCES`` via ``get_pandas_df`` (";"-separated,
    all columns as strings) and tags each row with its commodity. It then
    renames the raw columns to Trase names and derives
    ``municipality.trase_id`` from the geocode. Countries, ports, states and
    exporter tax codes are cleaned, HS-code columns are added, and the result
    is written to ``brazil/trade/cd/combined/CD_COMBINED_<year>.csv``.
    """
    # Output year. It must match the year embedded in every SOURCES key.
    year = 2015

    # NOTE(review): presumably resets the tracker of S3 objects read, so the
    # metadata written by write_csv_for_upload lists only this run's inputs.
    # Confirm against trase.tools.aws.tracker.
    S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.clear()

    frames = (
        get_pandas_df(key, sep=";", dtype=str, keep_default_na=False).assign(
            commodity=commodity
        )
        for commodity, key in SOURCES
    )
    # Every source file carries its own 0..n index, so renumber the rows.
    df = pd.concat(frames, ignore_index=True)

    # errors="raise": fail loudly if any expected raw column is missing.
    df = df.rename(
        columns={
            "country": "country_of_destination.label",
            "state": "state.label",
            "port": "port_of_export.label",
            "geocode": "municipality.geocode",
            "exporter": "exporter.label",
            "cnpj": "exporter.cnpj",
        },
        errors="raise",
    )
    df["municipality.trase_id"] = "BR-" + df["municipality.geocode"]

    df = clean_countries(df)
    df = clean_ports(df)
    df = clean_states(df)
    df = clean_cnpjs(df)
    df = add_derived_columns(df)

    write_csv_for_upload(df, f"brazil/trade/cd/combined/CD_COMBINED_{year}.csv")
def clean_cnpjs(df):
    """Normalise exporter tax codes and label each one as a CNPJ or a CPF.

    Each ``exporter.cnpj`` value is checked in two ways:

    - zero-padded to 14 characters and checked with ``stdnum.br.cnpj.is_valid``;
    - zero-padded to 11 characters and checked with ``stdnum.br.cpf.is_valid``.

    Args:
        df: frame with a string ``exporter.cnpj`` column. It is not modified.

    Returns:
        A copy of *df*. ``exporter.cnpj`` holds the CNPJ-padded code where the
        code is a valid CNPJ, otherwise the CPF-padded code. The new
        ``exporter.type`` column is ``"cnpj"`` or ``"cpf"``.

    Raises:
        ValueError: if any code is valid as both a CNPJ and a CPF, or as
            neither. It is raised explicitly rather than via ``assert`` so
            the check also runs under ``python -O``.
    """
    df = df.copy()
    raw = df["exporter.cnpj"]

    cnpj = raw.str.rjust(14, "0")
    cpf = raw.str.rjust(11, "0")
    # astype(bool) keeps the masks boolean even for an empty frame, where
    # apply() would otherwise return object dtype.
    valid_cnpj = cnpj.apply(stdnum.br.cnpj.is_valid).astype(bool)
    valid_cpf = cpf.apply(stdnum.br.cpf.is_valid).astype(bool)

    # Manual overrides: these codes are never treated as CNPJs.
    # NOTE(review): presumably they also pass the CPF check and would
    # otherwise be ambiguous. Confirm before adding or removing entries.
    not_cnpj = [
        "00000523682891",
        "00004925661000",
        "00000408796901",
        "00000297458108",
    ]
    valid_cnpj = valid_cnpj & ~cnpj.isin(not_cnpj)

    # Every tax code must be a CNPJ or a CPF, but not both.
    ambiguous = valid_cnpj & valid_cpf
    if ambiguous.any():
        raise ValueError(
            "Tax codes valid as both CNPJ and CPF:\n"
            f"{raw[ambiguous.to_numpy()].drop_duplicates()}"
        )
    unrecognised = ~(valid_cnpj | valid_cpf)
    if unrecognised.any():
        raise ValueError(
            "Tax codes valid as neither CNPJ nor CPF:\n"
            f"{raw[unrecognised.to_numpy()].drop_duplicates()}"
        )

    df["exporter.cnpj"] = np.where(valid_cnpj, cnpj, cpf)
    df["exporter.type"] = np.where(valid_cnpj, "cnpj", "cpf")
    return df
def assert_none_missing(df, column):
    """Check that a left merge matched every row, and drop its indicator.

    The ``_merge`` indicator column (added by ``pd.merge(..., indicator=True)``)
    is popped from *df* in place, so the caller's frame no longer carries it.

    Args:
        df: merge result containing a ``_merge`` column. Modified in place.
        column: join-key column whose unmatched values are reported.

    Raises:
        AssertionError: if any row's ``_merge`` is not ``"both"``. The message
            lists the distinct unmatched values of *column*. It is raised
            explicitly rather than via ``assert`` so the check is not skipped
            under ``python -O``.
    """
    unmatched = df.pop("_merge") != "both"
    missing = df.loc[unmatched, column].drop_duplicates()
    if not missing.empty:
        raise AssertionError(f"Not all {column} found:\n{missing}")
def clean_ports(df):
    """Attach canonical port names to the port-of-export labels.

    The raw label is the POL (port of loading), which Trase treats as the
    port of export. *df* is left-joined with ``get_port_labels()`` on
    ``port_of_export.label``. Each label may match at most one lookup row;
    pandas raises ``MergeError`` otherwise. Every label must be matched.

    Returns:
        The merged frame, with a ``port_of_export.name`` column added.
    """
    port_lookup = get_port_labels()
    merged = df.merge(
        port_lookup,
        how="left",
        on="port_of_export.label",
        indicator=True,
        validate="many_to_one",
    )
    # Drops the "_merge" indicator; raises if any label was unmatched.
    assert_none_missing(merged, "port_of_export.label")
    return merged
def clean_countries(df):
    """Attach canonical country names and trase ids to destination labels.

    *df* is left-joined with ``get_country_labels()`` on
    ``country_of_destination.label``. Each label may match at most one lookup
    row; pandas raises ``MergeError`` otherwise. Every label must be matched.

    Returns:
        The merged frame, with ``country_of_destination.name`` and
        ``country_of_destination.trase_id`` columns added.
    """
    country_lookup = get_country_labels()
    merged = df.merge(
        country_lookup,
        how="left",
        on="country_of_destination.label",
        indicator=True,
        validate="many_to_one",
    )
    # Drops the "_merge" indicator; raises if any label was unmatched.
    assert_none_missing(merged, "country_of_destination.label")
    return merged
def clean_states(df) -> pd.DataFrame:
    """Attach canonical state names and trase ids to the state labels.

    *df* is left-joined with ``get_states()`` on ``state.label``. Each label
    may match at most one lookup row; pandas raises ``MergeError`` otherwise.
    Every label must be matched.

    Returns:
        The merged frame, with ``state.name`` and ``state.trase_id`` columns
        added.
    """
    state_lookup = get_states()
    merged = df.merge(
        state_lookup,
        how="left",
        on="state.label",
        indicator=True,
        validate="many_to_one",
    )
    # Drops the "_merge" indicator; raises if any label was unmatched.
    assert_none_missing(merged, "state.label")
    return merged
@uses_database
def get_country_labels(cnx=None):
    """Load the country-of-destination label lookup from ``views.regions``.

    Returns one row per distinct (country name, synonym, trase_id) for
    level-1 regions whose ``trase_id`` has at least two characters. A manual
    row is appended that maps the label ``"EUROPEAN"`` to
    ``"UNKNOWN COUNTRY"`` / ``"XX"``.

    Columns: ``country_of_destination.name``,
    ``country_of_destination.label`` (one synonym) and
    ``country_of_destination.trase_id``.
    """
    # NOTE(review): the coalesce is effectively a no-op. Rows with a NULL
    # trase_id are already excluded by length(trase_id) >= 2, because
    # length(NULL) is NULL.
    df = pd.read_sql(
        """
select distinct
name as "country_of_destination.name",
unnest(synonyms) as "country_of_destination.label",
coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
from views.regions where level = 1 and length(trase_id) >= 2
""",
        cnx.cnx,
    )
    # "EUROPEAN" is not a single country, so map it to the unknown country.
    # Without this row, clean_countries would report it as an unmatched label
    # (presumably it occurs in the raw destination data).
    unknown_country = pd.DataFrame(
        [
            {
                "country_of_destination.name": "UNKNOWN COUNTRY",
                "country_of_destination.trase_id": "XX",
                "country_of_destination.label": "EUROPEAN",
            }
        ],
        columns=df.columns,
    )
    # DataFrame.append was removed in pandas 2.0; concat is its replacement.
    return pd.concat([df, unknown_country], ignore_index=True)
@uses_database
def get_port_labels(cnx=None):
    """Fetch the Brazilian port label lookup from ``views.regions``.

    Yields one row per distinct (port name, synonym) for regions of type
    PORT in BRAZIL, with columns ``port_of_export.name`` and
    ``port_of_export.label``.
    """
    port_query = """
select distinct
name as "port_of_export.name",
unnest(synonyms) as "port_of_export.label"
from views.regions where region_type = 'PORT' and country = 'BRAZIL'
"""
    port_labels = pd.read_sql(sql=port_query, con=cnx.cnx)
    return port_labels
@uses_database
def get_states(cnx=None) -> pd.DataFrame:
    """Fetch the Brazilian state label lookup from ``views.regions``.

    Yields one row per distinct (state name, synonym, trase_id) for regions
    of type STATE in BRAZIL that have a non-null ``trase_id``. The columns
    are ``state.name``, ``state.label`` and ``state.trase_id``.
    """
    state_query = """
select distinct
name as "state.name",
unnest(synonyms) as "state.label",
trase_id AS "state.trase_id"
from views.regions
where country = 'BRAZIL'
and region_type = 'STATE'
and trase_id is not null
"""
    state_rows = pd.read_sql(sql=state_query, con=cnx.cnx)
    return state_rows
def add_derived_columns(df):
    """Zero-pad the NCM code and derive HS8/HS6/HS4 prefix columns.

    ``ncm`` is left-padded with "0" to 8 characters, and every padded code
    must be exactly 8 long. ``hs8`` is a copy of the padded code; ``hs6`` and
    ``hs4`` are its first 6 and 4 characters. A new frame is returned and
    *df* is left untouched.
    """
    padded_ncm = df["ncm"].str.rjust(8, "0")
    # rjust never truncates, so a code longer than 8 characters fails here.
    assert all(padded_ncm.str.len() == 8)
    return df.assign(
        ncm=padded_ncm,
        hs8=padded_ncm,
        hs6=padded_ncm.str[:6],
        hs4=padded_ncm.str[:4],
    )
# Run the full build only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
import pandas as pd
def model(dbt, cursor):
    """Placeholder dbt model for ``cd_combined_2015``.

    The data is produced outside dbt by
    ``trase/data/brazil/trade/cd/combined/CD_COMBINED_201X.py``. This model
    only declares its upstream sources, then refuses to run.

    Args:
        dbt: dbt context object; only ``dbt.source`` is used.
        cursor: unused.

    Raises:
        NotImplementedError: always.
    """
    # Keep these as literal, one-per-line calls.
    # NOTE(review): dbt presumably discovers lineage by statically parsing
    # these calls, so do not fold them into a loop.
    dbt.source("trase-storage-raw", "cd_combined_chicken_2015")
    dbt.source("trase-storage-raw", "cd_combined_copper_ore_2015")
    dbt.source("trase-storage-raw", "cd_combined_cocoa_2015")
    dbt.source("trase-storage-raw", "cd_combined_tobacco_2015")
    dbt.source("trase-storage-raw", "cd_combined_palm_oil_2015")
    dbt.source("trase-storage-raw", "cd_combined_iron_ore_2015")
    dbt.source("trase-storage-raw", "cd_combined_soy_2015")
    dbt.source("trase-storage-raw", "cd_combined_corn_2015")
    dbt.source("trase-storage-raw", "cd_combined_niobium_2015")
    dbt.source("trase-storage-raw", "cd_combined_pork_2015")
    dbt.source("trase-storage-raw", "cd_combined_cotton_2015")
    dbt.source("trase-storage-raw", "cd_combined_coffee_2015")
    dbt.source("trase-storage-raw", "cd_combined_beef_2015")
    dbt.source("trase-storage-raw", "cd_combined_aluminium_2015")
    raise NotImplementedError(
        "cd_combined_2015 is a mock model; its data is built outside dbt by "
        "trase/data/brazil/trade/cd/combined/CD_COMBINED_201X.py"
    )
    # Unreachable. Kept so the function still has a single return statement.
    # NOTE(review): dbt's Python-model parser presumably requires one;
    # confirm before removing.
    return pd.DataFrame({"hello": ["world"]})