DBT: Brazil Datamyne CD 2017
File location: s3://trase-storage/brazil/trade/cd/datamyne/2017/BRAZIL_DATAMYNE_CD_2017.csv
DBT model name: brazil_datamyne_cd_2017
Explore on Metabase: Full table; summary statistics
DBT details
- Lineage:
  - Dbt path: trase_production.main_brazil.brazil_datamyne_cd_2017
  - Containing yaml link: trase/data_pipeline/models/brazil/trade/cd/datamyne/2017/_schema.yml
  - Model file: trase/data_pipeline/models/brazil/trade/cd/datamyne/2017/brazil_datamyne_cd_2017.py
  - Calls script: trase/data/brazil/trade/cd/datamyne/2017/BRAZIL_DATAMYNE_CD_2017.py
- Tags: mock_model, 2017, brazil, cd, datamyne, trade
Description
This model was auto-generated from the .yml 'lineage' files in S3. The DBT model itself only raises an error; the actual script that created the data lives elsewhere, at trase/data/brazil/trade/cd/datamyne/2017/BRAZIL_DATAMYNE_CD_2017.py [permalink]. The script was last run by Harry Biddle.
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
- source.trase_duckdb.trase-storage-raw.2017_1
- source.trase_duckdb.trase-storage-raw.2017_5
- source.trase_duckdb.trase-storage-raw.2017_3
- source.trase_duckdb.trase-storage-raw.2017_4
- source.trase_duckdb.trase-storage-raw.2017_8
- source.trase_duckdb.trase-storage-raw.2017_2
- source.trase_duckdb.trase-storage-raw.2017_6
- source.trase_duckdb.trase-storage-raw.2017_7
Sources
- ['trase-storage-raw', '2017_1']
- ['trase-storage-raw', '2017_5']
- ['trase-storage-raw', '2017_3']
- ['trase-storage-raw', '2017_4']
- ['trase-storage-raw', '2017_8']
- ['trase-storage-raw', '2017_2']
- ['trase-storage-raw', '2017_6']
- ['trase-storage-raw', '2017_7']
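Generating script (trase/data/brazil/trade/cd/datamyne/2017/BRAZIL_DATAMYNE_CD_2017.py):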
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf
from trase.tools import uses_database
from trase.tools.aws.aws_helpers import get_pandas_df
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sei_pcs.pandas_utilities import concat, full_merge
def main():
df = concat(
[
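            # each workbook apparently carries a 10-row header and a 9-row footer
            # (hence skiprows=10 and the trailing [:-9] slice); dtype=str keeps
            # leading zeros in code columns intact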
get_pandas_df(path, xlsx=True, skiprows=10, dtype=str, usecols="A:L")[:-9]
for path in [
"brazil/trade/cd/datamyne/2017/originals/2017_1.xls",
"brazil/trade/cd/datamyne/2017/originals/2017_2.xls",
"brazil/trade/cd/datamyne/2017/originals/2017_3.xlsx",
"brazil/trade/cd/datamyne/2017/originals/2017_4.xlsx",
"brazil/trade/cd/datamyne/2017/originals/2017_5.xlsx",
"brazil/trade/cd/datamyne/2017/originals/2017_6.xlsx",
"brazil/trade/cd/datamyne/2017/originals/2017_7.xls",
"brazil/trade/cd/datamyne/2017/originals/2017_8.xlsx",
]
]
)
# alter column names
df = df.rename(
columns={
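            # note 'Exporter CNJP' [sic] below: the typo is presumably in the
            # source workbook header itself, and errors="raise" would catch a mismatch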
"Country of Destination": "country_of_destination.label",
"Date (Month)": "date",
"Exporter CNJP": "exporter.cnpj",
"Exporter Municipality": "exporter.municipality.label",
"State / Department of the Exporter": "exporter.state.label",
"Exporter Name": "exporter.label",
"FOB Value (US$)": "fob",
"Net Weight": "vol",
"Port of Departure": "port_of_export.label",
"Product HS": "hs8",
},
errors="raise",
)
df = add_derived_convenience_columns(df)
df = clean_countries(df)
df = clean_ports(df)
df = clean_cnpjs(df)
df = clean_municipalities(df)
df = clean_states(df)
write_csv_for_upload(
df, "brazil/trade/cd/datamyne/2017/BRAZIL_DATAMYNE_CD_2017.csv"
)
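# Derive hs4/hs6 prefixes and numeric year/month from the hs8 codes and the
# YYYYMM-formatted date strings.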
def add_derived_convenience_columns(df):
assert all(df["hs8"].str.len() == 8)
df = df.assign(hs4=df["hs8"].str.slice(0, 4))
df = df.assign(hs6=df["hs8"].str.slice(0, 6))
df = df.assign(year=df["date"].str.slice(0, 4).astype(int))
df = df.assign(month=df["date"].str.slice(4, 6).astype(int))
return df
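# Map destination-country labels to canonical names and trase_ids via the
# views.regions synonym table; the label 'EUROPEAN' is mapped to the
# unknown-country code 'XX'.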
@uses_database
def clean_countries(df, cnx=None):
df_country_labels = pd.read_sql(
"""
select distinct
name as "country_of_destination.name",
unnest(synonyms) as "country_of_destination.label",
coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
from views.regions where level = 1 and length(trase_id) = 2
""",
cnx.cnx,
)
    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported
    # equivalent (verify_integrity was a no-op alongside ignore_index=True)
    df_country_labels = pd.concat(
        [
            df_country_labels,
            pd.DataFrame(
                [
                    {
                        "country_of_destination.name": "UNKNOWN COUNTRY",
                        "country_of_destination.trase_id": "XX",
                        "country_of_destination.label": "EUROPEAN",
                    }
                ]
            ),
        ],
        ignore_index=True,
    )
return full_merge(
df,
df_country_labels,
on="country_of_destination.label",
validate="many_to_one",
how="left",
)
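# The same synonym lookup, for Brazilian ports of export.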
@uses_database
def clean_ports(df, cnx=None):
df_port_labels = pd.read_sql(
"""
select distinct
name as "port_of_export.name",
unnest(synonyms) as "port_of_export.label"
from views.regions where region_type = 'PORT' and country = 'BRAZIL'
""",
cnx.cnx,
)
return full_merge(
df,
df_port_labels,
on="port_of_export.label",
validate="many_to_one",
how="left",
)
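# Classify each exporter tax id by length (14 digits: company CNPJ; 11 digits:
# individual CPF) and assert that every check digit validates.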
def clean_cnpjs(df):
df = df.copy()
def validate(cnpj):
if len(cnpj) == 14:
return "cnpj", stdnum.br.cnpj.is_valid(cnpj)
elif len(cnpj) == 11:
return "cpf", stdnum.br.cpf.is_valid(cnpj)
else:
raise ValueError(cnpj)
df["exporter.type"], is_valid = zip(*df["exporter.cnpj"].apply(validate))
assert all(is_valid)
return df
@uses_database
def clean_municipalities(df, cnx=None):
# Given the CNPJs and their municipality labels, get a lookup of CNPJs to their
# municipality trase id. Since we do not have the state information (we can't trust
# the state column in the original data!) we are only able to do this for those
    # municipalities whose labels unambiguously identify a municipality.
# first get a list of CNPJs with their municipality label
df_lookup = df[["exporter.cnpj", "exporter.municipality.label"]].drop_duplicates()
# add an index so that we can keep track of ambiguous municipality labels later
df_lookup = df_lookup.assign(id=range(len(df_lookup)))
# merge in municipalities
df_municipalities = pd.read_sql(
f"""
select distinct
name as "exporter.municipality.name",
unnest(synonyms) as "exporter.municipality.label",
trase_id as "exporter.municipality.trase_id"
from views.regions
where country = 'BRAZIL' and region_type = 'MUNICIPALITY'
""",
cnx.cnx,
)
df_lookup = pd.merge(
df_lookup, df_municipalities, on=["exporter.municipality.label"]
)
# drop labels - now we only have name and trase id
df_lookup = df_lookup.drop(columns="exporter.municipality.label", errors="raise")
df_lookup = df_lookup.drop_duplicates()
# ignore any municipalities for which the label was ambiguous
    df_lookup = df_lookup[~df_lookup.pop("id").duplicated(keep=False)]
# apply this lookup to the original data
return full_merge(
df,
df_lookup,
on="exporter.cnpj",
validate="many_to_one",
how="left",
)
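# The same synonym lookup, for exporter states.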
@uses_database
def clean_states(df, cnx=None):
df_states = pd.read_sql(
"""
select distinct
name as "exporter.state.name",
unnest(synonyms) as "exporter.state.label",
trase_id as "exporter.state.trase_id"
from views.regions where country = 'BRAZIL' and region_type = 'STATE'
""",
cnx.cnx,
)
return full_merge(
df,
df_states,
on="exporter.state.label",
validate="many_to_one",
how="left",
)
if __name__ == "__main__":
main()
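Mock DBT model (trase/data_pipeline/models/brazil/trade/cd/datamyne/2017/brazil_datamyne_cd_2017.py):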
import pandas as pd
def model(dbt, cursor):
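    # Register the eight raw workbooks as dbt sources so lineage is recorded,
    # then fail loudly: the real transformation is the standalone script above.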
dbt.source("trase-storage-raw", "2017_1")
dbt.source("trase-storage-raw", "2017_5")
dbt.source("trase-storage-raw", "2017_3")
dbt.source("trase-storage-raw", "2017_4")
dbt.source("trase-storage-raw", "2017_8")
dbt.source("trase-storage-raw", "2017_2")
dbt.source("trase-storage-raw", "2017_6")
dbt.source("trase-storage-raw", "2017_7")
raise NotImplementedError()
return pd.DataFrame({"hello": ["world"]})
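For reference, a minimal sketch of reading the published CSV back (this assumes pandas plus s3fs and configured AWS credentials; the column names follow the rename table in the script above):

import pandas as pd

df = pd.read_csv(
    "s3://trase-storage/brazil/trade/cd/datamyne/2017/BRAZIL_DATAMYNE_CD_2017.csv",
    # keep code-like columns as strings so leading zeros survive
    dtype={"hs8": str, "hs6": str, "hs4": str, "exporter.cnpj": str},
)
# e.g. shipment counts per destination country
print(df.groupby("country_of_destination.name").size())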