DBT: Brazil Bol 2015
File location: s3://trase-storage/brazil/trade/bol/2015/BRAZIL_BOL_2015.csv
DBT model name: brazil_bol_2015
Explore on Metabase: Full table; summary statistics
DBT details
- Lineage
-
Dbt path:
trase_production.main_brazil.brazil_bol_2015 -
Containing yaml link: trase/data_pipeline/models/brazil/trade/bol/2015/_schema.yml
-
Model file: trase/data_pipeline/models/brazil/trade/bol/2015/brazil_bol_2015.py
-
Calls script:
trase/data/brazil/trade/bol/2015/BRAZIL_BOL_2015.py -
Tags:
mock_model,2015,bol,brazil,trade
Description
This model was auto-generated based on .yml 'lineage' files in S3. The DBT model just raises an error; the actual script that created the data lives elsewhere. The script is located at trase/data/brazil/trade/bol/2015/BRAZIL_BOL_2015.py [permalink]. It was last run by Harry Biddle.
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.piers_2015
Sources
['trase-storage-raw', 'piers_2015']
import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf
from trase.tools import uses_database
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sei_pcs.pandas_utilities import full_merge
# Mapping of raw PIERS spreadsheet column names to the trase-convention
# column names used downstream.  Only the keys listed here are kept from
# the source spreadsheet (see main(), which selects and renames by this dict).
COLUMNS = {
    "HS8_Code": "hs8",
    "POL_Name": "port_of_export.label",
    "YYYYMM": "date",  # year+month stamp, split into year/month in main()
    "DEST_Country": "country_of_destination.label",
    "Shipper": "exporter.label",
    "Shipper_Type": "shipper_type",
    "Shipper_City": "exporter.municipality.label",
    "Shipper_State": "exporter.state.label",
    "Shipper_Zip": "exporter.zip",
    "Shipper_Country_Name": "exporter.country.label",
    "Shipper_Registration_Number": "exporter.cnpj",  # CNPJ or CPF; see clean_cnpjs()
    "WTMT": "vol",  # NOTE(review): presumably weight in metric tons — confirm this is the right volume column
}
def clean_hs_codes(df):
    """Normalise HS commodity codes and derive their HS6/HS4 prefixes.

    The raw ``hs8`` column may have lost leading zeros (e.g. when the
    spreadsheet column was read as an integer), so each code is left-padded
    with zeros back to 8 digits before slicing out the 6- and 4-digit
    prefixes.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain an ``hs8`` column.

    Returns
    -------
    pandas.DataFrame
        A new frame with normalised ``hs8`` plus derived ``hs6``/``hs4``.

    Raises
    ------
    ValueError
        If any code exceeds 8 digits after padding, which indicates corrupt
        input rather than merely lost leading zeros.
    """
    hs8 = df["hs8"].astype(str).str.rjust(8, "0")
    # A bare `assert` would be silently stripped under `python -O`, letting
    # malformed codes through; raise explicitly instead.
    if not (hs8.str.len() == 8).all():
        raise ValueError("found HS codes longer than 8 digits")
    return df.assign(hs8=hs8, hs6=hs8.str.slice(0, 6), hs4=hs8.str.slice(0, 4))
def clean_cnpjs(df):
    """Classify exporter registration numbers as CNPJ (company) or CPF
    (individual) and zero-pad them to the canonical width.

    Adds an ``exporter.type`` column ("cnpj", "cpf" or "unknown") and
    rewrites ``exporter.cnpj`` with the padded form for recognised numbers;
    unrecognised values are left as-is.  Returns a new frame.
    """
    out = df.copy()
    raw = out["exporter.cnpj"].astype(str)
    out["exporter.cnpj"] = raw

    # Candidate paddings: CNPJ numbers are 14 digits, CPF numbers 11.
    padded_cnpj = raw.str.rjust(14, "0")
    padded_cpf = raw.str.rjust(11, "0")
    is_cnpj = padded_cnpj.apply(stdnum.br.cnpj.is_valid)
    is_cpf = padded_cpf.apply(stdnum.br.cpf.is_valid)

    # No number should validate as both kinds at once.
    assert not (is_cnpj & is_cpf).any()

    out["exporter.type"] = "unknown"
    out.loc[is_cnpj, "exporter.type"] = "cnpj"
    out.loc[is_cpf, "exporter.type"] = "cpf"

    # Keep the padded form where recognised, the raw string otherwise.
    out["exporter.cnpj"] = padded_cnpj.where(is_cnpj, out["exporter.cnpj"])
    out["exporter.cnpj"] = padded_cpf.where(is_cpf, out["exporter.cnpj"])
    return out
@uses_database
def get_port_labels(cnx=None):
    """Fetch every known synonym for Brazilian ports from the regions view.

    Returns one row per (canonical name, synonym) pair, for joining onto
    the bill-of-lading data by port label.  ``cnx`` is injected by the
    ``uses_database`` decorator.
    """
    query = """
        select distinct
            name as "port_of_export.name",
            unnest(synonyms) as "port_of_export.label"
        from views.regions where region_type = 'PORT' and country = 'BRAZIL'
    """
    return pd.read_sql(query, cnx.cnx)
def clean_ports(df):
    """Attach canonical port names to the shipment rows.

    Left-joins the port synonym lookup onto ``port_of_export.label``;
    each label must match at most one canonical port (many_to_one), and
    unmatched labels keep a missing ``port_of_export.name``.
    """
    port_lookup = get_port_labels()
    return full_merge(
        df,
        port_lookup,
        how="left",
        on="port_of_export.label",
        validate="many_to_one",
    )
@uses_database
def get_country_labels(cnx=None):
    """Fetch country name synonyms and two-letter trase ids from the
    regions view.

    Returns one row per (canonical name, synonym) pair; missing trase ids
    are reported as 'XX'.  ``cnx`` is injected by ``uses_database``.
    """
    query = """
        select distinct
            name as "country_of_destination.name",
            unnest(synonyms) as "country_of_destination.label",
            coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
        from views.regions where level = 1 and length(trase_id) = 2
    """
    return pd.read_sql(query, cnx.cnx)
def clean_countries(df):
    """Attach canonical destination-country names and trase ids.

    Left-joins the country synonym lookup onto
    ``country_of_destination.label``; each label must match at most one
    country (many_to_one), and unmatched labels keep missing values.
    """
    country_lookup = get_country_labels()
    return full_merge(
        df,
        country_lookup,
        how="left",
        on="country_of_destination.label",
        validate="many_to_one",
    )
def main():
    """Build the BRAZIL_BOL_2015 dataset.

    Reads the raw PIERS 2015 spreadsheet from S3 (cached), keeps and
    renames the columns declared in COLUMNS, splits the YYYYMM stamp,
    enriches the rows with HS-code, country, port and CNPJ/CPF cleaning,
    and writes the resulting CSV ready for upload.
    """
    raw = get_pandas_df_once(
        "brazil/trade/bol/2015/originals/PIERS_2015.xlsx",
        xlsx=True,
        keep_default_na=False,
    )
    df = raw[COLUMNS].rename(columns=COLUMNS, errors="raise")

    # Split the YYYYMM stamp into integer year/month columns.
    yyyymm = df.pop("date").astype(str)
    df["year"] = yyyymm.str.slice(0, 4).astype(int)
    df["month"] = yyyymm.str.slice(4, 6).astype(int)

    # Each cleaner takes and returns a DataFrame; order matches the original.
    for cleaner in (clean_hs_codes, clean_countries, clean_ports, clean_cnpjs):
        df = cleaner(df)

    write_csv_for_upload(df, "brazil/trade/bol/2015/BRAZIL_BOL_2015.csv")


if __name__ == "__main__":
    main()
import pandas as pd
def model(dbt, cursor):
    """Placeholder dbt Python model for brazil_bol_2015.

    The real table is produced by a standalone script
    (trase/data/brazil/trade/bol/2015/BRAZIL_BOL_2015.py); this model only
    declares the upstream source so dbt draws the lineage edge, then fails
    loudly so it is never mistaken for a working materialisation.

    Raises:
        NotImplementedError: always — running this model is not supported.
    """
    # Register the upstream source for lineage purposes only.
    dbt.source("trase-storage-raw", "piers_2015")
    # The original had an unreachable `return pd.DataFrame(...)` after this
    # raise; it has been removed as dead code.
    raise NotImplementedError(
        "brazil_bol_2015 is generated outside dbt; see "
        "trase/data/brazil/trade/bol/2015/BRAZIL_BOL_2015.py"
    )