Cd Combined Poultry 2017 Cleaned
s3://trase-storage/brazil/chicken/trade/cd/combined/CD_COMBINED_POULTRY_2017_CLEANED.csv
Dbt path: trase_production.main_brazil.cd_combined_poultry_2017_cleaned
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/chicken/trade/cd/combined/_schema.yml
Model file link: trase/data_pipeline/models/brazil/chicken/trade/cd/combined/cd_combined_poultry_2017_cleaned.py
Calls script: trase/data/brazil/commodity/trade/cd/combined/cd_combined_poultry_2017_cleaned.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, plus tests and downstream dependencies)
Tags: mock_model, brazil, cd, chicken, combined, trade
cd_combined_poultry_2017_cleaned
Description
This model was auto-generated from the .yml 'lineage' files in S3. The dbt model itself only raises an error; the actual script that created the data lives at trase/data/brazil/commodity/trade/cd/combined/cd_combined_poultry_2017_cleaned.py [permalink]. It was last run by Harry Biddle.
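For quick inspection, the cleaned output can be read straight from the S3 path at the top of this page. A minimal sketch, assuming pandas with s3fs installed and read access to the trase-storage bucket (the delimiter of the cleaned file is an assumption; adjust `sep` if needed):

```python
import pandas as pd

# Read the cleaned CSV directly from S3; pandas delegates s3:// URLs to s3fs.
df = pd.read_csv(
    "s3://trase-storage/brazil/chicken/trade/cd/combined/CD_COMBINED_POULTRY_2017_CLEANED.csv",
    dtype=str,              # keep codes such as hs8 and municipality.code as strings
    keep_default_na=False,  # preserve empty strings rather than NaN
)
print(df[["hs8", "exporter.name", "country_of_destination.name"]].head())
```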
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.cd_combined_poultry_2017
Sources
trase-storage-raw.cd_combined_poultry_2017
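The cleaning script itself (trase/data/brazil/commodity/trade/cd/combined/cd_combined_poultry_2017_cleaned.py) is reproduced below. It reads the raw CD_COMBINED_POULTRY_2017.csv, renames columns to Trase conventions, derives HS codes, builds municipality trase_ids, matches ports, countries and states against views.regions, and writes the cleaned CSV for upload.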
import pandas as pd
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.pcs.connect import CNX
COLUMN_RENAMES = {
    "cnpj": "exporter.cnpj",
    "country": "country_of_destination.label",
    "exporter": "exporter.name",
    "geocode": "municipality.code",
    "importer": "importer.name",
    "ncm": "hs8",
    "port": "port_of_export.label",
    "state": "state.label",
}


def main():
    # Read the raw file with every column kept as a string.
    df = get_pandas_df_once(
        "brazil/chicken/trade/cd/combined/CD_COMBINED_POULTRY_2017.csv",
        dtype=str,
        sep=";",
        encoding="ascii",
        keep_default_na=False,
    )
    # Rename columns to Trase conventions; raise if any expected column is absent.
    df = df.rename(columns=COLUMN_RENAMES, errors="raise")
    # Derive HS6/HS4 from the 8-digit NCM code and check the derived HS6
    # agrees with the HS6 column already present in the raw file.
    df["hs6"] = df["hs8"].str.slice(0, 6)
    df["hs4"] = df["hs8"].str.slice(0, 4)
    assert all(df["hs6"] == df.pop("HS6"))
    df = clean_municipalities(df)
    df = clean_ports(df)
    df = clean_countries(df)
    df = clean_states(df)
    write_csv_for_upload(
        df,
        "brazil/chicken/trade/cd/combined/CD_COMBINED_POULTRY_2017_CLEANED.csv",
    )
def clean_municipalities(df) -> pd.DataFrame:
    # Rows with no geocode get a 7-character placeholder so that the trase_id
    # ("BR-" + 7-digit municipality geocode) is always 10 characters long.
    df.loc[df["municipality.code"] == "", "municipality.code"] = "XXXXXXX"
    df["municipality.trase_id"] = "BR-" + df["municipality.code"]
    assert all(df["municipality.trase_id"].str.len() == 10)
    return df


def clean_ports(df) -> pd.DataFrame:
    """We clean POL (Port of Lading) which corresponds to Trase's port of export."""
    df = pd.merge(
        df,
        get_ports(),
        on="port_of_export.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )
    assert_none_missing(df, "port_of_export.label")
    return df


def get_ports() -> pd.DataFrame:
    df = pd.read_sql(
        """
        select distinct
            name as "port_of_export.name",
            unnest(synonyms) as "port_of_export.label"
        from views.regions
        where region_type = 'PORT'
          and country = 'BRAZIL'
        """,
        CNX.cnx,
    )
    return df
def clean_countries(df) -> pd.DataFrame:
    df = pd.merge(
        df,
        get_countries(),
        on="country_of_destination.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )
    assert_none_missing(df, "country_of_destination.label")
    return df


def get_countries() -> pd.DataFrame:
    df = pd.read_sql(
        """
        select distinct
            name as "country_of_destination.name",
            unnest(synonyms) as "country_of_destination.label",
            coalesce(trase_id, 'XX') as "country_of_destination.trase_id"
        from views.regions
        where level = 1
          and length(trase_id) = 2
        """,
        CNX.cnx,
    )
    return df
def clean_states(df) -> pd.DataFrame:
    df.loc[df["state.label"] == "", "state.label"] = "UNKNOWN STATE"
    df = pd.merge(
        df,
        get_states(),
        on="state.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )
    assert_none_missing(df, "state.label")
    return df


def get_states() -> pd.DataFrame:
    df = pd.read_sql(
        """
        select distinct
            name as "state.name",
            unnest(synonyms) as "state.label",
            trase_id as "state.trase_id"
        from views.regions
        where country = 'BRAZIL'
          and region_type = 'STATE'
          and trase_id is not null
        """,
        CNX.cnx,
    )
    return df
def assert_none_missing(df, column):
    # Pop the merge indicator added by pd.merge(indicator=True) and fail if
    # any value in `column` did not find a match in the reference table.
    missing = df[df.pop("_merge") != "both"][column].drop_duplicates()
    assert missing.empty, f"Not all {column} found:\n{missing}"


if __name__ == "__main__":
    main()
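The auto-generated dbt Python model, by contrast, is only a placeholder: it declares the raw source so the dependency shows up in lineage, then raises, because the table is actually produced by the script above.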
import pandas as pd


def model(dbt, cursor):
    # Declare the raw source so it appears in dbt's lineage graph.
    dbt.source("trase-storage-raw", "cd_combined_poultry_2017")
    # Mock model: the data is produced by the standalone script, not by dbt.
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})  # unreachable; dbt Python models are expected to return a DataFrame
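Because the model raises NotImplementedError, dbt cannot materialise this table itself; to regenerate it, re-run the standalone script (it has a `__main__` entry point) so that write_csv_for_upload produces a fresh CD_COMBINED_POULTRY_2017_CLEANED.csv for trase-storage.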