BR RTRS 2019-2020
s3://trase-storage/brazil/soy/indicators/in/certification_rtrs/BR_RTRS_2019_2020.csv
dbt path: trase_production.main_brazil.br_rtrs_2019_2020
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/soy/indicators/in/certification_rtrs/_schema.yml
Model file link: trase/data_pipeline/models/brazil/soy/indicators/in/certification_rtrs/br_rtrs_2019_2020.py
Calls script: trase/data/brazil/soy/indicators/certification_rtrs_2016_2020.py
dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: mock_model, brazil, certification_rtrs, in, indicators, soy
br_rtrs_2019_2020
Description
This model was auto-generated based on the .yml 'lineage' files in S3. The dbt model itself only raises an error (see the mock model at the bottom of this page); the data was actually produced by a separate script, located at trase/data/brazil/soy/indicators/certification_rtrs_2019_2020.py [permalink]. It was last run by Tomas Carvalho.
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.coordenadas_fazendas_rtrs_2020
source.trase_duckdb.trase-storage-raw.br_municipalities_2017
source.trase_duckdb.trase-storage-raw.lista_fazendas2019_rtrs_coordenadas
Sources
['trase-storage-raw', 'coordenadas_fazendas_rtrs_2020']
['trase-storage-raw', 'br_municipalities_2017']
['trase-storage-raw', 'lista_fazendas2019_rtrs_coordenadas']
"""
BR-Soy Indicator - Actors - Certification RTRS
1. Extract from original excel file (2019-2020) and previous dataset (2016-2018)
2. Enrich it:
A. Clean column names
B. Add TRASE_ID
4. Merge with 2019-2020
5. Load into S3
"""
import numpy as np
import pandas as pd
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.utilities.helpers import clean_string
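# Manual corrections mapping city names as written in the RTRS source data to
# the spellings used in the IBGE municipality list (state suffixes dropped,
# spelling variants normalised) so the MUNGEO merge below can match them.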
CITY_NAMES = {
"SORRISO (PRIMAVERINHA)": "SORRISO",
"SORRISO (BARREIRO)": "SORRISO",
"CANÁPOLIS - MG": "CANÁPOLIS",
"COSTA RICA - MS": "COSTA RICA",
"TANGARÁ DA SERRA - MT": "TANGARÁ DA SERRA",
"RIACHÃO - MA": "RIACHÃO",
"TASSO FRAGOSO - MA": "TASSO FRAGOSO",
"CAMPOS LINDOS - TO": "CAMPOS LINDOS",
"AGUA FRIA DE GOIÁS/GO": "AGUA FRIA DE GOIÁS",
"LUIZ EDUARDO MAGALHÃES": "LUIS EDUARDO MAGALHÃES",
"ALTO DA BOA VISTA": "ALTO BOA VISTA",
"ALTO DA BOA VISTA - MT": "ALTO BOA VISTA",
"CAMPO DE JÚLIO": "CAMPOS DE JULIO",
"GAMELEIRA": "GAMELEIRA DE GOIAS",
"SANTA CARMEN": "SANTA CARMEM",
"DIAMANTE DE OESTE": "DIAMANTE DOESTE",
"SANTANA DO ARAGUIA": "SANTANA DO ARAGUAIA",
"SÃO VALÉRIO DA NATIVIDADE": "SAO VALERIO",
"MONTE ALEGRE": "MONTE ALEGRE DO PIAUI",
}
def extract():
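    # Inputs: the 2019 RTRS farm list (CSV), the 2020 RTRS farm list (XLSX),
    # IBGE municipalities, municipal soy production/yields, and the previously
    # published 2016-2018 RTRS dataset.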
df_19 = get_pandas_df_once(
"brazil/soy/indicators/ori/certification_rtrs/Lista_Fazendas2019_RTRS_coordenadas.csv",
sep=",",
)
df_20 = get_pandas_df_once(
"brazil/soy/indicators/ori/certification_rtrs/Coordenadas_Fazendas_RTRS_2020.xlsx",
xlsx=True,
)
mun = get_pandas_df_once(
"brazil/spatial/BOUNDARIES/ibge/2017/out/br_municipalities_2017.csv"
)
production = get_pandas_df_once(
"brazil/soy/indicators/out/q2_2022/soy_production.csv", dtype=str
)
production = production.astype({"YIELD": float})
production = production[production["YEAR"].isin(["2019", "2020"])]
df_16_18 = get_pandas_df_once("brazil/soy/indicators/out/RTRS_tonnes_2016_2018.csv")
return df_19, df_20, mun, production, df_16_18
def enrich(df_19, df_20, mun, production):
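    # The 2019 and 2020 source files use different column headers;
    # rename both onto a common schema.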
df_19 = df_19.rename(
columns={
"Farm / Producer": "farm",
"Producer": "producer",
"City": "city",
"Estado": "state",
"Ha total": "total_area_ha",
"Ha plantado GM": "area_ha_gmo",
"Ha plantado não GM": "area_ha_non_gmo",
"Longitude decimal": "longitude",
"Latitude decimal": "latitude",
"Bioma": "biome",
}
)
df_20 = df_20.rename(
columns={
" Nome da Fazenda": "farm",
"Nome do Produtor": "producer",
"Cidade": "city",
"Estado": "state",
"Ha total da Fazenda": "total_area_ha",
"Ha plantado GM Soja": "area_ha_gmo",
"Ha plantado não GM Soja": "area_ha_non_gmo",
"Longitude decimal": "longitude",
"Latitude decimal": "latitude",
"Bioma": "biome",
}
)
def add_year_order_columns(df: pd.DataFrame, year: str):
df = df.assign(YEAR=lambda x: year)
return df[
[
"YEAR",
"farm",
"producer",
"city",
"state",
"biome",
"total_area_ha",
"area_ha_gmo",
"area_ha_non_gmo",
"longitude",
"latitude",
]
]
df_19 = add_year_order_columns(df_19, "2019")
df_20 = add_year_order_columns(df_20, "2020")
df_19_20 = pd.concat([df_19, df_20], sort=False)
df_19_20["producer"] = df_19_20["producer"].str.upper()
df_19_20["farm"] = df_19_20["farm"].str.upper()
df_19_20["city"] = df_19_20["city"].str.upper()
df_19_20["biome"] = df_19_20["biome"].str.upper()
df_19_20.loc[df_19_20["producer"] == "-", "producer"] = np.nan
df_19_20["city"] = df_19_20["city"].apply(lambda city: CITY_NAMES.get(city, city))
# Change state to match with IBGE states
df_19_20.loc[
(df_19_20["city"].str.contains("TASSO FRAGOSO")) & (df_19_20["state"] == "BA"),
"state",
] = "MA"
df_19_20.loc[
(df_19_20["city"].str.contains("VILHENA")) & (df_19_20["state"] == "MT"),
"state",
] = "RO"
df_19_20 = df_19_20.assign(
MUNGEO=lambda x: (x["city"] + " - " + x["state"]).apply(clean_string)
)
df_19_20 = df_19_20.merge(mun, how="left", on="MUNGEO")
    # Every row must have matched an IBGE municipality, i.e. no missing TRASE_ID.
    assert not df_19_20["TRASE_ID"].isna().any()
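    # Some area values in the source contain non-breaking spaces (\xa0) and
    # Brazilian decimal commas; map them to plain decimal strings before casting.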
area = {
"\xa05026,14": "5026.14",
"\xa0978,12": "978.12",
"\xa03175,39": "3175.39",
"\xa03172,90": "3172.90",
"5.781,64\xa0": "5781.64",
"5.781,64": "5781.64",
}
df_19_20["total_area_ha"] = df_19_20["total_area_ha"].apply(
lambda x: area.get(x, x)
)
df_19_20 = df_19_20.astype({"total_area_ha": "float"})
"""Due to lack of production information in the 2019-2020 dataset,
we estimate the certified-RTRS production based on the municipality yield"""
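    # TONNES = YIELD * total_area_ha; e.g. a municipality yield of 3.2 t/ha
    # over 1,200 certified hectares gives an estimated 3,840 certified tonnes
    # (hypothetical figures, for illustration only).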
df_19_20 = (
df_19_20.groupby(["TRASE_ID", "YEAR"], as_index=False).agg(
{"total_area_ha": "sum"}
)
).merge(production, how="left", on=["TRASE_ID", "YEAR"])
df_19_20 = df_19_20.assign(TONNES=lambda x: (x["YIELD"] * x["total_area_ha"]))
return df_19_20[["TRASE_ID", "YEAR", "TONNES"]]
def main():
df_19, df_20, mun, production, df_16_18 = extract()
df_19_20 = enrich(df_19, df_20, mun, production)
df_16_20 = pd.concat([df_16_18, df_19_20], sort=False)
write_csv_for_upload(
df_16_20, "brazil/soy/indicators/out/q2_2022/BR_RTRS_2016_2020.csv"
)
if __name__ == "__main__":
main()
import pandas as pd


def model(dbt, cursor):
    # Mock model: it only declares the upstream sources so dbt can build the
    # lineage graph, then raises. The data itself is produced by the external
    # script documented above.
    dbt.source("trase-storage-raw", "coordenadas_fazendas_rtrs_2020")
    dbt.source("trase-storage-raw", "br_municipalities_2017")
    dbt.source("trase-storage-raw", "lista_fazendas2019_rtrs_coordenadas")
    raise NotImplementedError()
    # Unreachable; kept so the function retains the shape of a dbt Python
    # model, which is expected to return a DataFrame.
    return pd.DataFrame({"hello": ["world"]})