DBT: Crushing Facilities 2003-2019

File location: s3://trase-storage/brazil/logistics/abiove/out/CRUSHING_FACILITIES_2003_2019.csv

DBT model name: crushing_facilities_2003_2019

Explore on Metabase: Full table; summary statistics

DBT details


Description

This model was auto-generated from the .yml 'lineage' files in S3. The DBT model itself just raises an error; the actual script that created the data lives elsewhere, at trase/data/brazil/logistics/abiove/out/CRUSHING_FACILITIES_2003_2019.py [permalink]. It was last run by Harry Biddle.


Details

Column | Type | Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.hinrichsen_2007_2017
  • source.trase_duckdb.trase-storage-raw.uf
  • source.trase_duckdb.trase-storage-raw.crushing_facilities_2003_2016
  • source.trase_duckdb.trase-storage-raw.br_crushingfacilities_missingcapacity_17_19
  • source.trase_duckdb.trase-storage-raw.pesquisa-de-capacidade-instalada_2019
  • source.trase_duckdb.trase-storage-raw.aux_br_abiove_crushingfacilities_missing_cnpj_latlong
  • source.trase_duckdb.trase-storage-raw.21122018-114526-pesquisa_de_capacidade_instalada_2018
  • source.trase_duckdb.trase-storage-raw.br_crushingfacilities_2018_2019

Sources

  • ['trase-storage-raw', 'hinrichsen_2007_2017']
  • ['trase-storage-raw', 'uf']
  • ['trase-storage-raw', 'crushing_facilities_2003_2016']
  • ['trase-storage-raw', 'br_crushingfacilities_missingcapacity_17_19']
  • ['trase-storage-raw', 'pesquisa-de-capacidade-instalada_2019']
  • ['trase-storage-raw', 'aux_br_abiove_crushingfacilities_missing_cnpj_latlong']
  • ['trase-storage-raw', '21122018-114526-pesquisa_de_capacidade_instalada_2018']
  • ['trase-storage-raw', 'br_crushingfacilities_2018_2019']
"""
Brazil - Crushing and Refinery Facilities

ABIOVE (Associação Brasileira das Indústrias de Óleos Vegetais)

"""

from pathlib import Path
from tempfile import gettempdir

import numpy as np
import pandas as pd

from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.pcs import *
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.aws_helpers import read_xlsx
from trase.tools.sei_pcs.pandas_utilities import *


def main():
    df = crushing_load_data()
    capacity = crush_get_capacity_data()
    df = crush_insert_geocode(df, capacity)
    df, df_old_dataset = crush_adjust_companies_names(df)
    df = crush_merge_new_and_old_data(df, df_old_dataset)
    df = crush_insert_cnpj_lat_long(df)
    df_adjusted_capacity = crush_add_missing_capacity(df)
    df = crush_concatenate_missing_capacity_into_full_dataset(df, df_adjusted_capacity)
    write_csv_for_upload(
        df, "brazil/logistics/abiove/out/CRUSHING_FACILITIES_2003_2019.csv"
    )


# ==================================================================================== #
#   DECLARE GENERAL FUNCTIONS
# ==================================================================================== #


def map_values(df: pd.DataFrame, col: str, method="equal", cond_result=None, **kwargs):
    """
    Change specific values in a given column based on **kwargs.
    Method can be 'equal', 'contains' or 'conditional'; 'equal' is the default.
    The 'conditional' method expects exactly three column/value conditions to be satisfied.
    ::return::
    pandas.DataFrame
    """

    if method == "equal":
        for key, value in kwargs.items():
            df.loc[df[col] == key, col] = value
    elif method == "contains":
        for key, value in kwargs.items():
            df.loc[df[col].str.contains(key), col] = value
    elif method == "conditional":
        df.loc[
            (
                (df[list(kwargs.keys())[0]].isin([list(kwargs.values())[0]]))
                & (df[list(kwargs.keys())[1]].isin([list(kwargs.values())[1]]))
                & (df[list(kwargs.keys())[2]].isin(list(kwargs.values())[2]))
            ),
            col,
        ] = cond_result
    else:
        raise ValueError(
            "The chosen method must be 'equal', 'contains' or 'conditional'."
        )

    return df
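
# Illustrative usage of map_values (a sketch mirroring the calls further below in this
# script; the dictionaries referenced are defined in the COMMON DICTIONARIES section):
#   map_values(df, "MUN_UF", **MUN_UF_RENAMES)                            # exact match
#   map_values(df, "COMPANY", method="contains", **ABIOVE_COMPANY_NAMES)  # substring match
# With method="conditional", exactly three column/value pairs are passed via **kwargs and
# rows satisfying all three receive `cond_result` in column `col`.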


def normalize_str(d: pd.DataFrame, col: str, clean=False):
    """
    Adjust column value characters encoding to UTF-8 and uppercase them.

    Args:
        d (pandas DataFrame): Dataframe to lookup
        col (str): String column
        clean: remove specific characters

    Returns:
        pandas DataFrame
    """
    d[col] = (
        d[col]
        .str.normalize("NFKD")
        .str.encode("ascii", errors="ignore")
        .str.decode("utf-8")
    )

    d[col] = d[col].str.upper()

    if clean is True:
        d[col] = (
            d[col]
            .str.replace(".", "", regex=False)
            .str.replace("-", "", regex=False)
            .str.replace("/", "", regex=False)
            .str.replace(",", "", regex=False)
            .str.replace('"', "", regex=False)
            .str.split(" ")
            .str.join("")
        )

    return d
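
# Illustrative effect of normalize_str (a sketch with made-up inputs):
#   normalize_str(d, "MUNICIPALITY")           # "Vitória de Santo Antão" -> "VITORIA DE SANTO ANTAO"
#   normalize_str(d, "COMPANY", clean=True)    # also strips . - / , " and spaces,
#                                              # e.g. "S/A Ind. e Com." -> "SAINDECOM"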


def get_state_uf():
    df = get_pandas_df_once(
        "brazil/metadata/UF.csv",
        usecols=("CO_UF_IBGE", "CO_UF", "UF"),
        sep=",",
        dtype=str,
    )
    df = df.rename(
        columns={"CO_UF_IBGE": "state.code", "CO_UF": "state.uf_number"},
        errors="raise",
    )
    df = df.drop(columns="state.uf_number", errors="raise")
    return df


def replace_state_uf_codes_with_names_and_trase_ids(df, col):
    df_state_uf = get_state_uf()[[col, "state.code"]].drop_duplicates()
    return full_merge(df, df_state_uf, on=col, how="left", validate="many_to_one")


def get_br_geocode(cnx=None):
    """
    A lookup of a municipalities Geocode in Brazil.
    :return:
    df_2: dataframe, each row contains the commodity name and an array of product hs6 codes
    """
    df = pd.read_sql(
        """SELECT SUBSTRING(trase_id, 4, 11) as "GEOCODE",
                UNNEST(synonyms) || ' - ' || SUBSTRING(trase_id, 4, 2) name_id
           FROM website.regions
           WHERE country= 'BRAZIL' AND region_type = 'MUNICIPALITY' """,
        cnx.cnx,
    )
    return df
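
# Sketch of the frame returned by get_br_geocode: one row per municipality synonym, where
# GEOCODE is the 7-digit IBGE code and name_id is "<NAME> - <2-digit state code>", e.g.:
#      GEOCODE  name_id
#   0  3550308  SAO PAULO - 35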


# ==================================================================================== #
#   COMMON DICTIONARIES
# ==================================================================================== #

MUN_UF_RENAMES = {
    "CARIRI - 17": "CARIRI DO TOCANTINS - 17",
    "VITORIA DO SANTO ANTAO - 26": "VITORIA DE SANTO ANTAO - 26",
    "SAO PAULO (JAGUARE) - 35": "SAO PAULO - 35",
    "SUAPE - 26": "IPOJUCA - 26",
    "DOURADOS - 51": "DOURADOS - 50",
    "CARAPO - 50": "CAARAPO - 50",
}

ABIOVE_COMPANY_NAMES = {
    "AGREX": "AGREX DO BRASIL S/A",
    "CLW ALIMENTOS": "CLW / HELMUT TESMANN",
    "3 TENTOS": "TRES TENTOS AGROINDUSTRIAL",
}


# ==================================================================================== #
#   CRUSHING FACILITIES
#
#   (1) From manually downloaded recent files (xls format): extract useful data
#   (2) Get Capacity Data (from JJ Hinrichsen)
#   (3) Fetch GeoCode
#   (4) Adjust companies' names
#   (5) Merge with current version (2003-2016)
#   (6) Insert missing CNPJs and Latitude and Longitude based on merge of csv from S3
#   (7) Identify Missing Capacity
#       (7.5) Run the auxiliary script to fill in the missing capacities
#   (8) Concatenate the current version with the new ones
#   (9) Export the final csv locally and send it to the S3 bucket using the AWS CLI
#   (10) QA missing CNPJ, CAPACITY and LAT/LONG
# ==================================================================================== #


def crushing_load_data():
    """STEP 1: Extract useful data from original XLS"""

    crushing_columns = {"Empresas": "COMPANY", "Município": "MUNICIPALITY"}

    # Adjust new data - 2017
    df_2017 = read_xlsx(
        "brazil/logistics/abiove/ori/21122018-114526-pesquisa_de_capacidade_instalada_2018.xlsx",
        sheet_name="3. Unidades Industriais",
        usecols="B:E,G",
        header=7,
        index=False,
        skipfooter=7,
    )

    df_2017 = rename(df_2017, crushing_columns)
    df_2017["YEAR"] = 2017
    df_2017 = normalize_str(df_2017, "MUNICIPALITY")
    df_2017 = df_2017[["YEAR", "COMPANY", "UF", "MUNICIPALITY"]]

    # Adjust new data - 2018
    df_2018 = read_xlsx(
        "brazil/logistics/abiove/ori/21122018-114526-pesquisa_de_capacidade_instalada_2018.xlsx",
        sheet_name="3. Unidades Industriais",
        usecols="B:F",
        header=7,
        index=False,
        skipfooter=7,
    )

    df_2018 = rename(df_2018, crushing_columns)
    df_2018["YEAR"] = 2018
    df_2018 = df_2018[["YEAR", "COMPANY", "UF", "MUNICIPALITY"]]
    df_2018 = normalize_str(df_2018, "MUNICIPALITY")

    # Adjust new data - 2019
    df_2019 = read_xlsx(
        "brazil/logistics/abiove/ori/pesquisa-de-Capacidade-Instalada_2019.xlsx",
        sheet_name="3.Unidades de Processamento",
        usecols="B:N",
        header=7,
        index=False,
        skipfooter=6,
    )

    df_2019 = rename(df_2019, crushing_columns)
    df_2019["YEAR"] = 2019
    df_2019 = normalize_str(df_2019, "MUNICIPALITY")
    df_2019 = df_2019[["YEAR", "COMPANY", "UF", "MUNICIPALITY"]]

    # Concatenate last years (2017-2019)
    df = concat([df_2017, df_2018, df_2019])
    df = normalize_str(df, "COMPANY")
    df = map_values(df, "COMPANY", method="contains", **ABIOVE_COMPANY_NAMES)
    df = map_values(df, "MUNICIPALITY", kwargs={"CARAPO": "CAARAPO"})

    return df


def crush_get_capacity_data():
    """STEP 2: Get Capacity Data"""

    capacity_17 = get_pandas_df_once(
        "brazil/logistics/crushing_facilities/in/hinrichsen/HINRICHSEN_2007_2017.csv",
        sep=",",
    )
    capacity_18_19 = get_pandas_df_once(
        "brazil/logistics/crushing_facilities/in/hinrichsen/br_crushingFacilities_2018_2019.csv",
        sep=",",
    )

    capacity_17 = capacity_17[capacity_17["YEAR"] == 2017]
    capacity = concat([capacity_17, capacity_18_19])

    capacity_company_names = {
        "BUNGE ALIMENTOS SA": "BUNGE",
        "CARGILL AGRICOLA SA": "CARGILL",
        "ADM DO BRASIL LTDA": "ADM",
        "BRF SA": "BRF",
        "AMAQGI": "AMAGGI",
        "AMAQQI": "AMAGGI",
        "ALIANCA AGRICOLA": "ALIANCA AGRICOLA DO CERRADO",
        "TENTOS": "TRES TENTOS AGROINDUSTRIAL",
        "CARAMURU": "CARAMURU",
        "LDC": "LOUIS DREYFUS COMMODITIES",
        "AGRARIA": "COOPER AGRARIA",
        "AGRENCO": "AGRENCO",
        "CLW": "CLW / HELMUT TESMANN",
        "COMOVE": "SPERAFICO",
        "COCAMAR": "COCAMAR",
        "COOPERMIL": "COOPERMIL",
        "COOPERATIVA LAR": "COOPERATIVA AGROINDUSTRIAL LAR",
        "COPACOL": "COPACOL",
        "CAMERA": "CAMERA",
        "FRANGOS": "DIP FRANGOS (DIPLOMATA)",
        "GRANOSUL": "GRANOSUL",
        "GRANO SUL": "GRANOSUL",
        "GRANOSULNIDERA": "GRANO SUL NIDERA",
        "GRUPALCOOPERMIL": "GRUPAL COOPERMIL",
        "PARECIS": "PARECIS SA",
        "PALMEIRENSE": "APSA - ALGODOEIRA PALMEIRENSE",
        "SELECTA": "SELECTA",
        "SANTA ROSA": "SANTA ROSA",
        "OVETRILDIPLOMATA": "SIPAL",
        "SIPAL-DIPLOMATA S/A": "SIPAL",
    }
    capacity = map_values(
        capacity, "COMPANY", method="contains", **capacity_company_names
    )
    capacity = normalize_str(capacity, "COMPANY")
    capacity = normalize_str(capacity, "MUNICIPALITY")
    capacity["CAPACITY"] = (
        capacity["CAPACITY"].astype(str).replace("\.0", "", regex=True)
    )
    capacity["GEOCODE"] = capacity["GEOCODE"].astype(str).replace("\.0", "", regex=True)
    capacity = capacity[capacity["STATUS"] == "ATIVA"]

    return capacity


def crush_insert_geocode(df: pd.DataFrame, capacity: pd.DataFrame):
    """STEP 3: Fetch GeoCode"""

    df = df.merge(
        capacity,
        how="left",
        on=["YEAR", "COMPANY", "UF", "MUNICIPALITY"],
        validate="many_to_many",
    )
    df = df.drop_duplicates()
    df = replace_state_uf_codes_with_names_and_trase_ids(df, "UF")
    df["MUN_UF"] = df["MUNICIPALITY"] + " - " + df["state.code"]

    df = map_values(df, "MUN_UF", **MUN_UF_RENAMES)

    df = full_merge(
        df,
        get_br_geocode(CNX),
        how="left",
        left_on="MUN_UF",
        right_on="name_id",
        validate="many_to_one",
    )
    df = rename(df, {"GEOCODE_y": "GEOCODE", "SOURCE": "CAPACITY_SOURCE"})

    df["CF"] = ""
    df["CNPJ"] = ""
    df["LAT"] = ""
    df["LONG"] = ""
    df["RESOLUTION"] = ""

    df = df[
        [
            "YEAR",
            "COMPANY",
            "UF",
            "MUNICIPALITY",
            "GEOCODE",
            "CF",
            "CAPACITY",
            "CAPACITY_SOURCE",
            "CNPJ",
            "LAT",
            "LONG",
            "RESOLUTION",
        ]
    ]
    df = df.drop_duplicates(subset=["YEAR", "COMPANY", "GEOCODE"])
    df = map_values(df, "COMPANY", method="contains", **ABIOVE_COMPANY_NAMES)
    df = normalize_str(df, "COMPANY")

    return df


def crush_adjust_companies_names(df: pd.DataFrame):
    """STEP 4: Adjust Companies' names"""

    df_old = get_pandas_df_once(
        "brazil/logistics/abiove/old/CRUSHING_FACILITIES_2003_2016.csv",
        sep=";",
        encoding="utf-8",
        keep_default_na=False,
    )

    old_data_company_name = {
        "ALGODOEIRA PALMEIRENSE SOCIEDADE ANONIMA APSA": "APSA - ALGODOEIRA PALMEIRENSE",
        "ALFA": "COOPERALFA",
        "AGREX": "AGREX DO BRASIL S/A",
        "COCAMAR": "COCAMAR",
        "COOPAVEL": "COOPAVEL",
        "COOPERATIVA AGROINDUSTRIAL DOS PRODUTORES RURAIS DO SUDOESTE": "COMIGO",
        "LAR COOPERATIVA AGROINDUSTRIAL": "COOPERATIVA AGROINDUSTRIAL LAR",
        "COOPERATIVA AGRARIA AGROINDUSTRIAL": "COOPER AGRARIA",
        "COPACOL-COOPERATIVA AGROINDUSTRIAL CONSOLATA": "COPACOL",
        "CLARION AGROINDUSTRIAL": "ROOT BRASIL (ARRENDATARIA DA CLARION)",
        "DIPLOMATA": "DIP FRANGOS (DIPLOMATA)",
        "LOUIS DREYFUS": "LOUIS DREYFUS COMMODITIES",
        "SOCCEPAR": "SOCCEPAR",
    }

    df_crushing = map_values(
        df_old, "COMPANY", method="contains", **old_data_company_name
    )

    df_crushing.loc[
        (df_crushing["COMPANY"].str.contains("CLW"))
        & (df_crushing["GEOCODE"] == "4303509"),
        "COMPANY",
    ] = "CLW / HELMUT TESMANN"

    df = df.astype({"GEOCODE": int})

    return df, df_old


def crush_merge_new_and_old_data(df: pd.DataFrame, df_old: pd.DataFrame):
    """STEP 5: Merge with Current Version (2003-2016)"""

    df = df.merge(df_old, how="left", on=["COMPANY", "GEOCODE", "UF", "MUNICIPALITY"])

    crush_columns = {
        "YEAR_x": "YEAR",
        "CF_y": "CF",
        "CAPACITY_x": "CAPACITY",
        "CAPACITY_SOURCE_x": "CAPACITY_SOURCE",
        "CNPJ_y": "CNPJ",
        "LAT_y": "LAT",
        "LONG_y": "LONG",
        "RESOLUTION_y": "RESOLUTION",
    }

    df = rename(df, crush_columns)
    df = df[
        [
            "YEAR",
            "COMPANY",
            "UF",
            "MUNICIPALITY",
            "GEOCODE",
            "CF",
            "CAPACITY",
            "CAPACITY_SOURCE",
            "CNPJ",
            "LAT",
            "LONG",
            "RESOLUTION",
        ]
    ]
    df = df.drop_duplicates()
    df = concat([df, df_old])

    return df


def crush_insert_cnpj_lat_long(df: pd.DataFrame):
    """STEP 6: Insert missing CNPJ and Latitude and Longitude"""

    missing_lat_long = get_pandas_df_once(
        "brazil/logistics/abiove/in/aux_br_abiove_crushingFacilities_missing_cnpj_latLong.csv",
        sep=";",
        keep_default_na=False,
        dtype=str,
    )

    crush_with_latlong = df[
        (~df["LAT"].isna() & ~df["LONG"].isna())
        | ((df["LAT"] != "NA") & (df["LONG"] != "NA"))
    ]
    df_crush_nan_cnpj_lat_long = df[
        ((df["LAT"].isna()) & (df["LONG"].isna()))
        | ((df["LAT"] == "NA") & (df["LONG"] == "NA"))
    ]

    df_crush_nan_cnpj_lat_long = df_crush_nan_cnpj_lat_long.astype({"GEOCODE": str})

    df_latlong = df_crush_nan_cnpj_lat_long.merge(
        missing_lat_long,
        how="left",
        left_on=["COMPANY", "GEOCODE"],
        right_on=["company", "geocode"],
        validate="many_to_one",
    )
    df_latlong = df_latlong[
        [
            "YEAR",
            "COMPANY",
            "UF",
            "MUNICIPALITY",
            "GEOCODE",
            "CF",
            "CAPACITY",
            "CAPACITY_SOURCE",
            "cnpj",
            "lat",
            "long",
            "RESOLUTION",
        ]
    ]

    df_latlong["RESOLUTION"] = "POINT"
    df_latlong = rename(df_latlong, {"cnpj": "CNPJ", "lat": "LAT", "long": "LONG"})
    df_latlong = df_latlong.astype({"YEAR": str})
    df_crush_with_latlong = crush_with_latlong.astype({"YEAR": str, "GEOCODE": str})

    # Concatenate current version with the new one
    crush_facilities = concat([df_crush_with_latlong, df_latlong])
    crush_facilities = crush_facilities.sort_values(
        ["YEAR", "COMPANY", "UF", "GEOCODE"]
    )

    # Adjust data
    crush_facilities.loc[
        crush_facilities["CAPACITY"].isin(["NA", "nan"]), "CAPACITY"
    ] = np.nan
    crush_facilities["CAPACITY"] = crush_facilities["CAPACITY"].fillna(0)
    crush_facilities = crush_facilities[~(crush_facilities["COMPANY"].isna())]
    crushing_facilities = crush_facilities.drop_duplicates(
        ["YEAR", "COMPANY", "GEOCODE", "CF"], keep="last"
    )

    return crushing_facilities


def crush_add_missing_capacity(df: pd.DataFrame):
    """STEP 7: Add missing capacity"""

    missing = df[
        ["YEAR", "COMPANY", "UF", "MUNICIPALITY", "CNPJ", "CAPACITY_SOURCE", "GEOCODE"]
    ][df["CAPACITY"] == 0]

    missing = missing.merge(
        df[df["YEAR"].isin(["2014", "2015", "2016", "2017", "2018", "2019"])],
        how="left",
        on=["COMPANY", "MUNICIPALITY", "UF", "CNPJ"],
    )

    missing["CAPACITY"] = missing["CAPACITY"].astype(float).astype(int).round(0)
    missing = rename(
        missing,
        {
            "YEAR_y": "YEAR",
            "CAPACITY_SOURCE_x": "CAPACITY_SOURCE",
            "GEOCODE_x": "GEOCODE",
        },
    )

    missing = missing[
        [
            "YEAR_x",
            "COMPANY",
            "MUNICIPALITY",
            "YEAR",
            "CAPACITY",
            "CAPACITY_SOURCE",
            "GEOCODE",
        ]
    ]

    # Export missing data to local file to be used in the auxiliary script
    # "br_crushingFacilities_missingCapacity_17_19.py"
    path = Path(gettempdir()) / "br_crushingFacilities_missingCapacity_17_19.csv"
    missing.to_csv(str(path), sep=";", encoding="utf-8", index=False)

    # Execute the auxiliary script to fulfil missing capacities
    # os.system("python br_crushingFacilities_missingCapacities_17_19.py")

    # Import adjusted crushing capacity
    df_adjusted_crushing_capacity = get_pandas_df_once(
        "brazil/logistics/abiove/in/br_crushingFacilities_missingCapacity_17_19.csv",
        sep=";",
        keep_default_na=False,
        dtype=str,
    )

    return df_adjusted_crushing_capacity


def crush_concatenate_missing_capacity_into_full_dataset(
    df: pd.DataFrame, df_adjusted_crushing_capacity: pd.DataFrame
):
    """STEP 8: Concatenate missing capacity into full dataset"""

    df["CAPACITY"] = df["CAPACITY"].astype(float).astype(int).astype(str)
    df["GEOCODE"] = df["GEOCODE"].astype(str)

    df = df.merge(
        df_adjusted_crushing_capacity, how="left", on=["COMPANY", "GEOCODE", "YEAR"]
    )

    df.loc[(~df["CAPACITY_y"].isna()), "CAPACITY_x"] = df["CAPACITY_y"]

    df.loc[
        (df["CAPACITY_SOURCE_y"] == "INTERPOLATED - NO INFORMATION"),
        "CAPACITY_SOURCE_x",
    ] = df["CAPACITY_SOURCE_y"]

    df = rename(df, {"CAPACITY_x": "CAPACITY", "CAPACITY_SOURCE_x": "CAPACITY_SOURCE"})

    df.loc[(df["LAT"].isna()), "RESOLUTION"] = "NA"
    df["CAPACITY_SOURCE"] = df["CAPACITY_SOURCE"].replace(np.nan, "NA")
    df["CNPJ"] = df["CNPJ"].replace(np.nan, "NA")
    df["CF"] = df["CF"].replace(np.nan, "NA")
    df["LAT"] = df["LAT"].replace(np.nan, "NA")
    df["LONG"] = df["LONG"].replace(np.nan, "NA")

    df = df[
        [
            "YEAR",
            "COMPANY",
            "UF",
            "MUNICIPALITY",
            "GEOCODE",
            "CF",
            "CAPACITY",
            "CAPACITY_SOURCE",
            "CNPJ",
            "LAT",
            "LONG",
            "RESOLUTION",
        ]
    ]

    df = df.sort_values(["YEAR", "COMPANY"])

    return df


if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    dbt.source("trase-storage-raw", "hinrichsen_2007_2017")
    dbt.source("trase-storage-raw", "uf")
    dbt.source("trase-storage-raw", "crushing_facilities_2003_2016")
    dbt.source("trase-storage-raw", "br_crushingfacilities_missingcapacity_17_19")
    dbt.source("trase-storage-raw", "pesquisa-de-capacidade-instalada_2019")
    dbt.source(
        "trase-storage-raw", "aux_br_abiove_crushingfacilities_missing_cnpj_latlong"
    )
    dbt.source(
        "trase-storage-raw", "21122018-114526-pesquisa_de_capacidade_instalada_2018"
    )
    dbt.source("trase-storage-raw", "br_crushingfacilities_2018_2019")

    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})