Refining Facilities 2003 2019
s3://trase-storage/brazil/logistics/abiove/out/REFINING_FACILITIES_2003_2019.csv
Dbt path: trase_production.main_brazil.refining_facilities_2003_2019
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/logistics/abiove/out/_schema.yml
Model file link: trase/data_pipeline/models/brazil/logistics/abiove/out/refining_facilities_2003_2019.py
Calls script: trase/data/brazil/logistics/abiove/out/REFINING_FACILITIES_2003_2019.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph — at the bottom right — plus tests and downstream dependencies)
Tags: mock_model, abiove, brazil, logistics, out
refining_facilities_2003_2019
Description
This model was auto-generated based on .yml 'lineage' files in S3. The dbt model itself only raises an error; the actual script that created the data lives elsewhere. The script is located at trase/data/brazil/logistics/abiove/out/REFINING_FACILITIES_2003_2019.py [permalink]. It was last run by Harry Biddle.
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.21122018-114526-pesquisa_de_capacidade_instalada_2018source.trase_duckdb.trase-storage-raw.ufsource.trase_duckdb.trase-storage-raw.refining_facilities_2003_2016source.trase_duckdb.trase-storage-raw.pesquisa-de-capacidade-instalada_2019
Sources
['trase-storage-raw', '21122018-114526-pesquisa_de_capacidade_instalada_2018']['trase-storage-raw', 'uf']['trase-storage-raw', 'refining_facilities_2003_2016']['trase-storage-raw', 'pesquisa-de-capacidade-instalada_2019']
"""
Brazil - Crushing and Refineries Facilities
ABIOVE (Associação Brasileira das Indústrias de Óleos Vegetais)
"""
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.pcs import *
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.aws_helpers import read_xlsx
from trase.tools.sei_pcs.pandas_utilities import *
def main():
    """Build the 2003-2019 refining-facilities table and stage it for upload."""
    facilities = refining_insert_geocode(refining_load_data())
    write_csv_for_upload(
        facilities, "brazil/logistics/abiove/out/REFINING_FACILITIES_2003_2019.csv"
    )
# ==================================================================================== #
# DECLARE GENERAL FUNCTIONS
# ==================================================================================== #
def map_values(df: pd.DataFrame, col: str, method="equal", cond_result=None, **kwargs):
    """
    Replace specific values in column ``col`` based on ``**kwargs``.

    ``method`` selects how the kwargs are interpreted ("equal" is default):

    * ``"equal"``: each kwarg key that exactly matches a cell value is
      replaced by the corresponding kwarg value.
    * ``"contains"``: each cell whose string contains the kwarg key
      (``Series.str.contains`` semantics, i.e. the key is a regex) is
      replaced by the kwarg value.
    * ``"conditional"``: expects exactly three kwargs mapping column name ->
      required value (the third value must itself be a list of accepted
      values); rows satisfying all three conditions get ``col`` set to
      ``cond_result``.

    Args:
        df: DataFrame to modify (mutated in place and also returned).
        col: name of the column whose values are rewritten.
        method: "equal", "contains" or "conditional".
        cond_result: replacement value used only by the "conditional" method.
        **kwargs: mapping as described above.

    Returns:
        pandas.DataFrame: the same, mutated DataFrame (the original docstring
        said ``pandas.Series``, which was wrong).

    Raises:
        ValueError: if ``method`` is not one of the three supported names.
    """
    if method == "equal":
        for old, new in kwargs.items():
            df.loc[df[col] == old, col] = new
    elif method == "contains":
        for pattern, new in kwargs.items():
            # NOTE(review): str.contains raises on NaN entries and treats the
            # key as a regex -- callers in this file pass plain literals only.
            df.loc[df[col].str.contains(pattern), col] = new
    elif method == "conditional":
        cond_cols = list(kwargs.keys())
        cond_vals = list(kwargs.values())
        # First two conditions are single-value equality tests; the third
        # value is already a list of accepted values.
        mask = (
            df[cond_cols[0]].isin([cond_vals[0]])
            & df[cond_cols[1]].isin([cond_vals[1]])
            & df[cond_cols[2]].isin(cond_vals[2])
        )
        df.loc[mask, col] = cond_result
    else:
        raise ValueError(
            "The chosen method must be 'equal', 'contains' or 'conditional'."
        )
    return df
def normalize_str(d: pd.DataFrame, col: str, clean=False):
    """
    Strip accents from a string column (NFKD -> ASCII) and uppercase it.

    Args:
        d (pandas DataFrame): DataFrame to modify (mutated in place).
        col (str): name of the string column.
        clean: when True, additionally remove '.', '-', '/', ',', '"'
            and all spaces from the values.

    Returns:
        pandas DataFrame: the same, mutated DataFrame.
    """
    # NFKD decomposition splits accented characters into base letter +
    # combining mark; encoding to ASCII with errors="ignore" then drops the
    # marks (e.g. "São" -> "Sao").
    d[col] = (
        d[col]
        .str.normalize("NFKD")
        .str.encode("ascii", errors="ignore")
        .str.decode("utf-8")
    )
    d[col] = d[col].str.upper()
    if clean:
        # regex=False makes each character a literal.  Without it, '.' is a
        # regex wildcard under the pre-2.0 pandas default (regex=True) and
        # would erase the entire string.
        for ch in (".", "-", "/", ",", '"'):
            d[col] = d[col].str.replace(ch, "", regex=False)
        # Remove all spaces.
        d[col] = d[col].str.split(" ").str.join("")
    return d
def get_state_uf():
    """
    Load the Brazilian state lookup from S3, keeping the IBGE state code
    (renamed "state.code") alongside the UF abbreviation.
    """
    lookup = get_pandas_df_once(
        "brazil/metadata/UF.csv",
        usecols=("CO_UF_IBGE", "CO_UF", "UF"),
        sep=",",
        dtype=str,
    )
    lookup = lookup.rename(
        columns={"CO_UF_IBGE": "state.code", "CO_UF": "state.uf_number"},
        errors="raise",
    )
    # Only the IBGE code and the UF abbreviation are needed downstream.
    return lookup.drop(columns="state.uf_number", errors="raise")
def replace_state_uf_codes_with_names_and_trase_ids(df, col):
    """Left-join the IBGE state code ("state.code") onto ``df`` via its UF column."""
    uf_lookup = get_state_uf()[[col, "state.code"]].drop_duplicates()
    merged = full_merge(df, uf_lookup, on=col, how="left", validate="many_to_one")
    return merged
def get_br_geocode(cnx=None):
    """
    Look up the geocode of every Brazilian municipality.

    (The previous docstring -- "commodity name and an array of product hs6
    codes" -- was a copy-paste error from another helper.)

    Args:
        cnx: database connection wrapper; its ``.cnx`` attribute is handed to
            ``pd.read_sql``.  The ``None`` default is unusable -- calling
            without a connection raises AttributeError -- callers must pass
            one (this file passes ``CNX``).

    Returns:
        pandas.DataFrame with columns:
            GEOCODE -- municipality code extracted from the region trase_id;
            name_id -- "<MUNICIPALITY SYNONYM> - <2-digit state code>",
                       one row per synonym (UNNEST).
    """
    df = pd.read_sql(
        """SELECT SUBSTRING(trase_id, 4, 11) as "GEOCODE",
    UNNEST(synonyms) || ' - ' || SUBSTRING(trase_id, 4, 2) name_id
    FROM website.regions
    WHERE country= 'BRAZIL' AND region_type = 'MUNICIPALITY' """,
        cnx.cnx,
    )
    return df
# ==================================================================================== #
# COMMON DICTIONARIES
# ==================================================================================== #
# Manual municipality-name fixes, keyed and valued as
# "<MUNICIPALITY> - <2-digit state code>", applied before the geocode merge
# in refining_insert_geocode() so every row matches website.regions.
# NOTE(review): these appear to correct spreadsheet typos ("VITORIA DO" ->
# "VITORIA DE", "CARAPO" -> "CAARAPO"), map a site to its parent
# municipality (SUAPE -> IPOJUCA, the JAGUARE district -> SAO PAULO) and fix
# wrong state codes (DOURADOS 51 -> 50) -- verify against the source files.
MUN_UF_RENAMES = {
    "CARIRI - 17": "CARIRI DO TOCANTINS - 17",
    "VITORIA DO SANTO ANTAO - 26": "VITORIA DE SANTO ANTAO - 26",
    "SAO PAULO (JAGUARE) - 35": "SAO PAULO - 35",
    "SUAPE - 26": "IPOJUCA - 26",
    "DOURADOS - 51": "DOURADOS - 50",
    "CARAPO - 50": "CAARAPO - 50",
}
# ==================================================================================== #
# REFINING FACILITIES
# (1) From manually downloaded recent files (xls format): extract useful data
# (2) Fetch GeoCode
# ==================================================================================== #
# STEP 1: Extract useful data from original XLS
def refining_load_data():
    """
    Extract refining-facility rows for 2017, 2018 and 2019 from the ABIOVE
    installed-capacity survey spreadsheets.

    The 2018 workbook carries both the 2017 status (column P) and the 2018
    status (column O); 2019 comes from its own workbook.

    Returns:
        pandas DataFrame with columns YEAR, COMPANY, UF, MUNICIPALITY,
        STATUS; rows without a UF (blank/subtotal rows) are dropped.
    """
    # Adjust new data - 2017 (taken from the 2018 survey workbook)
    data_2018 = "brazil/logistics/abiove/ori/21122018-114526-pesquisa_de_capacidade_instalada_2018.xlsx"
    # header=7 / skipfooter=6 skip the sheet's title banner and totals block;
    # usecols keeps company/location (B:E) plus the 2017 status column (P).
    # NOTE(review): layout inferred from these parameters -- confirm against
    # the workbook itself.
    df_ref_2017 = read_xlsx(
        data_2018,
        sheet_name="3. Unidades Industriais",
        usecols="B:E,P",
        header=7,
        index=False,
        skipfooter=6,
    )
    # Portuguese headers -> canonical names; "2017.1" is presumably the label
    # pandas assigns to a duplicated "2017" header in this sheet.
    REF_17_COLS = {
        "Empresas": "COMPANY",
        "Município": "MUNICIPALITY",
        "2017.1": "STATUS",
    }
    df_ref_2017 = rename(df_ref_2017, REF_17_COLS)
    df_ref_2017 = normalize_str(df_ref_2017, "MUNICIPALITY")
    df_ref_2017["YEAR"] = 2017
    df_ref_2017 = df_ref_2017[["YEAR", "COMPANY", "UF", "MUNICIPALITY", "STATUS"]]
    REF_18_COLS = {
        "Empresas": "COMPANY",
        "Município": "MUNICIPALITY",
        "2018.1": "STATUS",
    }
    # Adjust new data - 2018 (same workbook, status in column O)
    df_ref_2018 = read_xlsx(
        data_2018,
        sheet_name="3. Unidades Industriais",
        usecols="B:E,O",
        header=7,
        index=False,
        skipfooter=6,
    )
    df_ref_2018 = rename(df_ref_2018, REF_18_COLS)
    df_ref_2018 = normalize_str(df_ref_2018, "MUNICIPALITY")
    df_ref_2018["YEAR"] = 2018
    df_ref_2018 = df_ref_2018[["YEAR", "COMPANY", "UF", "MUNICIPALITY", "STATUS"]]
    # Adjust new data - 2019 (separate workbook, different sheet layout)
    df_ref_2019 = read_xlsx(
        "brazil/logistics/abiove/ori/pesquisa-de-Capacidade-Instalada_2019.xlsx",
        sheet_name="5.Unidades de Refino e Envase",
        usecols="B:P",
        header=7,
        index=False,
        skipfooter=6,
    )
    # Here the status header is the integer 2019, not a string.
    REF_2019_COLS = {"Empresas": "COMPANY", "Município": "MUNICIPALITY", 2019: "STATUS"}
    df_ref_2019 = rename(df_ref_2019, REF_2019_COLS)
    df_ref_2019 = normalize_str(df_ref_2019, "MUNICIPALITY")
    df_ref_2019["YEAR"] = 2019
    # Concatenate last years (2017-2019)
    df_last_yrs = pd.concat([df_ref_2017, df_ref_2018, df_ref_2019], sort=False)
    # Rows without a UF are spreadsheet padding/subtotals -- drop them.
    df_last_yrs = df_last_yrs.dropna(subset=["UF"])
    return df_last_yrs
# STEP 2: Fetch GeoCode
def refining_insert_geocode(df: pd.DataFrame):
    """
    Attach IBGE geocodes to the 2017-2019 facilities and merge with the
    2003-2016 history.

    Args:
        df: facilities table with YEAR, COMPANY, UF, MUNICIPALITY and STATUS
            columns (output of refining_load_data).

    Returns:
        pandas.DataFrame covering 2003-2019 with a GEOCODE column, sorted by
        YEAR (cast to str); missing STATUS values are filled with "-".

    Raises:
        AssertionError: if any output cell is NaN, an empty string, or the
            literal "NAN".
    """
    data = replace_state_uf_codes_with_names_and_trase_ids(df, "UF")
    # Join key against the website.regions synonym list: "<MUN> - <state.code>".
    data["MUN_UF"] = data["MUNICIPALITY"] + " - " + data["state.code"]
    # Fix known typos / mismatched names before the geocode merge.
    data = map_values(data, "MUN_UF", **MUN_UF_RENAMES)
    data = full_merge(
        data,
        get_br_geocode(CNX),
        how="left",
        left_on="MUN_UF",
        right_on="name_id",
        validate="many_to_one",
    )
    data = data[["YEAR", "COMPANY", "UF", "MUNICIPALITY", "GEOCODE", "STATUS"]]
    # Merge with the previous version data (2003-2016)
    df_previous_version = get_pandas_df_once(
        "brazil/logistics/abiove/old/REFINING_FACILITIES_2003_2016.csv"
    )
    df = concat([df_previous_version, data])
    df = df.sort_values("YEAR")
    df = df.astype({"YEAR": str})
    df.loc[df["STATUS"].isna(), "STATUS"] = "-"
    # Sanity checks.  BUGFIX: the original `assert not any(df[df[col] == ""]) == 0`
    # parsed as `not (any(...) == 0)`, and iterating a DataFrame yields its
    # column names, so the empty-string and "NAN" checks always passed.  These
    # now genuinely verify every cell.  (Note: asserts are stripped under -O.)
    for col in df.columns:
        assert not df[col].isna().any(), f"NaN values in column {col}"
        assert not (df[col] == "").any(), f"Empty strings in column {col}"
        assert not (df[col] == "NAN").any(), f"'NAN' strings in column {col}"
    return df
# Script entry point: regenerate the CSV and stage it for upload when run
# directly (not on import).
if __name__ == "__main__":
    main()
import pandas as pd
def model(dbt, cursor):
    """
    Mock dbt Python model: declares upstream sources for lineage, then raises.

    Per the generated description above, the real data is produced by the
    standalone script in trase/data/...; this model exists only so dbt can
    track dependencies (hence the "mock_model" tag).
    """
    # Source declarations register the upstream tables in dbt's lineage graph.
    dbt.source(
        "trase-storage-raw", "21122018-114526-pesquisa_de_capacidade_instalada_2018"
    )
    dbt.source("trase-storage-raw", "uf")
    dbt.source("trase-storage-raw", "refining_facilities_2003_2016")
    dbt.source("trase-storage-raw", "pesquisa-de-capacidade-instalada_2019")
    raise NotImplementedError()
    # Unreachable: only satisfies dbt's requirement that a model return a DataFrame.
    return pd.DataFrame({"hello": ["world"]})