2025 03 25 Br Beef Logistics Map V5 New
s3://trase-storage/brazil/logistics/slaughterhouses/slaughterhouse_map_v5/2025-03-25-br_beef_logistics_map_v5_new.csv
Dbt path: trase_production.main_brazil.2025_03_25_br_beef_logistics_map_v5_new
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/logistics/slaughterhouses/_schema.yml
Model file link: trase/data_pipeline/models/brazil/logistics/slaughterhouses/2025_03_25_br_beef_logistics_map_v5_new.py
Calls script: trase/data/brazil/logistics/slaughterhouses/slaughterhouse_map_v5/2025_03_25_br_beef_logistics_map_v5_new.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: mock_model, brazil, logistics, slaughterhouses
2025_03_25_br_beef_logistics_map_v5_new
Description
See the Slaughterhouse Map Data Pipeline (Brazil) documentation on GitHub
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.br_municipalities_2020_ibge_4326_simplified
model.trase_duckdb.brazil_2025_03_14_sif
model.trase_duckdb.brazil_2025_03_24_sisbi_all
Sources
['trase-storage-raw', 'br_municipalities_2020_ibge_4326_simplified']
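# --- Calls script (linked above) ---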
import stdnum.br.cnpj
import pandas as pd
from trase.tools.aws.aws_helpers import read_geojson
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.data.brazil.logistics.slaughterhouses.slaughterhouse_map_v5.helpers.location_checker import (
LocationChecker,
)
from trase.data.brazil.logistics.slaughterhouses.slaughterhouse_map_v5.helpers.facility_deduplicator import (
FacilityDeduplicator,
)
from trase.data.brazil.logistics.slaughterhouses.slaughterhouse_map_v5.helpers.string_cleaner import (
StringCleaner,
)
from trase.data.brazil.logistics.slaughterhouses.slaughterhouse_map_v5.helpers.manual_inspections import (
ManualInspections,
)
from trase.data.brazil.logistics.slaughterhouses.slaughterhouse_map_v5.helpers.sif_dataset_preparer import (
SifDatasetPreparer,
)
"""
Config Section
"""
PATH_SIF = (
"brazil/logistics/sanitary_inspections/animal_products/sif/out/2025-03-14-SIF.csv"
)
PATH_SISBI = "brazil/logistics/sanitary_inspections/animal_products/sisbi/out/2025-03-24_SISBI_ALL.csv"
PATH_BR_MUN_BOUNDARIES = "brazil/spatial/boundaries/ibge/2020/br_municipalities_2020_ibge_4326_simplified.geojson"
PATH_OUTPUT = "brazil/logistics/slaughterhouses/slaughterhouse_map_v5/2025-03-25-br_beef_logistics_map_v5_new.csv"
COLUMNS_TO_CHECK_FOR_DUPLICATES = [
"COMPANY",
"CNPJ",
"INSPECTION_NUM",
"GEOCODE",
"ADDRESS",
"INSPECTION_LEVEL",
"STATE",
"LEVEL",
"STATUS",
"MUNICIPALITY",
"LONG",
"LAT",
"RESOLUTION",
"CATEGORIES",
"COMMODITY",
"SOURCE",
"TYPE",
"UNI_ID",
]
COLUMNS_TO_FILL_WITH_UNKNOWN = [
"COMPANY",
"CNPJ",
"STATE",
"MUNICIPALITY",
"ADDRESS",
"TYPE_DETAILED_ENGLISH",
"TYPE_DETAILED_PORTUGUESE",
]
COLUMNS_TO_EXPORT = [
"company",
"cnpj",
"inspection_num",
"geocode",
"address",
"inspection_level",
"state",
"level",
"status",
"municipality",
"long",
"lat",
"resolution",
"commodity",
"source",
"type_english",
"type_portuguese",
"type_detailed_english",
"type_detailed_portuguese",
"capacity_description_portuguese",
"capacity_description_english",
"capacity_value",
"facility_id",
"unique_id",
]
UNKNOWN_TEXT = "Unknown"
class CNPJValidator:
@staticmethod
def validate(code: str) -> str:
return code if stdnum.br.cnpj.is_valid(code) else UNKNOWN_TEXT
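# Illustration only (not part of the pipeline): stdnum checks the CNPJ check digits, so a
# well-formed identifier is kept as-is while anything else collapses to UNKNOWN_TEXT, e.g.
#   CNPJValidator.validate("00.000.000/0001-91")  # -> "00.000.000/0001-91"
#   CNPJValidator.validate("12.345.678/0001-00")  # -> "Unknown" (check digits do not match)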
class FacilityCleanerPipeline:
def __init__(self, sif, sisbi, br_boundaries_mun):
self.sif = sif.copy()
self.sisbi = sisbi.copy()
self.br_boundaries_mun = br_boundaries_mun.copy()
def run(self):
self.sif["CNPJ"] = self.sif["CNPJ"].apply(CNPJValidator.validate)
self.sisbi["CNPJ"] = self.sisbi["CNPJ"].apply(CNPJValidator.validate)
# Prepare SIF dataset to be concatenated
sif_dataset_preparer = SifDatasetPreparer()
self.sif = sif_dataset_preparer.prepare_sif_dataset(self.sif, UNKNOWN_TEXT)
        # Keep only the shared set of columns and drop duplicate rows
self.sif = self.sif[COLUMNS_TO_CHECK_FOR_DUPLICATES].drop_duplicates()
self.sisbi = self.sisbi[COLUMNS_TO_CHECK_FOR_DUPLICATES].drop_duplicates()
# Concatenate both sif and sisbi datasets
dataset = pd.concat([self.sif, self.sisbi])
# Discard records where uni_id has NA_
dataset = dataset.query('~UNI_ID.str.contains("NA_")')
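        # Equivalent boolean-mask form (illustrative note, not a change in behaviour):
        #   dataset = dataset[~dataset["UNI_ID"].str.contains("NA_")]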
# Merge with spatial data and enrich with municipality and state info
location_checker = LocationChecker(self.br_boundaries_mun)
dataset = location_checker.enrich_with_location_data(dataset)
# Do some string cleaning
string_cleaner = StringCleaner(UNKNOWN_TEXT)
dataset = string_cleaner.clean_basic(dataset)
dataset["TYPE"] = string_cleaner.deduplicate_similar_names(
dataset["TYPE"], threshold=0.95
)
dataset["ADDRESS"] = string_cleaner.deduplicate_similar_names(
dataset["ADDRESS"], threshold=0.8
)
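        # Assumption about the helper: deduplicate_similar_names presumably maps near-identical
        # spellings (similarity above the threshold) onto one canonical string, hence the looser
        # 0.8 cutoff for free-text addresses and the stricter 0.95 cutoff for facility types.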
dataset["INSPECTION_NUM"] = string_cleaner.extract_numeric_values(
dataset["INSPECTION_NUM"]
)
dataset["COMPANY"] = string_cleaner.clean_company_name(dataset["COMPANY"])
dataset = string_cleaner.get_full_state_name(dataset)
# Extract data from strings, do manual adjustments and translations
manual_inspections = ManualInspections()
dataset = manual_inspections.get_full_facility_type_names(dataset)
dataset = manual_inspections.get_facility_type_names_in_english(dataset)
dataset = manual_inspections.split_categories(dataset)
dataset = manual_inspections.get_split_categories_in_english(dataset)
dataset = manual_inspections.get_status_in_english(dataset)
dataset = manual_inspections.capitalize_columns(dataset)
dataset = manual_inspections.cleanup_numeric_columns(dataset)
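        # Assumption about the helper: judging by the method names, ManualInspections applies
        # hand-curated lookup tables (facility types, categories and status values translated
        # from Portuguese to English) rather than any automated translation.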
# Fill with unknown text
dataset = string_cleaner.fill_with_unknown_text(
dataset,
COLUMNS_TO_FILL_WITH_UNKNOWN,
)
# Deduplicate based on CNPJ and uni_id
facility_deduplicator = FacilityDeduplicator(UNKNOWN_TEXT)
dataset = facility_deduplicator.deduplicate_by_cnpj(dataset)
dataset = facility_deduplicator.deduplicate_by_uni_id(dataset)
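        # Assumption: UNKNOWN_TEXT is passed in presumably so that rows whose CNPJ or uni_id is
        # "Unknown" are not merged with one another during deduplication.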
        # uni_id is not actually unique: the same facility appears once per commodity it handles
dataset["FACILITY_ID"] = dataset["UNI_ID"]
# Create an actual unique_id
dataset["UNIQUE_ID"] = dataset["UNI_ID"] + "_" + dataset["COMMODITY"]
# Check if unique_id is really unique
assert len(dataset) == dataset["UNIQUE_ID"].nunique()
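        # Illustration (hypothetical values): a facility with UNI_ID "SIF_1234" handling two
        # commodities keeps FACILITY_ID "SIF_1234" on both rows, while UNIQUE_ID becomes
        # "SIF_1234_BEEF" and "SIF_1234_LEATHER".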
dataset = dataset.drop(columns=["UNI_ID"])
# Convert columns to lower case
dataset.columns = dataset.columns.str.lower()
# Select and order columns
dataset = dataset[COLUMNS_TO_EXPORT]
return dataset
def main():
sif = get_pandas_df_once(PATH_SIF, sep=";", dtype=str, keep_default_na=False)
sisbi = get_pandas_df_once(PATH_SISBI, sep=";", dtype=str, keep_default_na=False)
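    # Note: dtype=str keeps leading zeros in identifiers such as CNPJ and geocode, and
    # keep_default_na=False stops pandas from turning literal "NA" strings into missing values.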
br_boundaries_mun = read_geojson(PATH_BR_MUN_BOUNDARIES, bucket="trase-storage")
pipeline = FacilityCleanerPipeline(sif, sisbi, br_boundaries_mun)
dataset = pipeline.run()
write_csv_for_upload(dataset, PATH_OUTPUT, sep=",")
if __name__ == "__main__":
main()
# --- dbt Python model (from the model file linked above) ---
import pandas as pd


def model(dbt, cursor):
    # Register the upstream dependencies so they show up in the dbt lineage graph.
    dbt.source("trase-storage-raw", "br_municipalities_2020_ibge_4326_simplified")
    dbt.ref("brazil_2025_03_14_sif")
    dbt.ref("brazil_2025_03_24_sisbi_all")
    # Mock model (see the mock_model tag): the table itself appears to be built by the
    # standalone script above and uploaded to trase-storage, so the unreachable return
    # below is only a placeholder.
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})
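The model above is a mock: it only declares the dependencies listed under Models / Seeds and then raises, since the table itself is produced by the standalone script and uploaded to trase-storage. As a rough sketch of downstream use, assuming the dbt-duckdb convention in which dbt.ref returns a relation convertible to pandas with .df() (the aggregation below is hypothetical):

import pandas as pd


def model(dbt, cursor):
    # Load the slaughterhouse map produced by this pipeline.
    facilities = dbt.ref("2025_03_25_br_beef_logistics_map_v5_new").df()
    # Hypothetical summary: number of facilities per state and commodity.
    summary = facilities.groupby(["state", "commodity"], as_index=False).agg(
        n_facilities=("facility_id", "nunique")
    )
    return summary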