DBT: Brazil Bol Dataliner Soy 2018
File location: s3://trase-storage/brazil/trade/bol/2018/BRAZIL_BOL_DATALINER_SOY_2018.csv
DBT model name: brazil_bol_dataliner_soy_2018
Explore on Metabase: Full table; summary statistics
DBT details
- Lineage
  - DBT path: trase_production.main_brazil.brazil_bol_dataliner_soy_2018
  - Containing YAML: trase/data_pipeline/models/brazil/trade/bol/2018/_schema.yml
  - Model file: trase/data_pipeline/models/brazil/trade/bol/2018/brazil_bol_dataliner_soy_2018.py
  - Calls script: trase/data/brazil/trade/bol/201X/BRAZIL_BOL_DATALINER_SOY_201X.py
- Tags: mock_model, 2018, bol, brazil, trade
Description
This model was auto-generated from the .yml 'lineage' files in S3. The DBT model itself only raises an error; the script that actually created the data lives at trase/data/brazil/trade/bol/201X/BRAZIL_BOL_DATALINER_SOY_201X.py [permalink] and is reproduced below, followed by the DBT stub. It was last run by Harry Biddle.
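For reference, the published table can be loaded directly from its S3 location. A minimal sketch, assuming pandas with s3fs installed, read access to the trase-storage bucket, and the default comma separator:

import pandas as pd

# Load the published 2018 bill-of-lading table straight from S3.
# Assumes s3fs is installed and AWS credentials grant read access.
df = pd.read_csv(
    "s3://trase-storage/brazil/trade/bol/2018/BRAZIL_BOL_DATALINER_SOY_2018.csv"
)
print(df.head())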
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
- source.trase_duckdb.trase-storage-raw.dataliner_soy_2017_2019_cleaned
Sources
- trase-storage-raw.dataliner_soy_2017_2019_cleaned
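The generating script, trase/data/brazil/trade/bol/201X/BRAZIL_BOL_DATALINER_SOY_201X.py, is reproduced in full below. It reads the cleaned Dataliner extract from S3, renames and validates the columns, resolves country and port labels against the regions table, classifies exporter registration numbers as CNPJ or CPF, and writes one CSV per year.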
"""
Creates the following files:
s3://trase-storage/brazil/trade/bol/2017/BRAZIL_BOL_DATALINER_SOY_2017.csv
s3://trase-storage/brazil/trade/bol/2018/BRAZIL_BOL_DATALINER_SOY_2018.csv
s3://trase-storage/brazil/trade/bol/2019/BRAZIL_BOL_DATALINER_SOY_2019.csv
"""
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf
from trase.tools import uses_database
from trase.tools.aws.aws_helpers import get_pandas_df
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sei_pcs.pandas_utilities import full_merge
# Maps raw Dataliner column names to the tidy names used downstream.
COLUMNS = {
"Period/YYYYMMDD": "date",
"Commodity_HS_Datamar/HS6 Code": "hs6",
"Commodity_HS_Datamar/HS8 Code": "hs8",
"Company_Consignee/Consignee Name": "importer",
"Company_Shipper/Shipper Name": "exporter.label",
"Company_Shipper/Registration Number": "exporter.cnpj",
"Company_Shipper/Country Name": "exporter.country.label",
"Company_Shipper/State": "exporter.state.label",
"Company_Shipper/City": "exporter.city",
"Company_Shipper/Street": "exporter.street",
"Company_Shipper/Zip": "exporter.zip_code",
"Place_and_Ports/DEST_Country": "country_of_destination.label",
"Place_and_Ports/DEST_Name": "port_of_import",
"Place_and_Ports/POD_Country": "country_of_discharge.label",
"Place_and_Ports/POD_Name": "port_of_discharge.label",
"Place_and_Ports/POL_Country": "country_of_production",
"Place_and_Ports/POL_Name": "port_of_export.label",
"WTKG": "vol",
}
# Destination labels that cannot be resolved to a single country.
UNKNOWN_COUNTRY_LABELS = ["EUROPEAN", "EUROPE MED"]
# Placeholder for rows with no usable registration number.
UNKNOWN_CNPJ = "X" * 14
# Known bad registration numbers: one CNPJ missing its leading zero,
# plus empty/zero sentinels mapped to the unknown placeholder.
CNPJ_FIXES = {
    "5492968000449": "05492968000449",
    "0": UNKNOWN_CNPJ,
    "": UNKNOWN_CNPJ,
}
def validate_hs_codes_and_add_hs4(df):
    # Every HS6 code must be a prefix of its 8-digit HS8 code; derive HS4.
    assert all(df["hs6"] == df["hs8"].str.slice(0, 6))
    assert all(df["hs8"].str.len() == 8)
    return df.assign(hs4=df["hs6"].str.slice(0, 4))
@uses_database
def clean_countries(df, column, cnx=None):
    # Pull every synonym for each country (level-1 regions with a
    # two-letter trase_id) and join them onto the given label column.
    df_country_labels = pd.read_sql(
f"""
select distinct
name as "{column}.name",
unnest(synonyms) as "{column}.label",
coalesce(trase_id, 'XX') AS "{column}.trase_id"
from views.regions where level = 1 and length(trase_id) = 2
""",
cnx.cnx,
)
    # Add catch-all rows so unresolvable labels map to "UNKNOWN COUNTRY".
    df_country_labels = pd.concat(
        [
            df_country_labels,
            pd.DataFrame(
                [
                    {
                        f"{column}.name": "UNKNOWN COUNTRY",
                        f"{column}.trase_id": "XX",
                        f"{column}.label": label,
                    }
                    for label in UNKNOWN_COUNTRY_LABELS
                ]
            ),
        ],
        ignore_index=True,
    )
return full_merge(
df,
df_country_labels,
on=f"{column}.label",
validate="many_to_one",
how="left",
)
@uses_database
def clean_ports(df, cnx=None):
    # Resolve Brazilian port labels to canonical port names via synonyms.
    df_port_labels = pd.read_sql(
"""
select distinct
name as "port_of_export.name",
unnest(synonyms) as "port_of_export.label"
from views.regions where region_type = 'PORT' and country = 'BRAZIL'
""",
cnx.cnx,
)
return full_merge(
df,
df_port_labels,
on="port_of_export.label",
validate="many_to_one",
how="left",
)
def clean_cnpjs(df):
    df = df.copy()
    # Repair known-bad registration numbers before classifying them.
    df["exporter.cnpj"] = df["exporter.cnpj"].apply(lambda x: CNPJ_FIXES.get(x, x))

    def validate(cnpj):
        # Classify as a 14-digit company CNPJ, an 11-digit individual CPF,
        # the unknown placeholder, or invalid.
        if cnpj == UNKNOWN_CNPJ:
            return "unknown"
        elif len(cnpj) == 14 and stdnum.br.cnpj.is_valid(cnpj):
            return "cnpj"
        elif len(cnpj) == 11 and stdnum.br.cpf.is_valid(cnpj):
            return "cpf"
        else:
            return "invalid"

    df["exporter.type"] = df["exporter.cnpj"].apply(validate)
    return df
def main():
    # Read the cleaned extract as strings, keeping empty cells as "".
    df = get_pandas_df(
"brazil/trade/bol/2017/originals/DATALINER_SOY_2017_2019_cleaned.csv",
sep=";",
encoding="utf8",
keep_default_na=False,
dtype=str,
)
    # Select and rename the raw columns, failing loudly on any mismatch.
    df = df[list(COLUMNS)].rename(columns=COLUMNS, errors="raise")
    # Split the YYYYMMDD date string into numeric year/month/day columns.
    date = df.pop("date")
    df["year"] = date.str.slice(0, 4).astype(int)
    df["month"] = date.str.slice(4, 6).astype(int)
    df["day"] = date.str.slice(6, 8).astype(int)
assert all(df["country_of_production"] == "BRAZIL")
df = validate_hs_codes_and_add_hs4(df)
df = clean_countries(df, "country_of_destination")
df = clean_countries(df, "country_of_discharge")
df = clean_ports(df)
df = clean_cnpjs(df)
print("Now run:")
for year in df["year"].unique():
write_csv_for_upload(
df[df["year"] == year],
f"brazil/trade/bol/{year}/BRAZIL_BOL_DATALINER_SOY_{year}.csv",
)
if __name__ == "__main__":
main()
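The DBT model file itself is only a stub: it registers the upstream source so lineage is tracked, then raises, because the table is produced by the standalone script above rather than by DBT.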
import pandas as pd

def model(dbt, cursor):
    # Register the upstream source so DBT records the lineage.
    dbt.source("trase-storage-raw", "dataliner_soy_2017_2019_cleaned")
    # Mock model: the real table is built by the standalone script above.
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})
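As a quick illustration of the registration-number classification, here is a minimal, hypothetical check, assuming clean_cnpjs and its constants from the script above are in scope; the sample values are illustrative only:

import pandas as pd

# Hypothetical sample: a zero sentinel, an empty string, the CNPJ missing
# its leading zero, and the explicit unknown placeholder.
sample = pd.DataFrame({"exporter.cnpj": ["0", "", "5492968000449", "X" * 14]})
cleaned = clean_cnpjs(sample)
# "0" and "" are mapped to the unknown placeholder via CNPJ_FIXES;
# "5492968000449" is repaired to "05492968000449" before validation.
print(cleaned[["exporter.cnpj", "exporter.type"]])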