DBT: Crushing Facilities 2003–2019
File location: s3://trase-storage/brazil/logistics/abiove/out/CRUSHING_FACILITIES_2003_2019.csv
DBT model name: crushing_facilities_2003_2019
Explore on Metabase: Full table; summary statistics
DBT details
- Lineage
- Dbt path: trase_production.main_brazil.crushing_facilities_2003_2019
- Containing yaml link: trase/data_pipeline/models/brazil/logistics/abiove/out/_schema.yml
- Model file: trase/data_pipeline/models/brazil/logistics/abiove/out/crushing_facilities_2003_2019.py
- Calls script: trase/data/brazil/logistics/abiove/out/CRUSHING_FACILITIES_2003_2019.py
- Tags: mock_model, abiove, brazil, logistics, out
Description
This model was auto-generated based on .yml 'lineage' files in S3. The DBT model just raises an error; the actual script that created the data lives elsewhere, at trase/data/brazil/logistics/abiove/out/CRUSHING_FACILITIES_2003_2019.py [permalink]. It was last run by Harry Biddle.
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
- source.trase_duckdb.trase-storage-raw.hinrichsen_2007_2017
- source.trase_duckdb.trase-storage-raw.uf
- source.trase_duckdb.trase-storage-raw.crushing_facilities_2003_2016
- source.trase_duckdb.trase-storage-raw.br_crushingfacilities_missingcapacity_17_19
- source.trase_duckdb.trase-storage-raw.pesquisa-de-capacidade-instalada_2019
- source.trase_duckdb.trase-storage-raw.aux_br_abiove_crushingfacilities_missing_cnpj_latlong
- source.trase_duckdb.trase-storage-raw.21122018-114526-pesquisa_de_capacidade_instalada_2018
- source.trase_duckdb.trase-storage-raw.br_crushingfacilities_2018_2019
Sources
- ['trase-storage-raw', 'hinrichsen_2007_2017']
- ['trase-storage-raw', 'uf']
- ['trase-storage-raw', 'crushing_facilities_2003_2016']
- ['trase-storage-raw', 'br_crushingfacilities_missingcapacity_17_19']
- ['trase-storage-raw', 'pesquisa-de-capacidade-instalada_2019']
- ['trase-storage-raw', 'aux_br_abiove_crushingfacilities_missing_cnpj_latlong']
- ['trase-storage-raw', '21122018-114526-pesquisa_de_capacidade_instalada_2018']
- ['trase-storage-raw', 'br_crushingfacilities_2018_2019']
"""
Brazil - Crushing and Refinery Facilities
ABIOVE (Associação Brasileira das Indústrias de Óleos Vegetais)
"""
from pathlib import Path
from tempfile import gettempdir

import numpy as np
import pandas as pd

from trase.tools.aws.aws_helpers import read_xlsx
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload

# The wildcard imports below are assumed to provide the remaining helpers used in
# this script (concat, rename, full_merge, CNX), which are not defined in this file.
from trase.tools.pcs import *
from trase.tools.sei_pcs.pandas_utilities import *
def main():
df = crushing_load_data()
capacity = crush_get_capacity_data()
df = crush_insert_geocode(df, capacity)
df, df_old_dataset = crush_adjust_companies_names(df)
df = crush_merge_new_and_old_data(df, df_old_dataset)
df = crush_insert_cnpj_lat_long(df)
df_adjusted_capacity = crush_add_missing_capacity(df)
df = crush_concatenate_missing_capacity_into_full_dataset(df, df_adjusted_capacity)
write_csv_for_upload(
df, "brazil/logistics/abiove/out/CRUSHING_FACILITIES_2003_2019.csv"
)
# ==================================================================================== #
# DECLARE GENERAL FUNCTIONS
# ==================================================================================== #
def map_values(df: pd.DataFrame, col: str, method="equal", cond_result=None, **kwargs):
"""
Changes specific values from a given column based on a **kwargs.
Method can be equal, contains or conditional, equal is default.
Functions allows up to 3 conditions to be satisfied
::return::
pandas.Series
"""
if method == "equal":
for key, value in kwargs.items():
df.loc[df[col] == key, col] = value
elif method == "contains":
for key, value in kwargs.items():
df.loc[df[col].str.contains(key), col] = value
elif method == "conditional":
df.loc[
(
(df[list(kwargs.keys())[0]].isin([list(kwargs.values())[0]]))
& (df[list(kwargs.keys())[1]].isin([list(kwargs.values())[1]]))
& (df[list(kwargs.keys())[2]].isin(list(kwargs.values())[2]))
),
col,
] = cond_result
else:
raise ValueError(
"The chosen method must be 'equal', 'contains' or 'conditional'."
)
return df
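# Example usage (illustrative; mirrors calls made later in this script):
#   df = map_values(df, "COMPANY", method="contains", **ABIOVE_COMPANY_NAMES)
#   df = map_values(df, "MUNICIPALITY", CARAPO="CAARAPO")  # method="equal" is the default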
def normalize_str(d: pd.DataFrame, col: str, clean=False):
"""
Adjust column value characters encoding to UTF-8 and uppercase them.
Args:
d (pandas DataFrame): Dataframe to lookup
col (str): String column
clean: remove specific characters
Returns:
pandas DataFrame
"""
d[col] = (
d[col]
.str.normalize("NFKD")
.str.encode("ascii", errors="ignore")
.str.decode("utf-8")
)
d[col] = d[col].str.upper()
    if clean is True:
        d[col] = (
            d[col]
            .str.replace(".", "", regex=False)
            .str.replace("-", "", regex=False)
            .str.replace("/", "", regex=False)
            .str.replace(",", "", regex=False)
            .str.replace('"', "", regex=False)
            .str.split(" ")
            .str.join("")
        )
return d
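# Example (illustrative): normalize_str(df, "MUNICIPALITY") turns "SÃO PAULO" into
# "SAO PAULO"; with clean=True, "S/A LTDA." would additionally become "SALTDA".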
def get_state_uf():
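    """
    Load the Brazilian state lookup (brazil/metadata/UF.csv) and return a DataFrame
    mapping the two-letter 'UF' to its IBGE state code ('state.code').
    """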
df = get_pandas_df_once(
"brazil/metadata/UF.csv",
usecols=("CO_UF_IBGE", "CO_UF", "UF"),
sep=",",
dtype=str,
)
df = df.rename(
columns={"CO_UF_IBGE": "state.code", "CO_UF": "state.uf_number"},
errors="raise",
)
df = df.drop(columns="state.uf_number", errors="raise")
return df
def replace_state_uf_codes_with_names_and_trase_ids(df, col):
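    """Left-join the IBGE 'state.code' onto `df` using the given state column."""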
df_state_uf = get_state_uf()[[col, "state.code"]].drop_duplicates()
return full_merge(df, df_state_uf, on=col, how="left", validate="many_to_one")
def get_br_geocode(cnx=None):
"""
    A lookup of municipality geocodes in Brazil, read from website.regions.
    :return:
        df: DataFrame with 'GEOCODE' (7-digit IBGE municipality code) and 'name_id'
        ('<municipality synonym> - <2-digit state code>'), one row per synonym.
"""
df = pd.read_sql(
"""SELECT SUBSTRING(trase_id, 4, 11) as "GEOCODE",
UNNEST(synonyms) || ' - ' || SUBSTRING(trase_id, 4, 2) name_id
FROM website.regions
WHERE country= 'BRAZIL' AND region_type = 'MUNICIPALITY' """,
cnx.cnx,
)
return df
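# Example row (illustrative): GEOCODE = "3550308", name_id = "SAO PAULO - 35".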
# ==================================================================================== #
# COMMON DICTIONARIES
# ==================================================================================== #
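# Manual fixes for "MUNICIPALITY - state code" keys (misspellings, district names,
# wrong state codes) so they match the geocode lookup in crush_insert_geocode().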
MUN_UF_RENAMES = {
"CARIRI - 17": "CARIRI DO TOCANTINS - 17",
"VITORIA DO SANTO ANTAO - 26": "VITORIA DE SANTO ANTAO - 26",
"SAO PAULO (JAGUARE) - 35": "SAO PAULO - 35",
"SUAPE - 26": "IPOJUCA - 26",
"DOURADOS - 51": "DOURADOS - 50",
"CARAPO - 50": "CAARAPO - 50",
}
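# Canonical company names; applied with map_values(..., method="contains"), so any
# ABIOVE spelling containing the key is mapped to the value.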
ABIOVE_COMPANY_NAMES = {
"AGREX": "AGREX DO BRASIL S/A",
"CLW ALIMENTOS": "CLW / HELMUT TESMANN",
"3 TENTOS": "TRES TENTOS AGROINDUSTRIAL",
}
# ==================================================================================== #
# CRUSHING FACILITIES
#
# (1) From manually downloaded recent files (xls format): extract useful data
# (2) Get Capacity Data (from JJ Hinrichsen)
# (3) Fetch GeoCode
# (4) Adjust companies' names
# (5) Merge with current version (2003-2016)
# (6) Insert missing CNPJs and Latitude and Longitude based on merge of csv from S3
# (7) Identify Missing Capacity
# (7.5) Run other script to adjust missing capacity
# (8) Concatenate current version with the new ones
# (9) Export final csv locally and send it to the S3 bucket using the AWS CLI
# (10) QA missing CNPJ, CAPACITY and LAT/LONG
# ==================================================================================== #
def crushing_load_data():
"""STEP 1: Extract useful data from original XLS"""
crushing_columns = {"Empresas": "COMPANY", "Município": "MUNICIPALITY"}
# Adjust new data - 2017
df_2017 = read_xlsx(
"brazil/logistics/abiove/ori/21122018-114526-pesquisa_de_capacidade_instalada_2018.xlsx",
sheet_name="3. Unidades Industriais",
usecols="B:E,G",
header=7,
index=False,
skipfooter=7,
)
df_2017 = rename(df_2017, crushing_columns)
df_2017["YEAR"] = 2017
df_2017 = normalize_str(df_2017, "MUNICIPALITY")
df_2017 = df_2017[["YEAR", "COMPANY", "UF", "MUNICIPALITY"]]
# Adjust new data - 2018
df_2018 = read_xlsx(
"brazil/logistics/abiove/ori/21122018-114526-pesquisa_de_capacidade_instalada_2018.xlsx",
sheet_name="3. Unidades Industriais",
usecols="B:F",
header=7,
index=False,
skipfooter=7,
)
df_2018 = rename(df_2018, crushing_columns)
df_2018["YEAR"] = 2018
df_2018 = df_2018[["YEAR", "COMPANY", "UF", "MUNICIPALITY"]]
df_2018 = normalize_str(df_2018, "MUNICIPALITY")
# Adjust new data - 2019
df_2019 = read_xlsx(
"brazil/logistics/abiove/ori/pesquisa-de-Capacidade-Instalada_2019.xlsx",
sheet_name="3.Unidades de Processamento",
usecols="B:N",
header=7,
index=False,
skipfooter=6,
)
df_2019 = rename(df_2019, crushing_columns)
df_2019["YEAR"] = 2019
df_2019 = normalize_str(df_2019, "MUNICIPALITY")
df_2019 = df_2019[["YEAR", "COMPANY", "UF", "MUNICIPALITY"]]
# Concatenate last years (2017-2019)
df = concat([df_2017, df_2018, df_2019])
df = normalize_str(df, "COMPANY")
df = map_values(df, "COMPANY", method="contains", **ABIOVE_COMPANY_NAMES)
df = map_values(df, "MUNICIPALITY", kwargs={"CARAPO": "CAARAPO"})
return df
def crush_get_capacity_data():
"""STEP 2: Get Capacity Data"""
capacity_17 = get_pandas_df_once(
"brazil/logistics/crushing_facilities/in/hinrichsen/HINRICHSEN_2007_2017.csv",
sep=",",
)
capacity_18_19 = get_pandas_df_once(
"brazil/logistics/crushing_facilities/in/hinrichsen/br_crushingFacilities_2018_2019.csv",
sep=",",
)
capacity_17 = capacity_17[capacity_17["YEAR"] == 2017]
capacity = concat([capacity_17, capacity_18_19])
capacity_company_names = {
"BUNGE ALIMENTOS SA": "BUNGE",
"CARGILL AGRICOLA SA": "CARGILL",
"ADM DO BRASIL LTDA": "ADM",
"BRF SA": "BRF",
"AMAQGI": "AMAGGI",
"AMAQQI": "AMAGGI",
"ALIANCA AGRICOLA": "ALIANCA AGRICOLA DO CERRADO",
"TENTOS": "TRES TENTOS AGROINDUSTRIAL",
"CARAMURU": "CARAMURU",
"LDC": "LOUIS DREYFUS COMMODITIES",
"AGRARIA": "COOPER AGRARIA",
"AGRENCO": "AGRENCO",
"CLW": "CLW / HELMUT TESMANN",
"COMOVE": "SPERAFICO",
"COCAMAR": "COCAMAR",
"COOPERMIL": "COOPERMIL",
"COOPERATIVA LAR": "COOPERATIVA AGROINDUSTRIAL LAR",
"COPACOL": "COPACOL",
"CAMERA": "CAMERA",
"FRANGOS": "DIP FRANGOS (DIPLOMATA)",
"GRANOSUL": "GRANOSUL",
"GRANO SUL": "GRANOSUL",
"GRANOSULNIDERA": "GRANO SUL NIDERA",
"GRUPALCOOPERMIL": "GRUPAL COOPERMIL",
"PARECIS": "PARECIS SA",
"PALMEIRENSE": "APSA - ALGODOEIRA PALMEIRENSE",
"SELECTA": "SELECTA",
"SANTA ROSA": "SANTA ROSA",
"OVETRILDIPLOMATA": "SIPAL",
"SIPAL-DIPLOMATA S/A": "SIPAL",
}
capacity = map_values(
capacity, "COMPANY", method="contains", **capacity_company_names
)
capacity = normalize_str(capacity, "COMPANY")
capacity = normalize_str(capacity, "MUNICIPALITY")
capacity["CAPACITY"] = (
capacity["CAPACITY"].astype(str).replace("\.0", "", regex=True)
)
capacity["GEOCODE"] = capacity["GEOCODE"].astype(str).replace("\.0", "", regex=True)
capacity = capacity[capacity["STATUS"] == "ATIVA"]
return capacity
def crush_insert_geocode(df: pd.DataFrame, capacity: pd.DataFrame):
"""STEP 3: Fetch GeoCode"""
df = df.merge(
capacity,
how="left",
on=["YEAR", "COMPANY", "UF", "MUNICIPALITY"],
validate="many_to_many",
)
df = df.drop_duplicates()
df = replace_state_uf_codes_with_names_and_trase_ids(df, "UF")
df["MUN_UF"] = df["MUNICIPALITY"] + " - " + df["state.code"]
df = map_values(df, "MUN_UF", **MUN_UF_RENAMES)
df = full_merge(
df,
get_br_geocode(CNX),
how="left",
left_on="MUN_UF",
right_on="name_id",
validate="many_to_one",
)
df = rename(df, {"GEOCODE_y": "GEOCODE", "SOURCE": "CAPACITY_SOURCE"})
df["CF"] = ""
df["CNPJ"] = ""
df["LAT"] = ""
df["LONG"] = ""
df["RESOLUTION"] = ""
df = df[
[
"YEAR",
"COMPANY",
"UF",
"MUNICIPALITY",
"GEOCODE",
"CF",
"CAPACITY",
"CAPACITY_SOURCE",
"CNPJ",
"LAT",
"LONG",
"RESOLUTION",
]
]
df = df.drop_duplicates(subset=["YEAR", "COMPANY", "GEOCODE"])
df = map_values(df, "COMPANY", method="contains", **ABIOVE_COMPANY_NAMES)
df = normalize_str(df, "COMPANY")
return df
def crush_adjust_companies_names(df: pd.DataFrame):
"""STEP 4: Adjust Companies' names"""
df_old = get_pandas_df_once(
"brazil/logistics/abiove/old/CRUSHING_FACILITIES_2003_2016.csv",
sep=";",
encoding="utf-8",
keep_default_na=False,
)
old_data_company_name = {
"ALGODOEIRA PALMEIRENSE SOCIEDADE ANONIMA APSA": "APSA - ALGODOEIRA PALMEIRENSE",
"ALFA": "COOPERALFA",
"AGREX": "AGREX DO BRASIL S/A",
"COCAMAR": "COCAMAR",
"COOPAVEL": "COOPAVEL",
"COOPERATIVA AGROINDUSTRIAL DOS PRODUTORES RURAIS DO SUDOESTE": "COMIGO",
"LAR COOPERATIVA AGROINDUSTRIAL": "COOPERATIVA AGROINDUSTRIAL LAR",
"COOPERATIVA AGRARIA AGROINDUSTRIAL": "COOPER AGRARIA",
"COPACOL-COOPERATIVA AGROINDUSTRIAL CONSOLATA": "COPACOL",
"CLARION AGROINDUSTRIAL": "ROOT BRASIL (ARRENDATARIA DA CLARION)",
"DIPLOMATA": "DIP FRANGOS (DIPLOMATA)",
"LOUIS DREYFUS": "LOUIS DREYFUS COMMODITIES",
"SOCCEPAR": "SOCCEPAR",
}
df_crushing = map_values(
df_old, "COMPANY", method="contains", **old_data_company_name
)
df_crushing.loc[
(df_crushing["COMPANY"].str.contains("CLW"))
& (df_crushing["GEOCODE"] == "4303509"),
"COMPANY",
] = "CLW / HELMUT TESMANN"
df = df.astype({"GEOCODE": int})
return df, df_old
def crush_merge_new_and_old_data(df: pd.DataFrame, df_old: pd.DataFrame):
"""STEP 5: Merge with Current Version (2003-2016)"""
df = df.merge(df_old, how="left", on=["COMPANY", "GEOCODE", "UF", "MUNICIPALITY"])
crush_columns = {
"YEAR_x": "YEAR",
"CF_y": "CF",
"CAPACITY_x": "CAPACITY",
"CAPACITY_SOURCE_x": "CAPACITY_SOURCE",
"CNPJ_y": "CNPJ",
"LAT_y": "LAT",
"LONG_y": "LONG",
"RESOLUTION_y": "RESOLUTION",
}
df = rename(df, crush_columns)
df = df[
[
"YEAR",
"COMPANY",
"UF",
"MUNICIPALITY",
"GEOCODE",
"CF",
"CAPACITY",
"CAPACITY_SOURCE",
"CNPJ",
"LAT",
"LONG",
"RESOLUTION",
]
]
df = df.drop_duplicates()
df = concat([df, df_old])
return df
def crush_insert_cnpj_lat_long(df: pd.DataFrame):
"""STEP 6: Insert missing CNPJ and Latitude and Longitude"""
missing_lat_long = get_pandas_df_once(
"brazil/logistics/abiove/in/aux_br_abiove_crushingFacilities_missing_cnpj_latLong.csv",
sep=";",
keep_default_na=False,
dtype=str,
)
crush_with_latlong = df[
(~(df["LAT"].isna()) & (df["LONG"].isna()))
| ((df["LAT"] != "NA") & (df["LONG"] != "NA"))
]
df_crush_nan_cnpj_lat_long = df[
((df["LAT"].isna()) & (df["LONG"].isna()))
| ((df["LAT"] == "NA") & (df["LONG"] == "NA"))
]
df_crush_nan_cnpj_lat_long = df_crush_nan_cnpj_lat_long.astype({"GEOCODE": str})
df_latlong = df_crush_nan_cnpj_lat_long.merge(
missing_lat_long,
how="left",
left_on=["COMPANY", "GEOCODE"],
right_on=["company", "geocode"],
validate="many_to_one",
)
df_latlong = df_latlong[
[
"YEAR",
"COMPANY",
"UF",
"MUNICIPALITY",
"GEOCODE",
"CF",
"CAPACITY",
"CAPACITY_SOURCE",
"cnpj",
"lat",
"long",
"RESOLUTION",
]
]
df_latlong["RESOLUTION"] = "POINT"
df_latlong = rename(df_latlong, {"cnpj": "CNPJ", "lat": "LAT", "long": "LONG"})
df_latlong = df_latlong.astype({"YEAR": str})
df_crush_with_latlong = crush_with_latlong.astype({"YEAR": str, "GEOCODE": str})
# Concatenate current version with the new one
crush_facilities = concat([df_crush_with_latlong, df_latlong])
crush_facilities = crush_facilities.sort_values(
["YEAR", "COMPANY", "UF", "GEOCODE"]
)
# Adjust data
crush_facilities.loc[crush_facilities["CAPACITY"].isin(["NA", "nan"])] = np.nan
crush_facilities["CAPACITY"] = crush_facilities["CAPACITY"].fillna(0)
crush_facilities = crush_facilities[~(crush_facilities["COMPANY"].isna())]
crushing_facilities = crush_facilities.drop_duplicates(
["YEAR", "COMPANY", "GEOCODE", "CF"], keep="last"
)
return crushing_facilities
def crush_add_missing_capacity(df: pd.DataFrame):
"""STEP 7: Add missing capacity"""
missing = df[
["YEAR", "COMPANY", "UF", "MUNICIPALITY", "CNPJ", "CAPACITY_SOURCE", "GEOCODE"]
][df["CAPACITY"] == 0]
missing = missing.merge(
df[df["YEAR"].isin(["2014", "2015", "2016", "2017", "2018", "2019"])],
how="left",
on=["COMPANY", "MUNICIPALITY", "UF", "CNPJ"],
)
missing["CAPACITY"] = missing["CAPACITY"].astype(float).astype(int).round(0)
missing = rename(
missing,
{
"YEAR_y": "YEAR",
"CAPACITY_SOURCE_x": "CAPACITY_SOURCE",
"GEOCODE_x": "GEOCODE",
},
)
missing = missing[
[
"YEAR_x",
"COMPANY",
"MUNICIPALITY",
"YEAR",
"CAPACITY",
"CAPACITY_SOURCE",
"GEOCODE",
]
]
# Export missing data to local file to be used in the auxiliary script
# "br_crushingFacilities_missingCapacity_17_19.py"
path = Path(gettempdir()) / "br_crushingFacilities_missingCapacity_17_19.csv"
missing.to_csv(str(path), sep=";", encoding="utf-8", index=False)
# Execute the auxiliary script to fulfil missing capacities
# os.system("python br_crushingFacilities_missingCapacities_17_19.py")
# Import adjusted crushing capacity
df_adjusted_crushing_capacity = get_pandas_df_once(
"brazil/logistics/abiove/in/br_crushingFacilities_missingCapacity_17_19.csv",
sep=";",
keep_default_na=False,
dtype=str,
)
return df_adjusted_crushing_capacity
def crush_concatenate_missing_capacity_into_full_dataset(
df: pd.DataFrame, df_adjusted_crushing_capacity: pd.DataFrame
):
"""STEP 8: Concatenate missing capacity into full dataset"""
df["CAPACITY"] = df["CAPACITY"].astype(float).astype(int).astype(str)
df["GEOCODE"] = df["GEOCODE"].astype(str)
df = df.merge(
df_adjusted_crushing_capacity, how="left", on=["COMPANY", "GEOCODE", "YEAR"]
)
df.loc[(~df["CAPACITY_y"].isna()), "CAPACITY_x"] = df["CAPACITY_y"]
df.loc[
(df["CAPACITY_SOURCE_y"] == "INTERPOLATED - NO INFORMATION"),
"CAPACITY_SOURCE_x",
] = df["CAPACITY_SOURCE_y"]
df = rename(df, {"CAPACITY_x": "CAPACITY", "CAPACITY_SOURCE_x": "CAPACITY_SOURCE"})
df.loc[(df["LAT"].isna()), "RESOLUTION"] = "NA"
df["CAPACITY_SOURCE"] = df["CAPACITY_SOURCE"].replace(np.nan, "NA")
df["CNPJ"] = df["CNPJ"].replace(np.nan, "NA")
df["CF"] = df["CF"].replace(np.nan, "NA")
df["LAT"] = df["LAT"].replace(np.nan, "NA")
df["LONG"] = df["LONG"].replace(np.nan, "NA")
df = df[
[
"YEAR",
"COMPANY",
"UF",
"MUNICIPALITY",
"GEOCODE",
"CF",
"CAPACITY",
"CAPACITY_SOURCE",
"CNPJ",
"LAT",
"LONG",
"RESOLUTION",
]
]
df = df.sort_values(["YEAR", "COMPANY"])
return df
if __name__ == "__main__":
main()
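# Auto-generated DBT mock model: it declares the upstream sources for lineage purposes
# and raises NotImplementedError, since the table is actually produced by the script above.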
import pandas as pd
def model(dbt, cursor):
dbt.source("trase-storage-raw", "hinrichsen_2007_2017")
dbt.source("trase-storage-raw", "uf")
dbt.source("trase-storage-raw", "crushing_facilities_2003_2016")
dbt.source("trase-storage-raw", "br_crushingfacilities_missingcapacity_17_19")
dbt.source("trase-storage-raw", "pesquisa-de-capacidade-instalada_2019")
dbt.source(
"trase-storage-raw", "aux_br_abiove_crushingfacilities_missing_cnpj_latlong"
)
dbt.source(
"trase-storage-raw", "21122018-114526-pesquisa_de_capacidade_instalada_2018"
)
dbt.source("trase-storage-raw", "br_crushingfacilities_2018_2019")
raise NotImplementedError()
return pd.DataFrame({"hello": ["world"]})