Skip to content

Municipality Production 2014

s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/municipality_production_2014.csv

Dbt path: trase_production.main_brazil.municipality_production_2014

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/municipality_production_schema.yml

Model file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/municipality_production_2014.py

Calls script: trase/runbook/brazil/beef/indicators/embedding/a_embedding_quants.R

Dbt test runs & lineage: Test results ยท Lineage

Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)

Tags: mock_model, beef, brazil, sei_pcs, v2.2.1, indicators


municipality_production_2014

Description

No description


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.pasture_beef_deforestation_emissions_2010_2023_multilevel
  • source.trase_duckdb.trase-storage-raw.seipcs_brazil_beef_2014

Sources

  • ['trase-storage-raw', 'pasture_beef_deforestation_emissions_2010_2023_multilevel']
  • ['trase-storage-raw', 'seipcs_brazil_beef_2014']
# Load required libraries
library(tidyverse)
library(arrow)
library(aws.s3)
options(scipen = 9999)
library(glue)
library(readr)

# Authenticate AWS credentials
aws.signature::use_credentials()

# Configuration: file paths and parameters
# --------------------------------------------------------

# Paths to input and output files in S3
PATH_SPATIAL_METRICS = 'brazil/beef/indicators/gold/q3_2025/pasture_c10_beef_deforestation_emissions_2010_2023_q3_2025_multilevel.parquet'
PATH_SEIPCS_V221 = 'brazil/beef/sei_pcs/v2.2.1/SEIPCS_BRAZIL_BEEF_{year}.csv'
PATH_OUTPUT_MUN_STATISTICS <- 'brazil/beef/sei_pcs/v2.2.1/municipality_production_{year}.csv'
# PATH_OUTPUT = 'brazil/beef/sei_pcs/v2.2.1/post_embedding/quants_post_embedding_quants_{year}_c9.parquet'
PATH_OUTPUT = 'brazil/beef/sei_pcs/v2.2.1/post_embedding/quants_post_embedding_quants_{year}_v1.parquet'

# List of columns that define individual flows
FLOWS_COLS <- c(
    'STATE_OF_PRODUCTION',
    'MUNICIPALITY',
    'LOGISTICS_HUB',
    'LOGISTICS_HUB_TRASE_ID',
    'PORT_OF_EXPORT',
    'EXPORTER',
    'STATE_OF_EXPORTER',
    'IMPORTER',
    'COUNTRY',
    'VOLUME_RAW',
    'VOLUME_PRODUCT',
    'FOB',
    'HS4',
    'HS6',
    'YEAR',
    'EXPORTER_CNPJ',
    'BRANCH',
    'GEOCODE_SOURCE',
    'ZERO_DEFORESTATION_BRAZIL_BEEF'
)

# Spatial metrics normalized per ton (to be multiplied by volume)
EMBEDDING_SPATIAL_METRICS_PER_TON <- c(
    'PASTURE_HA_PER_TON',
    'CATTLE_DEFORESTATION_PER_TN_5_YEAR_TOTAL',
    'CATTLE_DEFORESTATION_PER_TN_5_YEAR_ANNUAL', 
    'PASTURE_DEFORESTATION_PER_TN_5_YEAR_TOTAL',
    'PASTURE_DEFORESTATION_PER_TN_5_YEAR_ANNUAL',
    'CO2_GROSS_EMISSIONS_CATTLE_DEFORESTATION_PER_TN_5_YEAR_TOTAL',
    'CO2_GROSS_EMISSIONS_CATTLE_DEFORESTATION_PER_TN_5_YEAR_ANNUAL',
    'CO2_NET_EMISSIONS_CATTLE_DEFORESTATION_PER_TN_5_YEAR_TOTAL',
    'CO2_NET_EMISSIONS_CATTLE_DEFORESTATION_PER_TN_5_YEAR_ANNUAL',
    'CO2_GROSS_EMISSIONS_PASTURE_DEFORESTATION_PER_TN_5_YEAR_TOTAL',
    'CO2_GROSS_EMISSIONS_PASTURE_DEFORESTATION_PER_TN_5_YEAR_ANNUAL',
    'CO2_NET_EMISSIONS_PASTURE_DEFORESTATION_PER_TN_5_YEAR_TOTAL',
    'CO2_NET_EMISSIONS_PASTURE_DEFORESTATION_PER_TN_5_YEAR_ANNUAL'
)

# Spatial metrics used for aggregation (not normalized per ton)
SPATIAL_METRICS_QUANTS <- gsub("_PER_TN", "", EMBEDDING_SPATIAL_METRICS_PER_TON)
SPATIAL_METRICS_QUANTS <- gsub("_PER_TON", "", SPATIAL_METRICS_QUANTS)

# Define which years use each SEI-PCS version
YEARS_V220 <- 2010:2020
YEARS_V221 <- 2021:2023
YEARS <- c(YEARS_V220, YEARS_V221)

# Load municipality-level spatial metrics from S3
# --------------------------------------------------------

quants_metrics <- s3read_using(
    FUN = read_parquet,
    sep = ";",
    header = TRUE,
    as.is = TRUE,
    object = PATH_SPATIAL_METRICS,
    opts = c("check_region" = TRUE),
    bucket = "trase-storage"
)

quants_metrics_munincipality <- quants_metrics %>% 
    as_tibble() %>%
    filter(level == "municipality")
# Helper functions
# --------------------------------------------------------

# Standardize municipality and state codes using TRASE ID pattern
fix_state_and_municipality_trase_ids <- function(df) {
    municipality <- df$MUNICIPALITY

    is_unknown_municipality <- municipality == "BR-XXXXXXX"
    is_known_municipality <- str_detect(municipality, "^BR-\\d{7}$")
    is_aggregated_municipality <- str_detect(municipality, "^BR-\\d{2}-AGGREGATED$")

    if (!all(is_unknown_municipality | is_known_municipality | is_aggregated_municipality)) {
        stop("The MUNICIPALITY column contains unexpected values.")
    }

    df <- df %>%
        mutate(StateCode = str_sub(MUNICIPALITY, 1, 5))

    df$MUNICIPALITY[is_aggregated_municipality] <- "BR-XXXXXXX"

    return(df)
}

# Filter production data for a specific year
get_year_production <- function(df, year){
    df %>% 
        filter(YEAR == !!year) %>%
        mutate(
            Prod = CW_PRODUCTION_TONS,
            StateCode = str_sub(TRASE_ID, 1, 5)
        )
}

# Aggregate volume by municipality
get_volume_by_municipality <- function(df, metric_name) {
    metric_sym <- rlang::sym(metric_name)
    df %>%
        group_by(YEAR, StateCode, MUNICIPALITY) %>%
        summarise(!!metric_sym := sum(VOLUME_RAW_T, na.rm = TRUE), .groups = "drop")
}

# Main embedding loop per year
# --------------------------------------------------------

for(year in YEARS) {

    cat(">>> Processing year:", year, "\n")

    # Load SEI-PCS flows
    sei_year <- s3read_using(
        FUN = read_delim,
        delim = ";",
        locale = locale(encoding = "UTF-8"),
        show_col_types = FALSE,
        object = glue(PATH_SEIPCS_V221),
        opts = c("check_region" = TRUE),
        bucket = "trase-storage"
    ) %>% mutate(VOLUME_RAW_T = VOLUME_RAW / 1000)

    # Ensure schema compatibility by adding missing columns
    if (!"ZERO_DEFORESTATION_BRAZIL_BEEF" %in% names(sei_year)) {
        sei_year <- sei_year %>% mutate(ZERO_DEFORESTATION_BRAZIL_BEEF = 'UNKNOWN')
    }

    # Fix TRASE ID structure and treat aggregated municipalities as unknown
    sei_year <- fix_state_and_municipality_trase_ids(sei_year)

    # Split flows into known and unknown municipalities
    known_flows <- sei_year %>% filter(MUNICIPALITY != "BR-XXXXXXX")
    unknown_flows <- sei_year %>% filter(MUNICIPALITY == "BR-XXXXXXX")

    # Step 1: Get production data by municipality/state
    production_municipality <- get_year_production(quants_metrics_munincipality, year)
    production_state <- production_municipality %>% 
        group_by(YEAR, StateCode) %>%
        summarise(Prod = sum(Prod, na.rm = TRUE), across(all_of(SPATIAL_METRICS_QUANTS), ~ sum(.x, na.rm = TRUE)))

    # Step 2: Get export volumes by municipality/state
    known_volume_mun_state <- get_volume_by_municipality(known_flows, 'KnownExport')
    unknown_volume_mun_state <- get_volume_by_municipality(unknown_flows, 'UnknownExport')

    # Step 3: Merge metrics and exports at municipality level
    municipality_metrics <- production_municipality %>%
        left_join(known_volume_mun_state, join_by(YEAR, TRASE_ID == MUNICIPALITY, StateCode))

    # Step 4: Separate partially and fully unknown exports
    unknown_export_state <- unknown_volume_mun_state %>%
        filter(StateCode != "BR-XX")

    fully_unknown_export <- unknown_volume_mun_state %>%
        filter(StateCode == "BR-XX")

    # Step 5: Calculate export shares by type (known, unknown, fully unknown)
    municipality_shares <- municipality_metrics %>%
        mutate(KnownExport = replace_na(KnownExport, 0)) %>%
        mutate(
            ExportKnownShare = pmin(KnownExport / Prod, 1),
            ExportRemaining = pmax(Prod - KnownExport, 0)
        ) 


    # Step 5.1: Calculate export shares by state
    state_shares <- production_state %>%
        left_join(
            known_volume_mun_state %>%  
                group_by(YEAR, StateCode) %>%
                summarise(KnownExport = sum(KnownExport, na.rm = TRUE)),
            join_by(YEAR, StateCode)
        ) %>%
        left_join(
            unknown_export_state, join_by(YEAR, StateCode)
        ) %>%
        mutate(UnknownExport = replace_na(UnknownExport, 0)) %>%
        mutate(ExportKnownShare = pmin(KnownExport / Prod, 1)) %>%
        mutate(
            ExportUnknownShare = pmin(UnknownExport / Prod, 1 - ExportKnownShare)
        )

    # Step 6: Calculate flow-level shares within each category
    known_flows <- known_flows %>%
        group_by(YEAR, MUNICIPALITY) %>%
        mutate(FlowKnownShare = VOLUME_RAW_T / sum(VOLUME_RAW_T, na.rm = TRUE)) %>%
        ungroup()

    flows_unknown_state_known <- unknown_flows %>%
        filter(StateCode != "BR-XX") %>%
        group_by(YEAR, StateCode) %>%
        mutate(FlowUnknownShare = VOLUME_RAW_T / sum(VOLUME_RAW_T, na.rm = TRUE)) %>%
        ungroup()

    flows_fully_unknown <- unknown_flows %>%
        filter(StateCode == "BR-XX") %>%
        mutate(FlowFullyUnknownShare = VOLUME_RAW_T / sum(VOLUME_RAW_T, na.rm = TRUE))

    # Step 7: Calculate total volumes and shares for diagnostics
    total_production <- sum(municipality_metrics$Prod, na.rm = TRUE)
    total_known_flows <- sum(known_flows$VOLUME_RAW_T, na.rm = TRUE)
    total_unknown_state_known <- sum(flows_unknown_state_known$VOLUME_RAW_T, na.rm = TRUE)
    total_flows_fully_unknown <- sum(flows_fully_unknown$VOLUME_RAW_T, na.rm = TRUE)
    total_domestic <- total_production - (total_known_flows + total_unknown_state_known + total_flows_fully_unknown)

    print(total_domestic)
    print(total_production)

    known_flows_perc <- total_known_flows / total_production
    unknown_state_known_perc <- total_unknown_state_known / total_production
    flows_fully_unknown_perc <- total_flows_fully_unknown / total_production

    # Step 8: Embed spatial metrics into known flows
    known_flows <- known_flows %>%
        left_join(
            municipality_shares %>% select(YEAR, TRASE_ID, all_of(SPATIAL_METRICS_QUANTS), ExportKnownShare),
            join_by(YEAR, MUNICIPALITY == TRASE_ID)
        ) %>%
        mutate(
            across(
                all_of(SPATIAL_METRICS_QUANTS),
                ~ .x * ExportKnownShare * FlowKnownShare
            )
        )

    # Step 9: Embed spatial metrics into unknown flows (state known)
    flows_unknown_state_known <- flows_unknown_state_known %>%
        left_join(
            state_shares %>% select(YEAR, StateCode, all_of(SPATIAL_METRICS_QUANTS), ExportUnknownShare),
            join_by(YEAR, StateCode)
        ) %>%
        mutate(
            across(
                all_of(SPATIAL_METRICS_QUANTS),
                ~ .x * ExportUnknownShare * FlowUnknownShare
            )
        )

    # Step 10: Embed spatial metrics into fully unknown flows
    # Compute available metrics to embed based on flow percentages
    total_by_metric <- municipality_shares %>%
        group_by(YEAR) %>%
        summarise(across(all_of(SPATIAL_METRICS_QUANTS), ~ sum(.x, na.rm = TRUE)), .groups = "drop")

    total_embedded_known <- known_flows %>% group_by(YEAR) %>%
        summarise(across(all_of(SPATIAL_METRICS_QUANTS), ~ sum(.x, na.rm = TRUE)), .groups = "drop")

    total_embedded_unknown <- flows_unknown_state_known %>% group_by(YEAR) %>%
        summarise(across(all_of(SPATIAL_METRICS_QUANTS), ~ sum(.x, na.rm = TRUE)), .groups = "drop")

    total_expected_fully_unknown <- total_by_metric %>%
        mutate(across(all_of(SPATIAL_METRICS_QUANTS), ~ .x * flows_fully_unknown_perc))

    # If total_embedded_known is empty, set it to 0s with same structure
    if (nrow(total_by_metric) == 0) {
        total_by_metric <- tibble(
            YEAR = year,
            !!!setNames(as.list(rep(0, length(SPATIAL_METRICS_QUANTS))), SPATIAL_METRICS_QUANTS)
        )
        total_embedded_known <- total_by_metric
        total_embedded_unknown <- total_by_metric
        total_expected_fully_unknown <- total_by_metric
    }

    # Calculate metrics available to fully unknown flows
    left_metrics <- total_by_metric
    left_metrics[SPATIAL_METRICS_QUANTS] <- pmax(
        left_metrics[SPATIAL_METRICS_QUANTS] - total_embedded_known[SPATIAL_METRICS_QUANTS],
        0
    )

    left_metrics[SPATIAL_METRICS_QUANTS] <- pmax(
        left_metrics[SPATIAL_METRICS_QUANTS] - total_embedded_unknown[SPATIAL_METRICS_QUANTS],
        0
    )

    metrics_fully_unknown_to_embed <- left_metrics
    metrics_fully_unknown_to_embed[SPATIAL_METRICS_QUANTS] <- Map(
        pmin,
        left_metrics[SPATIAL_METRICS_QUANTS],
        total_expected_fully_unknown[SPATIAL_METRICS_QUANTS]
    )

    flows_fully_unknown <- flows_fully_unknown %>%
        left_join(metrics_fully_unknown_to_embed, join_by(YEAR)) %>%
        mutate(
            across(
                all_of(SPATIAL_METRICS_QUANTS),
                ~ .x * FlowFullyUnknownShare
            )
        )

    total_embedded_fully_unknown <- flows_fully_unknown %>% group_by(YEAR) %>%
        summarise(across(all_of(SPATIAL_METRICS_QUANTS), sum, na.rm = TRUE))

    # Step 11: Embed spatial metrics into domestic consumption flows
    domestic_embedded_metrics <- pmax(
        left_metrics[SPATIAL_METRICS_QUANTS] - total_embedded_fully_unknown[SPATIAL_METRICS_QUANTS],
        0
    )

    domestic_embedded_flows <- domestic_embedded_metrics %>%
        mutate(
            STATE_OF_PRODUCTION = "UNKNOWN STATE",
            MUNICIPALITY = "BR-XXXXXXX",
            LOGISTICS_HUB = "BR-BEEF-SLAUGHTERHOUSE-UNKNOWN",
            LOGISTICS_HUB_TRASE_ID = "BR-XXXXXXX",
            PORT_OF_EXPORT = "DOMESTIC PROCESSING AND CONSUMPTION",
            EXPORTER = "DOMESTIC PROCESSING AND CONSUMPTION",
            STATE_OF_EXPORTER = "DOMESTIC PROCESSING AND CONSUMPTION",
            IMPORTER = "DOMESTIC",
            COUNTRY = "BRAZIL",
            VOLUME_RAW = total_domestic * 1000,
            VOLUME_PRODUCT = NA_real_,
            FOB = 0,
            HS4 = "XXXX",
            HS6 = "XXXXXX",
            YEAR = year,
            EXPORTER_CNPJ = "DOMESTIC PROCESSING AND CONSUMPTION",
            BRANCH = "DOMESTIC",
            GEOCODE_SOURCE = "UNKNOWN",
            ZERO_DEFORESTATION_BRAZIL_BEEF = 'UNKNOWN'
        )

    # Step 12: Combine all flows into one output table
    all_flows <- bind_rows(
        known_flows, 
        flows_unknown_state_known,
        flows_fully_unknown,
        domestic_embedded_flows
    ) %>%
    select(
        all_of(FLOWS_COLS),
        all_of(SPATIAL_METRICS_QUANTS),
        ExportKnownShare,
        ExportUnknownShare,
        FlowKnownShare,
        FlowUnknownShare,
        FlowFullyUnknownShare
    )

    # Step 13: Export result to S3
    s3write_using(all_flows,
        object = glue(PATH_OUTPUT),
        bucket = "trase-storage",
        FUN = write_parquet,
        opts = c("check_region" = T, multipart = TRUE)
    )

}
import pandas as pd


def model(dbt, cursor):
    dbt.source(
        "trase-storage-raw",
        "pasture_beef_deforestation_emissions_2010_2023_multilevel",
    )
    dbt.source("trase-storage-raw", "seipcs_brazil_beef_2014")

    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})