Conab Static Capacity 202107
s3://trase-storage/brazil/logistics/conab/static_capacity/out/CONAB_STATIC_CAPACITY_202107.csv
Dbt path: trase_production.main_brazil.conab_static_capacity_202107
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/logistics/conab/static_capacity/out/_schema.yml
Model file link: trase/data_pipeline/models/brazil/logistics/conab/static_capacity/out/conab_static_capacity_202107.py
Calls script: trase/data/brazil/logistics/conab/static_capacity/out/CONAB_STATIC_CAPACITY_202107.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: mock_model, brazil, conab, logistics, out, static_capacity
conab_static_capacity_202107
Description
This model was auto-generated from the .yml 'lineage' files in S3. The dbt model itself only raises an error; the script that actually created the data lives elsewhere, at trase/data/brazil/logistics/conab/static_capacity/out/CONAB_STATIC_CAPACITY_202107.py [permalink]. It was last run by Harry Biddle.
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ac, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_mt, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_rj, source.trase_duckdb.trase-storage-raw.uf, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_go, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_pr, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_rr, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_pb, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_es, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_rn, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_al, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ro, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_sc, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_se, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ap, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_df, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_pi, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_am, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_pe, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ce, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ba, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_pa, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_sp, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_to, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ma, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_ms, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_mg, source.trase_duckdb.trase-storage-raw.capacidade_estatica_21_07_2021_rs
Sources
['trase-storage-raw', 'capacidade_estatica_21_07_2021_ac'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_mt'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_rj'], ['trase-storage-raw', 'uf'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_go'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_pr'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_rr'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_pb'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_es'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_rn'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_al'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ro'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_sc'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_se'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ap'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_df'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_pi'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_am'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_pe'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ce'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ba'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_pa'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_sp'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_to'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ma'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_ms'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_mg'], ['trase-storage-raw', 'capacidade_estatica_21_07_2021_rs']
"""
Brazil - CONAB - Capacidade Estática de Armazenamento (Static Storage Capacity)
"""
import pandas as pd
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sei_pcs.pandas_utilities import full_merge, rename
from trase.tools.pcs import *
from trase.tools.aws.aws_helpers import (
read_s3_folder,
get_pandas_df,
)
def main():
    """Run the full pipeline: load, geocode, reshape, then write the CSV for upload."""
    output_key = (
        "brazil/logistics/conab/static_capacity/out/CONAB_STATIC_CAPACITY_202107.csv"
    )
    # Each stage consumes the frame produced by the previous one.
    capacity = organize_data_structure(insert_geocode(load_data()))
    write_csv_for_upload(capacity, output_key)
def normalize_str(dataframe: pd.DataFrame, col: str):
    """Strip accents from a text column and upper-case it, in place.

    The column is NFKD-decomposed, every non-ASCII character (including the
    combining marks produced by the decomposition) is discarded, and the
    remaining text is upper-cased.

    :param dataframe: frame holding the column; it is modified in place.
    :param col: name of the string column to normalise.
    :return: the same ``dataframe`` object, for call chaining.
    """
    decomposed = dataframe[col].str.normalize("NFKD")
    # Encoding to ASCII with errors="ignore" silently drops anything non-ASCII.
    ascii_only = decomposed.str.encode("ascii", errors="ignore").str.decode("utf-8")
    dataframe[col] = ascii_only.str.upper()
    return dataframe
def load_data():
    """Load every CONAB static-capacity CSV from S3 into one frame.

    Each object under ``brazil/logistics/conab/static_capacity/in/`` whose key
    ends in ``csv`` is read with all columns as strings. Its "Total Geral"
    summary row is removed, its columns are renamed to the English names used
    downstream, and the municipality name is accent-stripped and upper-cased.
    The per-file frames are then concatenated, the three unnamed spill-over
    columns are dropped, and rows with a blank ``UF`` are discarded.

    :return: the concatenated frame; index labels are those of the
        individual files and may therefore repeat.
    :raises ValueError: from ``pd.concat`` when no CSV file was found.
    :raises KeyError: when an expected column is missing.
    """
    column_names = {
        "Município": "MUNICIPALITY",
        "Convencional_Quantidade": "CONVENTIONAL_QUANTITY",
        "Convencional_Capacidade_ton": "CONVENTIONAL_CAPACITY",
        "Granel_Quantidade": "BULK_QUANTITY",
        "Granel_Capacidade_ton": "BULK_CAPACITY",
        "Total_Quantidade": "TOTAL_QUANTITY",
        "Total_Capacidade_ton": "TOTAL_CAPACITY",
    }

    data_list = []
    for file in read_s3_folder("brazil/logistics/conab/static_capacity/in/"):
        if not file.key.endswith("csv"):
            continue
        raw_data = get_pandas_df(file.key, sep=",", dtype=str, keep_default_na=False)
        # Remove the grand-total row; copy so later in-place edits do not
        # write into a view of the frame that was read.
        raw_data = raw_data[raw_data["Município"] != "Total Geral"].copy()
        raw_data = rename(raw_data, columns=column_names)
        raw_data = normalize_str(raw_data, "MUNICIPALITY")
        data_list.append(raw_data)

    data = pd.concat(data_list, sort=False)
    data = data.drop(columns=["Unnamed: 8", "Unnamed: 9", "Unnamed: 10"])
    # Filter with a boolean mask rather than dropping by index label: after
    # the concat above the labels repeat across files, so a label-based drop
    # would also delete valid rows from other files that happen to share a
    # label with a blank-UF row.
    data = data[data["UF"] != ""]
    return data
def insert_geocode(df: pd.DataFrame):
    """Attach the state code and the municipality TRASE_ID to every row.

    Two left merges are performed. The first joins ``brazil/metadata/UF.csv``
    on ``UF`` to add ``state.code``. The second joins the municipality
    regions read from ``website.regions`` on a ``"<MUNICIPALITY> - <code>"``
    key, which adds the ``name``, ``TRASE_ID`` and ``name_id`` columns.

    :param df: frame with at least ``UF`` and ``MUNICIPALITY`` columns.
    :return: the merged frame, including the helper columns ``state.code``
        and ``MUN_UF``.
    """

    def load_state_codes():
        # Read the UF metadata and keep only the UF abbreviation and the
        # CO_UF_IBGE code, which is exposed as "state.code".
        states = get_pandas_df(
            "brazil/metadata/UF.csv",
            sep=",",
            usecols=("CO_UF_IBGE", "CO_UF", "UF"),
            dtype=str,
        )
        renamed = states.rename(
            columns={"CO_UF_IBGE": "state.code", "CO_UF": "state.uf_number"},
            errors="raise",
        )
        return renamed.drop(columns="state.uf_number", errors="raise")

    # One row per UF so the merge can be validated as many-to-one.
    state_lookup = load_state_codes()[["UF", "state.code"]].drop_duplicates()
    df = full_merge(df, state_lookup, on="UF", how="left", validate="many_to_one")

    # Join key in the same "<name> - <code>" shape as name_id in the query below.
    df["MUN_UF"] = df["MUNICIPALITY"] + " - " + df["state.code"]

    @uses_database
    def get_geocode(cnx=None):
        # One row per municipality synonym; name_id appends characters 4-5 of
        # the trase_id (presumably the state code - confirm against the table).
        query = """ SELECT name, trase_id as "TRASE_ID",
UNNEST(synonyms) || ' - ' || SUBSTRING(trase_id, 4,2) name_id
FROM website.regions
WHERE country= 'BRAZIL' AND region_type = 'MUNICIPALITY' """
        return pd.read_sql(query, cnx.cnx)

    return full_merge(
        df,
        get_geocode(),
        how="left",
        left_on="MUN_UF",
        right_on="name_id",
        validate="many_to_one",
    )
def organize_data_structure(df: pd.DataFrame):
    """Reduce the merged frame to its final columns and validate the values.

    The helper columns added while geocoding are dropped (their presence is
    enforced), the remaining columns are put in the output order, and every
    column is checked for missing values, empty strings, commas and the
    literal string ``"NAN"``.

    :param df: merged frame; every retained column must hold strings.
    :return: frame with exactly the nine output columns, in order.
    :raises KeyError: when a helper or output column is missing.
    :raises AssertionError: when any value fails validation.
    """
    df = df.drop(
        columns=["state.code", "name", "name_id", "MUN_UF"],
        errors="raise",
    )
    df = df[
        [
            "TRASE_ID",
            "UF",
            "MUNICIPALITY",
            "CONVENTIONAL_QUANTITY",
            "CONVENTIONAL_CAPACITY",
            "BULK_QUANTITY",
            "BULK_CAPACITY",
            "TOTAL_QUANTITY",
            "TOTAL_CAPACITY",
        ]
    ]

    # Validate the values themselves. Calling the builtin any() on a filtered
    # DataFrame iterates its column names, which are always truthy, so such a
    # check can never fail; Series.any() inspects the actual cells.
    for column in df.columns:
        values = df[column]
        assert not values.isna().any(), f"{column} contains missing values"
        assert not (values == "").any(), f"{column} contains empty strings"
        assert not values.str.contains(
            ",", regex=False
        ).any(), f"{column} contains commas"
        assert not (values == "NAN").any(), f"{column} contains 'NAN' strings"
    return df
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
import pandas as pd
def model(dbt, cursor):
    """Placeholder dbt Python model that only declares its upstream sources.

    Every ``dbt.source`` call names one raw input in ``trase-storage-raw``:
    the per-state ``capacidade_estatica_21_07_2021_*`` tables and ``uf``.
    The function then raises ``NotImplementedError``, so it never produces
    data itself.
    """
    # NOTE(review): the sources are presumably written as one literal call
    # each so that dbt can read the lineage statically - confirm before
    # refactoring these into a loop.
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ac")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_mt")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_rj")
    dbt.source("trase-storage-raw", "uf")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_go")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_pr")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_rr")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_pb")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_es")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_rn")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_al")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ro")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_sc")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_se")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ap")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_df")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_pi")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_am")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_pe")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ce")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ba")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_pa")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_sp")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_to")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ma")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_ms")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_mg")
    dbt.source("trase-storage-raw", "capacidade_estatica_21_07_2021_rs")
    # Running this model always fails here, before anything is returned.
    raise NotImplementedError()
    # Unreachable at runtime. NOTE(review): presumably kept so the function
    # still ends in a single-DataFrame return, which dbt is understood to
    # require of Python models - confirm before removing.
    return pd.DataFrame({"hello": ["world"]})