Refining Facilities 2003-2019

s3://trase-storage/brazil/logistics/abiove/out/REFINING_FACILITIES_2003_2019.csv

Dbt path: trase_production.main_brazil.refining_facilities_2003_2019

Explore on Metabase: Full table; summary statistics

Containing YAML file: trase/data_pipeline/models/brazil/logistics/abiove/out/_schema.yml

Model file: trase/data_pipeline/models/brazil/logistics/abiove/out/refining_facilities_2003_2019.py

Calls script: trase/data/brazil/logistics/abiove/out/REFINING_FACILITIES_2003_2019.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: mock_model, abiove, brazil, logistics, out


refining_facilities_2003_2019

Description

This model was auto-generated from the .yml 'lineage' files in S3. The dbt model itself only raises an error; the actual script that created the data lives elsewhere, at trase/data/brazil/logistics/abiove/out/REFINING_FACILITIES_2003_2019.py [permalink], and is reproduced below. It was last run by Harry Biddle.

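To inspect the published table directly, the CSV can be read straight from S3 (a minimal sketch, assuming read access to the trase-storage bucket and the s3fs package installed alongside pandas):

import pandas as pd

df = pd.read_csv(
    "s3://trase-storage/brazil/logistics/abiove/out/REFINING_FACILITIES_2003_2019.csv"
)
print(df[["YEAR", "COMPANY", "UF", "MUNICIPALITY", "GEOCODE", "STATUS"]].head())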

Details

Columns (name, type, description): none documented for this model.

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.21122018-114526-pesquisa_de_capacidade_instalada_2018
  • source.trase_duckdb.trase-storage-raw.uf
  • source.trase_duckdb.trase-storage-raw.refining_facilities_2003_2016
  • source.trase_duckdb.trase-storage-raw.pesquisa-de-capacidade-instalada_2019

Sources

  • trase-storage-raw.21122018-114526-pesquisa_de_capacidade_instalada_2018
  • trase-storage-raw.uf
  • trase-storage-raw.refining_facilities_2003_2016
  • trase-storage-raw.pesquisa-de-capacidade-instalada_2019
"""
Brazil - Crushing and Refineries Facilities

ABIOVE (Associação Brasileira das Indústrias de Óleos Vegetais)

"""

import pandas as pd

from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.pcs import *
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.aws_helpers import read_xlsx
from trase.tools.sei_pcs.pandas_utilities import *

# NOTE: the two star imports supply helpers used below without an explicit
# import: rename, full_merge, concat and the database connection CNX.


def main():
    df = refining_load_data()
    df = refining_insert_geocode(df)
    write_csv_for_upload(
        df, "brazil/logistics/abiove/out/REFINING_FACILITIES_2003_2019.csv"
    )


# ==================================================================================== #
#   DECLARE GENERAL FUNCTIONS
# ==================================================================================== #


def map_values(df: pd.DataFrame, col: str, method="equal", cond_result=None, **kwargs):
    """
    Replace specific values in a given column, driven by **kwargs.

    Method can be "equal" (default), "contains" or "conditional":
    - "equal": each kwarg key is matched exactly and replaced by its value.
    - "contains": each kwarg key is matched as a substring and replaced.
    - "conditional": expects exactly three kwargs (column -> value); rows
      satisfying all three conditions have `col` set to `cond_result`.
      The third kwarg value must be a list.

    ::return::
    pandas.DataFrame
    """

    if method == "equal":
        for key, value in kwargs.items():
            df.loc[df[col] == key, col] = value
    elif method == "contains":
        for key, value in kwargs.items():
            df.loc[df[col].str.contains(key), col] = value
    elif method == "conditional":
        cols = list(kwargs.keys())
        values = list(kwargs.values())
        df.loc[
            (
                (df[cols[0]].isin([values[0]]))
                & (df[cols[1]].isin([values[1]]))
                & (df[cols[2]].isin(values[2]))
            ),
            col,
        ] = cond_result
    else:
        raise ValueError(
            "The chosen method must be 'equal', 'contains' or 'conditional'."
        )

    return df
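
# Illustrative usage of map_values (a hypothetical sketch, not executed by the
# pipeline):
#
#   df = pd.DataFrame({"UF": ["SP", "MT"], "YEAR": [2017, 2018],
#                      "STATUS": ["A", "D"]})
#   map_values(df, "UF", SP="SAO PAULO")                    # 'equal': exact match
#   map_values(df, "UF", method="contains", MT="MATO")      # 'contains': substring
#   map_values(df, "STATUS", method="conditional", cond_result="-",
#              UF="MT", YEAR=2018, STATUS=["A", "D"])       # all three must hold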


def normalize_str(d: pd.DataFrame, col: str, clean=False):
    """
    Strip accents from a string column (NFKD-decompose, drop non-ASCII
    characters) and uppercase it.

    Args:
        d (pandas DataFrame): Dataframe to look up
        col (str): String column
        clean (bool): also remove '.', '-', '/', ',', '"' and spaces

    Returns:
        pandas DataFrame
    """
    d[col] = (
        d[col]
        .str.normalize("NFKD")
        .str.encode("ascii", errors="ignore")
        .str.decode("utf-8")
    )

    d[col] = d[col].str.upper()

    if clean:
        # regex=False so that '.' and the other tokens are treated as literals
        d[col] = (
            d[col]
            .str.replace(".", "", regex=False)
            .str.replace("-", "", regex=False)
            .str.replace("/", "", regex=False)
            .str.replace(",", "", regex=False)
            .str.replace('"', "", regex=False)
            .str.replace(" ", "", regex=False)
        )

    return d
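
# Illustrative behaviour (hypothetical values, not executed by the pipeline):
#
#   d = pd.DataFrame({"MUNICIPALITY": ["Caarapó", "Vitória de Santo Antão"]})
#   normalize_str(d, "MUNICIPALITY")
#   #   -> "CAARAPO", "VITORIA DE SANTO ANTAO"
#   normalize_str(d, "MUNICIPALITY", clean=True)
#   #   -> "CAARAPO", "VITORIADESANTOANTAO"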


def get_state_uf():
    """Lookup of IBGE state codes ("state.code") by UF abbreviation."""
    df = get_pandas_df_once(
        "brazil/metadata/UF.csv",
        usecols=("CO_UF_IBGE", "CO_UF", "UF"),
        sep=",",
        dtype=str,
    )
    df = df.rename(
        columns={"CO_UF_IBGE": "state.code", "CO_UF": "state.uf_number"},
        errors="raise",
    )
    # state.uf_number is not needed downstream; only state.code and UF are kept
    df = df.drop(columns="state.uf_number", errors="raise")
    return df


def replace_state_uf_codes_with_names_and_trase_ids(df, col):
    df_state_uf = get_state_uf()[[col, "state.code"]].drop_duplicates()
    return full_merge(df, df_state_uf, on=col, how="left", validate="many_to_one")


def get_br_geocode(cnx):
    """
    A lookup of municipality geocodes in Brazil.

    :param cnx: database connection wrapper (required)
    :return: dataframe; each row contains the municipality GEOCODE and a
        "NAME - state code" identifier (name_id)
    """
    df = pd.read_sql(
        """SELECT SUBSTRING(trase_id, 4, 11) as "GEOCODE",
                UNNEST(synonyms) || ' - ' || SUBSTRING(trase_id, 4, 2) name_id
           FROM website.regions
           WHERE country = 'BRAZIL' AND region_type = 'MUNICIPALITY' """,
        cnx.cnx,
    )
    return df
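
# Example of the returned lookup (illustrative row; 3550308 is the IBGE
# geocode of São Paulo and 35 its state code):
#
#   GEOCODE   name_id
#   3550308   SAO PAULO - 35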


# ==================================================================================== #
#   COMMON DICTIONARIES
# ==================================================================================== #

# Manual fixes for ABIOVE municipality names that do not match the official
# "NAME - state code" identifiers (typos, a port standing in for its
# municipality, or a wrong state code).
MUN_UF_RENAMES = {
    "CARIRI - 17": "CARIRI DO TOCANTINS - 17",
    "VITORIA DO SANTO ANTAO - 26": "VITORIA DE SANTO ANTAO - 26",
    "SAO PAULO (JAGUARE) - 35": "SAO PAULO - 35",
    "SUAPE - 26": "IPOJUCA - 26",
    "DOURADOS - 51": "DOURADOS - 50",
    "CARAPO - 50": "CAARAPO - 50",
}


# ==================================================================================== #
#   REFINING FACILITIES
#   (1) From manually downloaded recent files (xls format): extract useful data
#   (2) Fetch GeoCode
# ==================================================================================== #


# STEP 1: Extract useful data from original XLS
def refining_load_data():
    # 2017 data: read from the 2018 workbook (column P holds the 2017 status)
    data_2018 = "brazil/logistics/abiove/ori/21122018-114526-pesquisa_de_capacidade_instalada_2018.xlsx"
    df_ref_2017 = read_xlsx(
        data_2018,
        sheet_name="3. Unidades Industriais",
        usecols="B:E,P",
        header=7,
        index=False,
        skipfooter=6,
    )

    REF_17_COLS = {
        "Empresas": "COMPANY",
        "Município": "MUNICIPALITY",
        "2017.1": "STATUS",
    }

    df_ref_2017 = rename(df_ref_2017, REF_17_COLS)
    df_ref_2017 = normalize_str(df_ref_2017, "MUNICIPALITY")
    df_ref_2017["YEAR"] = 2017
    df_ref_2017 = df_ref_2017[["YEAR", "COMPANY", "UF", "MUNICIPALITY", "STATUS"]]

    REF_18_COLS = {
        "Empresas": "COMPANY",
        "Município": "MUNICIPALITY",
        "2018.1": "STATUS",
    }

    # 2018 data: same workbook (column O holds the 2018 status)
    df_ref_2018 = read_xlsx(
        data_2018,
        sheet_name="3. Unidades Industriais",
        usecols="B:E,O",
        header=7,
        index=False,
        skipfooter=6,
    )

    df_ref_2018 = rename(df_ref_2018, REF_18_COLS)
    df_ref_2018 = normalize_str(df_ref_2018, "MUNICIPALITY")
    df_ref_2018["YEAR"] = 2018
    df_ref_2018 = df_ref_2018[["YEAR", "COMPANY", "UF", "MUNICIPALITY", "STATUS"]]

    # Adjust new data - 2019
    df_ref_2019 = read_xlsx(
        "brazil/logistics/abiove/ori/pesquisa-de-Capacidade-Instalada_2019.xlsx",
        sheet_name="5.Unidades de Refino e Envase",
        usecols="B:P",
        header=7,
        index=False,
        skipfooter=6,
    )
    REF_2019_COLS = {"Empresas": "COMPANY", "Município": "MUNICIPALITY", 2019: "STATUS"}

    df_ref_2019 = rename(df_ref_2019, REF_2019_COLS)
    df_ref_2019 = normalize_str(df_ref_2019, "MUNICIPALITY")
    df_ref_2019["YEAR"] = 2019

    # Concatenate the recent years (2017-2019), keeping only rows with a UF
    df_last_yrs = pd.concat([df_ref_2017, df_ref_2018, df_ref_2019], sort=False)
    df_last_yrs = df_last_yrs.dropna(subset=["UF"])

    return df_last_yrs


# STEP 2: Fetch GeoCode
def refining_insert_geocode(df: pd.DataFrame):
    data = replace_state_uf_codes_with_names_and_trase_ids(df, "UF")
    data["MUN_UF"] = data["MUNICIPALITY"] + " - " + data["state.code"]

    data = map_values(data, "MUN_UF", **MUN_UF_RENAMES)

    data = full_merge(
        data,
        get_br_geocode(CNX),
        how="left",
        left_on="MUN_UF",
        right_on="name_id",
        validate="many_to_one",
    )
    data = data[["YEAR", "COMPANY", "UF", "MUNICIPALITY", "GEOCODE", "STATUS"]]

    # Merge with the previous version data (2003-2016)
    df_previous_version = get_pandas_df_once(
        "brazil/logistics/abiove/old/REFINING_FACILITIES_2003_2016.csv"
    )
    df = concat([df_previous_version, data])
    df = df.sort_values("YEAR")
    df = df.astype({"YEAR": str})
    df.loc[df["STATUS"].isna(), "STATUS"] = "-"

    # Sanity checks: no missing, empty-string or literal "NAN" values remain
    for col in df.columns:
        assert not df[col].isna().any()
        assert (df[col] == "").sum() == 0
        assert (df[col] == "NAN").sum() == 0

    return df


if __name__ == "__main__":
    main()
Model file (trase/data_pipeline/models/brazil/logistics/abiove/out/refining_facilities_2003_2019.py)

import pandas as pd


def model(dbt, cursor):
    dbt.source(
        "trase-storage-raw", "21122018-114526-pesquisa_de_capacidade_instalada_2018"
    )
    dbt.source("trase-storage-raw", "uf")
    dbt.source("trase-storage-raw", "refining_facilities_2003_2016")
    dbt.source("trase-storage-raw", "pesquisa-de-capacidade-instalada_2019")

    # The mock model only declares its upstream sources; the data itself is
    # produced by the standalone script above.
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})  # unreachable placeholder