Skip to content

Cd Combined Poultry 2017 Cleaned

s3://trase-storage/brazil/chicken/trade/cd/combined/CD_COMBINED_POULTRY_2017_CLEANED.csv

Dbt path: trase_production.main_brazil.cd_combined_poultry_2017_cleaned

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/chicken/trade/cd/combined/_schema.yml

Model file link: trase/data_pipeline/models/brazil/chicken/trade/cd/combined/cd_combined_poultry_2017_cleaned.py

Calls script: trase/data/brazil/commodity/trade/cd/combined/cd_combined_poultry_2017_cleaned.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: mock_model, brazil, cd, chicken, combined, trade


cd_combined_poultry_2017_cleaned

Description

This model was auto-generated from .yml 'lineage' files in S3. The dbt model itself just raises an error; the actual script that created the data lives elsewhere, at trase/data/brazil/commodity/trade/cd/combined/cd_combined_poultry_2017_cleaned.py [permalink]. It was last run by Harry Biddle.


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.cd_combined_poultry_2017

Sources

  • ['trase-storage-raw', 'cd_combined_poultry_2017']
import pandas as pd

from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.pcs.connect import CNX

# Maps the column names found in the raw CSV to the dotted column names used
# in the cleaned output. main() applies it with errors="raise", so every key
# listed here must be present in the input file.
COLUMN_RENAMES = {
    "cnpj": "exporter.cnpj",
    "country": "country_of_destination.label",
    "exporter": "exporter.name",
    "geocode": "municipality.code",
    "importer": "importer.name",
    "ncm": "hs8",
    "port": "port_of_export.label",
    "state": "state.label",
}


def main():
    """Clean the combined 2017 poultry CSV and write the cleaned copy for upload.

    Steps:
      1. Load ``CD_COMBINED_POULTRY_2017.csv`` through ``get_pandas_df_once``
         (the keyword arguments look like ``pandas.read_csv`` options: every
         column as ``str``, ``;``-separated, blanks kept as empty strings).
      2. Rename columns according to ``COLUMN_RENAMES``.
      3. Derive ``hs6`` and ``hs4`` as prefixes of ``hs8`` and cross-check the
         derived ``hs6`` against the ``HS6`` column shipped in the file, which
         is then dropped.
      4. Clean municipalities, ports, countries and states.
      5. Hand the result to ``write_csv_for_upload``.

    Raises:
        KeyError: if a column named in ``COLUMN_RENAMES`` or ``HS6`` is absent.
        AssertionError: if the derived ``hs6`` disagrees with ``HS6`` or any
            of the cleaning steps finds unmatched values.
    """
    df = get_pandas_df_once(
        "brazil/chicken/trade/cd/combined/CD_COMBINED_POULTRY_2017.csv",
        dtype=str,
        sep=";",
        encoding="ascii",
        keep_default_na=False,
    )
    df = df.rename(columns=COLUMN_RENAMES, errors="raise")
    df["hs6"] = df["hs8"].str.slice(0, 6)
    df["hs4"] = df["hs8"].str.slice(0, 4)

    # Remove the redundant source column *outside* the assert statement:
    # asserts are stripped under ``python -O``, and a pop() buried inside one
    # would then leave "HS6" in the output file.
    source_hs6 = df.pop("HS6")
    assert all(
        df["hs6"] == source_hs6
    ), "hs6 derived from hs8 does not match the HS6 column in the source file"

    df = clean_municipalities(df)
    df = clean_ports(df)
    df = clean_countries(df)
    df = clean_states(df)

    write_csv_for_upload(
        df,
        "brazil/chicken/trade/cd/combined/CD_COMBINED_POULTRY_2017_CLEANED.csv",
    )


def clean_municipalities(df) -> pd.DataFrame:
    """Fill blank municipality codes and derive ``municipality.trase_id``.

    Blank (empty-string) codes are replaced by the placeholder ``XXXXXXX``;
    the id is the code prefixed with ``BR-``. The frame is modified in place
    and the same object is returned.

    Raises:
        AssertionError: if any resulting id is not exactly 10 characters long.
    """
    code_col = "municipality.code"
    id_col = "municipality.trase_id"

    is_blank = df[code_col] == ""
    df.loc[is_blank, code_col] = "XXXXXXX"

    df[id_col] = "BR-" + df[code_col]

    # "BR-" (3 characters) plus a 7-character code must give 10 characters.
    id_lengths = df[id_col].str.len()
    assert all(id_lengths == 10)
    return df


def clean_ports(df) -> pd.DataFrame:
    """Attach the port name to each row by matching ``port_of_export.label``.

    The label being cleaned is the POL (Port of Lading), which corresponds to
    Trase's port of export. Labels are left-joined against the lookup returned
    by ``get_ports``; each label may match at most one lookup row.

    Raises:
        AssertionError: if any label has no match in the lookup.
    """
    port_lookup = get_ports()
    merged = df.merge(
        port_lookup,
        how="left",
        on="port_of_export.label",
        validate="many_to_one",
        indicator=True,
    )
    # Consumes the "_merge" indicator column and fails on unmatched labels.
    assert_none_missing(merged, "port_of_export.label")
    return merged


def get_ports() -> pd.DataFrame:
    """Query ``views.regions`` for Brazilian ports and their synonyms.

    Returns:
        A frame with columns ``port_of_export.name`` and
        ``port_of_export.label``, holding the distinct (name, synonym) pairs
        of regions whose ``region_type`` is ``PORT`` and ``country`` is
        ``BRAZIL``.
    """
    query = """
        select distinct 
               name as "port_of_export.name",
               unnest(synonyms) as "port_of_export.label"
          from views.regions 
         where region_type = 'PORT'
           and country = 'BRAZIL'
        """
    return pd.read_sql(query, CNX.cnx)


def clean_countries(df) -> pd.DataFrame:
    """Attach country name and Trase id by matching ``country_of_destination.label``.

    Labels are left-joined against the lookup returned by ``get_countries``;
    each label may match at most one lookup row.

    Raises:
        AssertionError: if any label has no match in the lookup.
    """
    country_lookup = get_countries()
    merged = df.merge(
        country_lookup,
        how="left",
        on="country_of_destination.label",
        validate="many_to_one",
        indicator=True,
    )
    # Consumes the "_merge" indicator column and fails on unmatched labels.
    assert_none_missing(merged, "country_of_destination.label")
    return merged


def get_countries() -> pd.DataFrame:
    """Query ``views.regions`` for level-1 regions with a two-character Trase id.

    Returns:
        A frame with columns ``country_of_destination.name``,
        ``country_of_destination.label`` (one row per synonym) and
        ``country_of_destination.trase_id``.
    """
    # NOTE(review): the coalesce to 'XX' looks redundant, since the
    # length(trase_id) = 2 filter should already exclude NULL ids - confirm.
    query = """
        select distinct 
               name as "country_of_destination.name",
               unnest(synonyms) as "country_of_destination.label",
               coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
          from views.regions 
         where level = 1
           and length(trase_id) = 2
        """
    return pd.read_sql(query, CNX.cnx)


def clean_states(df) -> pd.DataFrame:
    """Attach state name and Trase id by matching ``state.label``.

    Blank (empty-string) labels are first rewritten, in place on the input
    frame, to ``UNKNOWN STATE``. Labels are then left-joined against the
    lookup returned by ``get_states``; each label may match at most one
    lookup row.

    Raises:
        AssertionError: if any label has no match in the lookup.
    """
    label_col = "state.label"
    is_blank = df[label_col] == ""
    df.loc[is_blank, label_col] = "UNKNOWN STATE"

    state_lookup = get_states()
    merged = df.merge(
        state_lookup,
        how="left",
        on="state.label",
        validate="many_to_one",
        indicator=True,
    )
    # Consumes the "_merge" indicator column and fails on unmatched labels.
    assert_none_missing(merged, "state.label")
    return merged


def get_states() -> pd.DataFrame:
    """Query ``views.regions`` for Brazilian states that have a Trase id.

    Returns:
        A frame with columns ``state.name``, ``state.label`` (one row per
        synonym) and ``state.trase_id``, restricted to regions whose
        ``country`` is ``BRAZIL``, ``region_type`` is ``STATE`` and
        ``trase_id`` is not null.
    """
    query = """
        select distinct 
               name as "state.name",
               unnest(synonyms) as "state.label",
               trase_id AS "state.trase_id"
          from views.regions 
         where country = 'BRAZIL' 
           and region_type = 'STATE' 
           and trase_id is not null
        """
    return pd.read_sql(query, CNX.cnx)


def assert_none_missing(df, column):
    """Check that every row of a merge-with-indicator result found a match.

    Removes the ``_merge`` column from ``df`` in place, then asserts that no
    row carried an indicator other than ``"both"``.

    Args:
        df: merge result that still contains the ``_merge`` indicator column.
        column: name of the column whose unmatched values are reported.

    Raises:
        AssertionError: listing the distinct unmatched values of ``column``.
    """
    merge_flag = df.pop("_merge")
    unmatched_rows = merge_flag != "both"
    missing = df.loc[unmatched_rows, column].drop_duplicates()
    assert missing.empty, f"Not all {column} found:\n{missing}"


# Run the cleaning pipeline only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    """Placeholder dbt Python model for ``cd_combined_poultry_2017_cleaned``.

    References the upstream raw source and then always raises
    ``NotImplementedError``; the data itself is produced by a separate script.

    Args:
        dbt: the dbt object handed to Python models; only ``dbt.source`` is used.
        cursor: second argument supplied by the caller; unused here.

    Raises:
        NotImplementedError: always.
    """
    # Reference the raw source table; the return value is discarded.
    # Presumably this call is what records the lineage edge in dbt - confirm.
    dbt.source("trase-storage-raw", "cd_combined_poultry_2017")

    raise NotImplementedError()
    # NOTE(review): unreachable. Presumably kept because dbt's static
    # validation of Python models expects model() to end with a return
    # statement - confirm before removing.
    return pd.DataFrame({"hello": ["world"]})