Skip to content

Conab Static Capacity 202107

s3://trase-storage/brazil/logistics/conab/static_capacity/out/CONAB_STATIC_CAPACITY_202107.csv

Dbt path: trase_production.main_brazil.conab_static_capacity_202107

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/logistics/conab/static_capacity/out/_schema.yml

Model file link: trase/data_pipeline/models/brazil/logistics/conab/static_capacity/out/conab_static_capacity_202107.py

Calls script: trase/data/brazil/logistics/conab/static_capacity/out/CONAB_STATIC_CAPACITY_202107.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)

Tags: mock_model, brazil, conab, logistics, out, static_capacity


conab_static_capacity_202107

Description

This model was auto-generated based off .yml 'lineage' files in S3. The DBT model just raises an error; the actual script that created the data lives elsewhere. The script is located at trase/data/brazil/logistics/conab/static_capacity/out/CONAB_STATIC_CAPACITY_202107.py [permalink]. It was last run by Harry Biddle.


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ac
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_mt
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_rj
  • source.trase_duckdb.trase-storage-raw.uf
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_go
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_pr
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_rr
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_pb
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_es
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_rn
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_al
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ro
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_sc
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_se
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ap
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_df
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_pi
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_am
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_pe
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ce
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ba
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_pa
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_sp
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_to
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ma
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ms
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_mg
  • source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_rs

Sources

  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ac']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_mt']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_rj']
  • ['trase-storage-raw', 'uf']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_go']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_pr']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_rr']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_pb']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_es']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_rn']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_al']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ro']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_sc']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_se']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ap']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_df']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_pi']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_am']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_pe']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ce']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ba']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_pa']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_sp']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_to']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ma']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ms']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_mg']
  • ['trase-storage-raw', 'capacidade_estatica_21_07_2021_rs']
"""
Brazil - CONAB - Capacidade Estática de Armazenamento (Static Storage Capacity)
"""

import pandas as pd

from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sei_pcs.pandas_utilities import full_merge, rename
from trase.tools.pcs import *
from trase.tools.aws.aws_helpers import (
    read_s3_folder,
    get_pandas_df,
)


def main():
    df = load_data()
    df = insert_geocode(df)
    df = organize_data_structure(df)
    write_csv_for_upload(
        df,
        "brazil/logistics/conab/static_capacity/out/CONAB_STATIC_CAPACITY_202107.csv",
    )


def normalize_str(dataframe: pd.DataFrame, col: str):
    dataframe[col] = (
        dataframe[col]
        .str.normalize("NFKD")
        .str.encode("ascii", errors="ignore")
        .str.decode("utf-8")
    )
    dataframe[col] = dataframe[col].str.upper()

    return dataframe


def load_data():
    """Load data from S3 bucket"""

    s3_folder = read_s3_folder("brazil/logistics/conab/static_capacity/in/")

    data_list = []
    for file in s3_folder:
        if file.key.endswith("csv"):
            raw_data = get_pandas_df(
                file.key, sep=",", dtype=str, keep_default_na=False
            )
            raw_data = raw_data.drop(
                raw_data[raw_data["Município"] == "Total Geral"].index
            )
            raw_data = rename(
                raw_data,
                columns={
                    "Município": "MUNICIPALITY",
                    "Convencional_Quantidade": "CONVENTIONAL_QUANTITY",
                    "Convencional_Capacidade_ton": "CONVENTIONAL_CAPACITY",
                    "Granel_Quantidade": "BULK_QUANTITY",
                    "Granel_Capacidade_ton": "BULK_CAPACITY",
                    "Total_Quantidade": "TOTAL_QUANTITY",
                    "Total_Capacidade_ton": "TOTAL_CAPACITY",
                },
            )
            raw_data = normalize_str(raw_data, "MUNICIPALITY")
            data_list.append(raw_data)
    data = pd.concat(data_list, sort=False)

    data = data.drop(data[["Unnamed: 8", "Unnamed: 9", "Unnamed: 10"]], axis=1)
    data = data.drop(data[data["UF"] == ""].index)

    return data


def insert_geocode(df: pd.DataFrame):
    """Insert Municipality Geocode"""

    def get_state_uf():
        d = get_pandas_df(
            "brazil/metadata/UF.csv",
            sep=",",
            usecols=("CO_UF_IBGE", "CO_UF", "UF"),
            dtype=str,
        )
        d = d.rename(
            columns={
                "CO_UF_IBGE": "state.code",
                "CO_UF": "state.uf_number",
            },
            errors="raise",
        )
        d = d.drop(columns="state.uf_number", errors="raise")
        return d

    def replace_state_uf_codes_with_names_and_trase_ids(d, column):
        df_state_uf = get_state_uf()[[column, "state.code"]].drop_duplicates()
        return full_merge(d, df_state_uf, on=column, how="left", validate="many_to_one")

    df = replace_state_uf_codes_with_names_and_trase_ids(df, "UF")
    df["MUN_UF"] = df["MUNICIPALITY"] + " - " + df["state.code"]

    # Fetch Municipality's TRASE_ID
    @uses_database
    def get_geocode(cnx=None):
        br_geocode = pd.read_sql(
            """ SELECT name, trase_id as "TRASE_ID",
                UNNEST(synonyms) || ' - ' || SUBSTRING(trase_id, 4,2) name_id
                FROM website.regions
                WHERE country= 'BRAZIL' AND region_type = 'MUNICIPALITY' """,
            cnx.cnx,
        )

        return br_geocode

    df = full_merge(
        df,
        get_geocode(),
        how="left",
        left_on="MUN_UF",
        right_on="name_id",
        validate="many_to_one",
    )

    return df


def organize_data_structure(df: pd.DataFrame):
    """Organizes the data into the desirable final structure"""

    # Final structure
    df = df.drop(
        columns=["state.code", "name", "name_id", "MUN_UF"],
        errors="raise",
    )
    df = df[
        [
            "TRASE_ID",
            "UF",
            "MUNICIPALITY",
            "CONVENTIONAL_QUANTITY",
            "CONVENTIONAL_CAPACITY",
            "BULK_QUANTITY",
            "BULK_CAPACITY",
            "TOTAL_QUANTITY",
            "TOTAL_CAPACITY",
        ]
    ]

    # Check if there are no empty values
    for column in df.columns:
        assert not any(df[column].isna())
        assert not any(df[df[column] == ""]) == 0
        assert not any(df[df[column].str.contains(",")]) == 0
        assert not any(df[df[column] == "NAN"]) == 0

    return df


if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ac")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_mt")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_rj")
    dbt.source("trase-storage-raw", "uf")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_go")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_pr")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_rr")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_pb")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_es")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_rn")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_al")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ro")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_sc")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_se")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ap")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_df")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_pi")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_am")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_pe")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ce")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ba")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_pa")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_sp")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_to")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ma")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ms")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_mg")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_rs")

    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})