Skip to content

2025 03 25 Br Beef Logistics Map V5 New

s3://trase-storage/brazil/logistics/slaughterhouses/slaughterhouse_map_v5/2025-03-25-br_beef_logistics_map_v5_new.csv

Dbt path: trase_production.main_brazil.2025_03_25_br_beef_logistics_map_v5_new

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/logistics/slaughterhouses/_schema.yml

Model file link: trase/data_pipeline/models/brazil/logistics/slaughterhouses/2025_03_25_br_beef_logistics_map_v5_new.py

Calls script: trase/data/brazil/logistics/slaughterhouses/slaughterhouse_map_v5/2025_03_25_br_beef_logistics_map_v5_new.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph, at the bottom right, as well as tests and downstream dependencies)

Tags: mock_model, brazil, logistics, slaughterhouses


2025_03_25_br_beef_logistics_map_v5_new

Description

See the Slaughterhouse Map Data Pipeline (Brazil) documentation on GitHub


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.br_municipalities_2020_ibge_4326_simplified
  • model.trase_duckdb.brazil_2025_03_14_sif
  • model.trase_duckdb.brazil_2025_03_24_sisbi_all

Sources

  • ['trase-storage-raw', 'br_municipalities_2020_ibge_4326_simplified']
import stdnum.br.cnpj
import pandas as pd
from trase.tools.aws.aws_helpers import read_geojson
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload

from trase.data.brazil.logistics.slaughterhouses.slaughterhouse_map_v5.helpers.location_checker import (
    LocationChecker,
)
from trase.data.brazil.logistics.slaughterhouses.slaughterhouse_map_v5.helpers.facility_deduplicator import (
    FacilityDeduplicator,
)
from trase.data.brazil.logistics.slaughterhouses.slaughterhouse_map_v5.helpers.string_cleaner import (
    StringCleaner,
)
from trase.data.brazil.logistics.slaughterhouses.slaughterhouse_map_v5.helpers.manual_inspections import (
    ManualInspections,
)
from trase.data.brazil.logistics.slaughterhouses.slaughterhouse_map_v5.helpers.sif_dataset_preparer import (
    SifDatasetPreparer,
)

"""

Config Session

"""

# Location of the SIF input; main() loads it with get_pandas_df_once as a
# ";"-separated CSV with every column read as a string.
PATH_SIF = (
    "brazil/logistics/sanitary_inspections/animal_products/sif/out/2025-03-14-SIF.csv"
)

# Location of the SISBI input; main() loads it the same way as PATH_SIF.
PATH_SISBI = "brazil/logistics/sanitary_inspections/animal_products/sisbi/out/2025-03-24_SISBI_ALL.csv"

# Location of the municipality boundaries GeoJSON; main() reads it with
# read_geojson from the "trase-storage" bucket.
PATH_BR_MUN_BOUNDARIES = "brazil/spatial/boundaries/ibge/2020/br_municipalities_2020_ibge_4326_simplified.geojson"

# Destination passed to write_csv_for_upload by main(); written with "," as separator.
PATH_OUTPUT = "brazil/logistics/slaughterhouses/slaughterhouse_map_v5/2025-03-25-br_beef_logistics_map_v5_new.csv"

# Columns kept from both the SIF and SISBI datasets before they are concatenated.
# Rows that are identical across all of these columns are dropped
# (see FacilityCleanerPipeline.run).
COLUMNS_TO_CHECK_FOR_DUPLICATES = [
    "COMPANY",
    "CNPJ",
    "INSPECTION_NUM",
    "GEOCODE",
    "ADDRESS",
    "INSPECTION_LEVEL",
    "STATE",
    "LEVEL",
    "STATUS",
    "MUNICIPALITY",
    "LONG",
    "LAT",
    "RESOLUTION",
    "CATEGORIES",
    "COMMODITY",
    "SOURCE",
    "TYPE",
    "UNI_ID",
]

# Columns handed to StringCleaner.fill_with_unknown_text in FacilityCleanerPipeline.run.
COLUMNS_TO_FILL_WITH_UNKNOWN = [
    "COMPANY",
    "CNPJ",
    "STATE",
    "MUNICIPALITY",
    "ADDRESS",
    "TYPE_DETAILED_ENGLISH",
    "TYPE_DETAILED_PORTUGUESE",
]

# Final column selection and order of the exported table. The names are lower
# case because FacilityCleanerPipeline.run lower-cases all column names before
# selecting these.
COLUMNS_TO_EXPORT = [
    "company",
    "cnpj",
    "inspection_num",
    "geocode",
    "address",
    "inspection_level",
    "state",
    "level",
    "status",
    "municipality",
    "long",
    "lat",
    "resolution",
    "commodity",
    "source",
    "type_english",
    "type_portuguese",
    "type_detailed_english",
    "type_detailed_portuguese",
    "capacity_description_portuguese",
    "capacity_description_english",
    "capacity_value",
    "facility_id",
    "unique_id",
]

# Placeholder that replaces invalid CNPJs (CNPJValidator.validate) and that is
# passed to the helper classes used by FacilityCleanerPipeline.run.
UNKNOWN_TEXT = "Unknown"


class CNPJValidator:
    """Validation of CNPJ numbers, delegating the check to ``stdnum.br.cnpj``."""

    @staticmethod
    def validate(code: str) -> str:
        """Return ``code`` unchanged if it is a valid CNPJ, else ``UNKNOWN_TEXT``.

        Validity is whatever ``stdnum.br.cnpj.is_valid`` reports for ``code``.
        """
        if not stdnum.br.cnpj.is_valid(code):
            return UNKNOWN_TEXT
        return code


class FacilityCleanerPipeline:
    """Combine the SIF and SISBI datasets into one cleaned, export-ready table.

    Each constructor argument is copied with ``.copy()``, so the pipeline
    works on its own objects rather than on the ones passed in.
    """

    def __init__(self, sif, sisbi, br_boundaries_mun):
        """Store copies of the three inputs.

        Args:
            sif: SIF dataset; must contain a ``CNPJ`` column and whatever
                ``SifDatasetPreparer.prepare_sif_dataset`` requires.
            sisbi: SISBI dataset; must contain every column listed in
                ``COLUMNS_TO_CHECK_FOR_DUPLICATES``.
            br_boundaries_mun: municipality boundaries, handed to
                ``LocationChecker``.
        """
        self.sif = sif.copy()
        self.sisbi = sisbi.copy()
        self.br_boundaries_mun = br_boundaries_mun.copy()

    def run(self):
        """Run all cleaning steps in order and return the final dataset.

        Returns:
            The combined dataset with lower-case column names, restricted to
            and ordered by ``COLUMNS_TO_EXPORT``.

        Raises:
            AssertionError: if the derived ``UNIQUE_ID`` column does not hold
                one distinct value per row.
        """
        # Invalid CNPJs are replaced by UNKNOWN_TEXT in both inputs
        self.sif["CNPJ"] = self.sif["CNPJ"].apply(CNPJValidator.validate)
        self.sisbi["CNPJ"] = self.sisbi["CNPJ"].apply(CNPJValidator.validate)

        # Prepare SIF dataset to be concatenated
        sif_dataset_preparer = SifDatasetPreparer()
        self.sif = sif_dataset_preparer.prepare_sif_dataset(self.sif, UNKNOWN_TEXT)

        # Keep only the shared columns and drop rows that are exact duplicates
        self.sif = self.sif[COLUMNS_TO_CHECK_FOR_DUPLICATES].drop_duplicates()
        self.sisbi = self.sisbi[COLUMNS_TO_CHECK_FOR_DUPLICATES].drop_duplicates()

        # Concatenate both sif and sisbi datasets
        dataset = pd.concat([self.sif, self.sisbi])

        # Discard records where uni_id has NA_
        dataset = dataset.query('~UNI_ID.str.contains("NA_")')

        # Merge with spatial data and enrich with municipality and state info
        location_checker = LocationChecker(self.br_boundaries_mun)
        dataset = location_checker.enrich_with_location_data(dataset)

        # Do some string cleaning
        string_cleaner = StringCleaner(UNKNOWN_TEXT)
        dataset = string_cleaner.clean_basic(dataset)
        dataset["TYPE"] = string_cleaner.deduplicate_similar_names(
            dataset["TYPE"], threshold=0.95
        )
        dataset["ADDRESS"] = string_cleaner.deduplicate_similar_names(
            dataset["ADDRESS"], threshold=0.8
        )
        dataset["INSPECTION_NUM"] = string_cleaner.extract_numeric_values(
            dataset["INSPECTION_NUM"]
        )
        dataset["COMPANY"] = string_cleaner.clean_company_name(dataset["COMPANY"])
        dataset = string_cleaner.get_full_state_name(dataset)

        # Extract data from strings, do manual adjustments and translations
        manual_inspections = ManualInspections()
        dataset = manual_inspections.get_full_facility_type_names(dataset)
        dataset = manual_inspections.get_facility_type_names_in_english(dataset)
        dataset = manual_inspections.split_categories(dataset)
        dataset = manual_inspections.get_split_categories_in_english(dataset)
        dataset = manual_inspections.get_status_in_english(dataset)
        dataset = manual_inspections.capitalize_columns(dataset)
        dataset = manual_inspections.cleanup_numeric_columns(dataset)

        # Fill with unknown text
        dataset = string_cleaner.fill_with_unknown_text(
            dataset,
            COLUMNS_TO_FILL_WITH_UNKNOWN,
        )

        # Deduplicate based on CNPJ and uni_id
        facility_deduplicator = FacilityDeduplicator(UNKNOWN_TEXT)
        dataset = facility_deduplicator.deduplicate_by_cnpj(dataset)
        dataset = facility_deduplicator.deduplicate_by_uni_id(dataset)

        # Replace UNI_ID by FACILITY_ID plus a verified-unique UNIQUE_ID
        dataset = self._add_identifier_columns(dataset)

        # Convert columns to lower case
        dataset.columns = dataset.columns.str.lower()
        # Select and order columns
        dataset = dataset[COLUMNS_TO_EXPORT]

        return dataset

    @staticmethod
    def _add_identifier_columns(dataset):
        """Derive FACILITY_ID and UNIQUE_ID from UNI_ID, check uniqueness, drop UNI_ID.

        Args:
            dataset: DataFrame with string columns ``UNI_ID`` and ``COMMODITY``.
                The two new columns are added to this object in place.

        Returns:
            A new DataFrame without the ``UNI_ID`` column.

        Raises:
            AssertionError: if the number of rows differs from the number of
                distinct ``UNIQUE_ID`` values.
        """
        # uni_id is actually not unique, there are multiple for the same
        # facility if it handles multiple commodities
        dataset["FACILITY_ID"] = dataset["UNI_ID"]
        # Create an actual unique_id
        dataset["UNIQUE_ID"] = dataset["UNI_ID"] + "_" + dataset["COMMODITY"]

        # Check if unique_id is really unique. This is raised explicitly rather
        # than with an `assert` statement so the check still runs under
        # `python -O`, and so the error names the offending ids.
        row_count = len(dataset)
        distinct_count = dataset["UNIQUE_ID"].nunique()
        if row_count != distinct_count:
            repeated_mask = dataset["UNIQUE_ID"].duplicated(keep=False)
            repeated_ids = dataset.loc[repeated_mask, "UNIQUE_ID"].unique().tolist()
            raise AssertionError(
                f"UNIQUE_ID is not unique: {row_count} rows but "
                f"{distinct_count} distinct values; "
                f"repeated ids (first 10): {repeated_ids[:10]}"
            )

        return dataset.drop(columns=["UNI_ID"])


def main():
    """Load the three inputs, run the cleaning pipeline and write the result."""
    # Both inspection files are read with the same options: ";" separator and
    # every column as a string. NOTE(review): `keep_default_na=False` is
    # presumably forwarded to pandas' CSV reader so empty cells stay as ""
    # instead of becoming NaN; confirm in get_pandas_df_once.
    csv_options = {"sep": ";", "dtype": str, "keep_default_na": False}
    sif_raw = get_pandas_df_once(PATH_SIF, **csv_options)
    sisbi_raw = get_pandas_df_once(PATH_SISBI, **csv_options)
    municipalities = read_geojson(PATH_BR_MUN_BOUNDARIES, bucket="trase-storage")

    cleaned = FacilityCleanerPipeline(sif_raw, sisbi_raw, municipalities).run()

    write_csv_for_upload(cleaned, PATH_OUTPUT, sep=",")


# Script entry point: run the pipeline only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    """Placeholder dbt Python model that only declares upstream dependencies.

    It registers one source and two refs with ``dbt`` and then raises
    ``NotImplementedError``, so calling it never produces data.
    """
    dbt.source("trase-storage-raw", "br_municipalities_2020_ibge_4326_simplified")
    dbt.ref("brazil_2025_03_14_sif")
    dbt.ref("brazil_2025_03_24_sisbi_all")

    raise NotImplementedError()
    # Unreachable at runtime because of the raise above.
    # NOTE(review): presumably kept so that dbt's parser still finds a return
    # statement at the end of the model function; confirm before removing.
    return pd.DataFrame({"hello": ["world"]})