View or edit on GitHub
This page is synchronized from trase/models/brazil/beef/ciff/supply_shed_brandao/brando_supply_chain_mapping.ipynb. Last modified on 2026-03-21 22:30 CET by Trase Admin.
Please view or edit the original file there; changes should be reflected here after a midnight build (CET time),
or manually triggering it with a GitHub action (link).
Flagging the connection between CAR and GTA for spatial assessment
This notebook performs the following steps:
- Data Validation
- Geocode Check: Use the
is_validhelper to confirm that geocode fields are not null or empty. -
CAR ID Check: Use the
is_valid_car_idhelper to ensure that CAR IDs are valid (i.e., not null, empty, or set to values like 'none'/'nan'). -
Flagging (Classification) Conditions
Each row is categorized based on the CAR ID (farm scale connection) and geocode (municipality scale connection) information: - Situation 0: Both CAR IDs are missing (or unspecified) while both geocodes are valid.
Flag: "Flows only at municipality scale" - Situation 1: Tier-1 CAR ID is missing (with a valid geocode) and the direct farm CAR ID is valid.
Flag: "CAR_ID of tier-1 Unknown" - Situation 2: Direct farm CAR ID is missing (with a valid geocode) and the tier-1 CAR ID is valid.
Flag: "CAR_ID of direct is Unknown" - Situation 3: Both CAR IDs and their geocodes are valid.
Flag: "CAR_ID from tier-1 to facility" - Situation 4: Tier-1 CAR ID is missing (with an invalid geocode) while the direct farm CAR ID is valid.
Flag: "CAR_ID with no connections to indirect" -
Situation 5: If either geocode is invalid (missing or unspecified for one connection) and the row hasn’t been flagged yet.
Flag: "Flows partially only at municipality scale" -
Applying the Flags
-
A new column
car_id_flagis created to store the classification for each row. -
Exporting the Flagged Data
-
The resulting DataFrame with flags is saved as a new parquet file.
-
Generating the Summary Table
- Grouping: The data is grouped by
car_id_flagto sum the total animals sent from tier-1 (indirect farms) and direct farms. - Proportion Calculation:
- Calculate the percentage of animals sent from tier-1 and direct farms for each flag relative to the overall totals.
- Renaming Columns:
- Rename
car_id_flagto "CAR_ID flag". - Rename
tier1_animals_sentto "Sum of animals sent from indirect farms". - Rename
direct_animals_sentto "Sum of animals sent from direct farms".
- Rename
from trase.tools.aws.aws_helpers import read_s3_parquet
from trase.tools.aws.metadata import write_parquet_for_upload
# Load direct and tier-1 database
df_test = read_s3_parquet(
"brazil/logistics/gta/silver/para_full_tier1_to_slaughterhouse_gtas_car_match.parquet"
)
# Helper function to check if a value is valid (for geocodes)
def is_valid(series):
return series.notna() & (series != "")
# Helper function to check if a CAR ID is valid.
# A valid CAR ID is not null, not an empty string, and not one of the strings: 'none' or 'nan' (case insensitive).
def is_valid_car_id(series):
return series.notna() & ~(
series.astype(str).str.strip().str.lower().isin(["none", "nan", ""])
)
# Make a copy of the original DataFrame
df_flagged = df_test.copy()
# Define valid conditions for geocodes
tier1_geocode_valid = is_valid(df_flagged["tier1_geocode"])
direct_farm_geocode_valid = is_valid(df_flagged["direct_farm_geocode"])
# Define valid conditions for CAR IDs using the enhanced check
tier1_car_id_valid = is_valid_car_id(df_flagged["tier1_car_id"])
direct_farm_car_id_valid = is_valid_car_id(df_flagged["direct_farm_car_id"])
# Define missing (or not-specified) CAR ID conditions as the opposite of valid
tier1_car_id_missing = ~tier1_car_id_valid
direct_farm_car_id_missing = ~direct_farm_car_id_valid
# ---------------------------
# Flagging (Classification) Conditions
# ---------------------------
# Situation 0: Both CAR IDs are missing (or unspecified) AND both geocodes are valid.
situation0 = (tier1_car_id_missing & tier1_geocode_valid) & (
direct_farm_car_id_missing & direct_farm_geocode_valid
)
# Situation 1: Tier-1 CAR ID is missing (with valid geocode) and direct_farm CAR ID is valid.
situation1 = (tier1_car_id_missing & tier1_geocode_valid) & (
direct_farm_car_id_valid & direct_farm_geocode_valid
)
# Situation 2: Direct_farm CAR ID is missing (with valid geocode) and tier1 CAR ID is valid.
situation2 = (tier1_car_id_valid & tier1_geocode_valid) & (
direct_farm_car_id_missing & direct_farm_geocode_valid
)
# Situation 3: Both CAR IDs are valid along with their geocodes.
situation3 = (tier1_car_id_valid & tier1_geocode_valid) & (
direct_farm_car_id_valid & direct_farm_geocode_valid
)
# Situation 4: Tier-1 CAR ID is missing (and its geocode is not valid) while direct_farm CAR ID is valid.
situation4 = (tier1_car_id_missing & ~tier1_geocode_valid) & (
direct_farm_car_id_valid & direct_farm_geocode_valid
)
# Create a new column to flag the rows
df_flagged["car_id_flag"] = ""
# Apply the flags (order here is not an issue as the conditions are mutually exclusive)
df_flagged.loc[situation0, "car_id_flag"] = "Flows only at municipality scale"
df_flagged.loc[situation1, "car_id_flag"] = "CAR_ID of tier-1 Unknown"
df_flagged.loc[situation2, "car_id_flag"] = "CAR_ID of direct is Unknown"
df_flagged.loc[situation3, "car_id_flag"] = "CAR_ID from tier-1 to facility"
df_flagged.loc[situation4, "car_id_flag"] = "CAR_ID with no connections to indirect"
# ------------------------------------------------------------------
# Additional Condition:
# Situation 5: If either geocode (tier1 or direct_farm) is not valid and the row is still not flagged,
# flag as "Flows partially only at municipality scale"
# ------------------------------------------------------------------
situation5 = ((~tier1_geocode_valid) | (~direct_farm_geocode_valid)) & (
df_flagged["car_id_flag"] == ""
)
df_flagged.loc[situation5, "car_id_flag"] = "Flows partially only at municipality scale"
# Export table with categorization of CAR_ID condition
write_parquet_for_upload(
df_flagged,
"brazil/logistics/gta/silver/para_full_tier1_to_slaughterhouse_gtas_car_match_flagged.parquet",
)
# 1) Group by car_id_flag and sum the animals sent from tier1 and direct
summary_df = (
df_flagged.groupby("car_id_flag", dropna=False)[
["tier1_animals_sent", "direct_animals_sent"]
]
.sum()
.reset_index()
)
# 2) Compute total animals for tier1 (indirect) and direct
total_tier1 = summary_df["tier1_animals_sent"].sum()
total_direct = summary_df["direct_animals_sent"].sum()
# 3) Calculate proportions for each row relative to the total
summary_df["Proportion of tier1 sent (%)"] = (
summary_df["tier1_animals_sent"] / total_tier1 * 100
).fillna(0)
summary_df["Proportion of direct sent (%)"] = (
summary_df["direct_animals_sent"] / total_direct * 100
).fillna(0)
# 4) Rename columns to the desired labels
summary_df.rename(
columns={
"car_id_flag": "CAR_ID flag",
"tier1_animals_sent": "Sum of animals sent from indirect farms",
"direct_animals_sent": "Sum of animals sent from direct farms",
},
inplace=True,
)
# 5) Display the final DataFrame in Jupyter (just show the DataFrame)
summary_df
Distance Calculation for Supply Chain Data
This notebook cell computes road distances and durations between suppliers (direct or tier‑1) and slaughterhouses by querying the OSRM API. In regions like the Amazon, where road connections may be missing due to reliance on waterways and ferries, the code falls back to using the great‐circle (Haversine) distance.
Below is an overview of the main steps and logic implemented:
- Data Partitioning:
- Direct Connections:
Rows without tier‑1 coordinates are assumed to have only a direct connection. The distance is computed from the direct supplier (direct farm) to the slaughterhouse. CAR_ID is a mandatory requirement; thus, we included only connections where we can identify either the farm for tier-1 and/or direct suppliers. Flows mapped only at municipality level where excluded. -
Tier‑1 (Indirect) Connections:
Rows with tier‑1 coordinates may have:- Direct Farm Available:
The distance is computed in two legs: - Leg 1: From the tier‑1 supplier to the direct farm.
- Leg 2: From the direct farm to the slaughterhouse.
- NOTE: Two other situations might be present in the data (around 17% of the transported animals): 1. Tier-1 is mapped but CAR ID is unknown and connected to a known direct CAR ID; 2. Tier-1 is mapped at CAR level (known CAR ID) but connected to a direct with unknow CAR ID. Both types of connection were also included in the supply shed mapping based on Brandao’s method. The total distance and duration are the sums of the two legs (even in cases where tier-1 is closer to the slaughterhouse).
- Direct Farm Missing:
A single leg distance is computed directly from the tier‑1 supplier to the slaughterhouse.
- Direct Farm Available:
-
OSRM API Integration with Chunking:
- The code queries the OSRM Table Service to retrieve road distances and durations.
- To handle large datasets and potential API limits, the input is split into multiple chunks (for both origins and destinations).
-
Each chunk combination is processed independently; the returned matrices are parsed and mapped back to the original data rows.
-
Fallback to Haversine (Great‑Circle) Distance:
- If OSRM returns an invalid result (e.g.,
None,NaN, or a non‑positive distance), the code computes the Haversine distance between the two coordinates. -
This fallback ensures that a distance is always provided—even when the OSRM API cannot find a road connection.
-
Method Recording:
- A new column,
distance_method, is added to the final output:- "OSRM": Indicates that the OSRM API provided a valid road distance.
- "Haversine": Indicates that the Haversine distance was used as a fallback.
-
For tier‑1 connections with two legs, if both legs use OSRM, the final method is marked as
"OSRM". If either leg falls back to Haversine, the final method is"Haversine". -
Output:
- The final DataFrame contains:
road_distance_m: Total distance in meters.road_duration_s: Estimated travel duration in seconds.distance_method: The method used to compute the distance ("OSRM"or"Haversine").
- This DataFrame is then written to a CSV file for further analysis.
Below is the full code cell implementing the above logic.
- NOTE: The distance profiles can only be calculated LOCALLY with docker.
import pandas as pd
import numpy as np
import requests
import math
import concurrent.futures
from trase.tools.aws.aws_helpers import read_s3_parquet
from trase.tools.aws.metadata import write_parquet_for_upload
def haversine(lon1, lat1, lon2, lat2):
"""
Compute the great-circle (Haversine) distance between two points on Earth in meters.
"""
R = 6371000.0 # Earth radius in meters
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlambda = math.radians(lon2 - lon1)
a = (
math.sin(dphi / 2) ** 2
+ math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
return R * c
def chunker(seq, size):
"""
Generator that yields successive chunks of length 'size' from list 'seq'.
"""
for pos in range(0, len(seq), size):
yield seq[pos : pos + size]
def compute_osrm_for_df(
df_input,
origin_id,
origin_lat,
origin_lon,
dest_id,
dest_lat,
dest_lon,
server_url="http://localhost:5000",
chunk_size=50,
):
"""
For a given DataFrame with origin and destination columns, this function:
- Deduplicates and builds unique origin/destination records.
- Processes chunks via OSRM table service concurrently.
- If OSRM returns an invalid value, falls back immediately to the Haversine calculation.
- Records the method used in a "distance_method" lookup.
- Merges the lookup results back into the original rows.
Returns a DataFrame with columns "road_distance_m", "road_duration_s", and "distance_method".
"""
# Build unique origins and destinations, and assign indices:
origins = (
df_input[[origin_id, origin_lat, origin_lon]]
.drop_duplicates()
.reset_index(drop=True)
)
origins["origin_index"] = origins.index
destinations = (
df_input[[dest_id, dest_lat, dest_lon]].drop_duplicates().reset_index(drop=True)
)
destinations["dest_index"] = destinations.index
# Filter out non-finite coordinates:
origins = origins[
np.isfinite(origins[origin_lat]) & np.isfinite(origins[origin_lon])
]
destinations = destinations[
np.isfinite(destinations[dest_lat]) & np.isfinite(destinations[dest_lon])
]
# Build chunks:
origin_chunks = list(chunker(origins.to_dict("records"), chunk_size))
dest_chunks = list(chunker(destinations.to_dict("records"), chunk_size))
combined_distance_lookup = {}
combined_duration_lookup = {}
combined_method_lookup = {}
def process_chunk(ochunk_i, ochunk, dchunk_j, dchunk):
"""
Processes one (origin_chunk, dest_chunk) pair.
Returns local lookup dictionaries for distance, duration, and method.
"""
coords_list = [f"{o[origin_lon]},{o[origin_lat]}" for o in ochunk]
num_o = len(ochunk)
coords_list.extend([f"{d[dest_lon]},{d[dest_lat]}" for d in dchunk])
num_d = len(dchunk)
coords_str = ";".join(coords_list)
sources_str = ";".join(str(i) for i in range(num_o))
destinations_str = ";".join(str(num_o + j) for j in range(num_d))
table_url = f"{server_url}/table/v1/driving/{coords_str}"
params = {
"sources": sources_str,
"destinations": destinations_str,
"annotations": "distance,duration",
}
try:
resp = requests.get(table_url, params=params)
data = resp.json()
except Exception as e:
raise RuntimeError(f"Request failed: {e}")
if "distances" not in data or "durations" not in data:
raise RuntimeError(
f"OSRM response missing 'distances' or 'durations': {data}"
)
dist_mat = data["distances"]
dur_mat = data["durations"]
local_distance_lookup = {}
local_duration_lookup = {}
local_method_lookup = {}
for i in range(num_o):
oi = ochunk[i]["origin_index"]
for j in range(num_d):
di = dchunk[j]["dest_index"]
dist_val = dist_mat[i][j]
dur_val = dur_mat[i][j]
# Fallback check for invalid OSRM responses:
if (
dist_val is None
or not isinstance(dist_val, (int, float))
or (
isinstance(dist_val, float)
and (np.isnan(dist_val) or np.isinf(dist_val) or dist_val <= 0)
)
):
fallback = haversine(
ochunk[i][origin_lon],
ochunk[i][origin_lat],
dchunk[j][dest_lon],
dchunk[j][dest_lat],
)
print(
f"Fallback (chunk {ochunk_i},{dchunk_j}): Using haversine from ({ochunk[i][origin_lon]},{ochunk[i][origin_lat]}) to ({dchunk[j][dest_lon]},{dchunk[j][dest_lat]}) = {fallback:.2f} m"
)
dist_val = fallback
dur_val = fallback / 11.11
local_method_lookup[(oi, di)] = "great-circle (Haversine)"
else:
local_method_lookup[(oi, di)] = "OSRM"
local_distance_lookup[(oi, di)] = dist_val
local_duration_lookup[(oi, di)] = dur_val
print(
f"Processed chunk: origin_chunk={ochunk_i}, dest_chunk={dchunk_j}, origins={num_o}, destinations={num_d}"
)
return local_distance_lookup, local_duration_lookup, local_method_lookup
# Run chunks in parallel:
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for ochunk_i, ochunk in enumerate(origin_chunks):
for dchunk_j, dchunk in enumerate(dest_chunks):
futures.append(
executor.submit(process_chunk, ochunk_i, ochunk, dchunk_j, dchunk)
)
for future in concurrent.futures.as_completed(futures):
local_dist_lookup, local_dur_lookup, local_meth_lookup = future.result()
combined_distance_lookup.update(local_dist_lookup)
combined_duration_lookup.update(local_dur_lookup)
combined_method_lookup.update(local_meth_lookup)
# Merge lookup dictionaries with original input:
df_merged = df_input.copy()
df_merged = df_merged.merge(
origins[[origin_id, "origin_index"]], on=origin_id, how="left", validate="m:1"
)
df_merged = df_merged.merge(
destinations[[dest_id, "dest_index"]], on=dest_id, how="left", validate="m:1"
)
if df_merged["origin_index"].isna().sum() or df_merged["dest_index"].isna().sum():
print("WARNING: Some rows are missing origin_index or dest_index.")
def get_distance(row):
key = (row["origin_index"], row["dest_index"])
# OSRM computed distance
osrm_val = combined_distance_lookup.get(key, None)
# Compute the Haversine distance regardless:
hav_val = haversine(
row[origin_lon], row[origin_lat], row[dest_lon], row[dest_lat]
)
# Get current method (or default to Haversine if no OSRM value)
method = combined_method_lookup.get(key, "great-circle (Haversine)")
# Check for OSRM value validity:
if osrm_val is None or not np.isfinite(osrm_val) or osrm_val <= 0:
chosen = hav_val
method = "great-circle (Haversine)"
else:
# Calculate relative difference:
if hav_val > 0:
diff = abs(osrm_val - hav_val) / hav_val
# If the OSRM distance differs more than 30% compared to Haversine, use Haversine.
if diff > 0.30:
chosen = hav_val
method = "great-circle (Haversine)"
else:
chosen = osrm_val
else:
chosen = (
osrm_val # fallback, though hav_val should be >0 for valid inputs
)
# Update method lookup:
combined_method_lookup[key] = method
return chosen
def get_duration(row):
key = (row["origin_index"], row["dest_index"])
dur_val = combined_duration_lookup.get(key, None)
if dur_val is None or not np.isfinite(dur_val) or dur_val <= 0:
d = get_distance(row)
if d is not None:
dur_val = d / 11.11
return dur_val
def get_method(row):
key = (row["origin_index"], row["dest_index"])
return combined_method_lookup.get(key, "great-circle (Haversine)")
df_merged["road_distance_m"] = df_merged.apply(get_distance, axis=1)
df_merged["road_duration_s"] = df_merged.apply(get_duration, axis=1)
df_merged["distance_method"] = df_merged.apply(get_method, axis=1)
return df_merged[["road_distance_m", "road_duration_s", "distance_method"]]
def compute_osrm_distances_para_indirect(
input_csv, output_csv, server_url="http://localhost:5000", chunk_size=50
):
"""
Reads the input Parquet file and computes OSRM distances using the following rules:
1. For rows where car_id_flag is "CAR_ID from tier-1 to facility" or "CAR_ID of direct is Unknown":
compute the distance from tier-1 coordinates to the facility.
2. For rows where car_id_flag is "CAR_ID of tier-1 Unknown" or "CAR_ID with no connections to indirect":
compute the distance from direct supplier coordinates to the facility.
Rows with "Flows only at municipality scale" or "Flows partially only at municipality scale"
are dropped. After the OSRM calculation, if the OSRM and Haversine distances differ by more than 30%,
the Haversine distance is used.
"""
# Load input parquet file.
df = read_s3_parquet(input_csv)
# Drop rows with unwanted car_id_flag values.
df = df[
~df["car_id_flag"].isin(
[
"Flows only at municipality scale",
"Flows partially only at municipality scale",
]
)
]
# Convert coordinate columns to numeric.
float_columns = [
"tier1_car_latitude",
"tier1_car_longitude",
"direct_farm_latitude",
"direct_farm_longitude",
"slaughterhouse_latitude",
"slaughterhouse_longitude",
]
for col in float_columns:
df[col] = pd.to_numeric(df[col], errors="coerce")
# Partition the dataset into two groups based on car_id_flag.
group_tier1 = df[
df["car_id_flag"].isin(
["CAR_ID from tier-1 to facility", "CAR_ID of direct is Unknown"]
)
].copy()
group_direct = df[
df["car_id_flag"].isin(
["CAR_ID of tier-1 Unknown", "CAR_ID with no connections to indirect"]
)
].copy()
# For Tier1-based rows, compute distance using tier1 coordinates.
if not group_tier1.empty:
results_tier1 = compute_osrm_for_df(
group_tier1,
origin_id="tier1_car_id",
origin_lat="tier1_car_latitude",
origin_lon="tier1_car_longitude",
dest_id="slaughterhouse_uni_id",
dest_lat="slaughterhouse_latitude",
dest_lon="slaughterhouse_longitude",
server_url=server_url,
chunk_size=chunk_size,
)
group_tier1["road_distance_m"] = results_tier1["road_distance_m"]
group_tier1["road_duration_s"] = results_tier1["road_duration_s"]
group_tier1["distance_method"] = results_tier1["distance_method"]
else:
group_tier1["road_distance_m"] = None
group_tier1["road_duration_s"] = None
group_tier1["distance_method"] = None
# For Direct supplier-based rows, compute distance using direct supplier coordinates.
if not group_direct.empty:
results_direct = compute_osrm_for_df(
group_direct,
origin_id="direct_farm_car_id",
origin_lat="direct_farm_latitude",
origin_lon="direct_farm_longitude",
dest_id="slaughterhouse_uni_id",
dest_lat="slaughterhouse_latitude",
dest_lon="slaughterhouse_longitude",
server_url=server_url,
chunk_size=chunk_size,
)
group_direct["road_distance_m"] = results_direct["road_distance_m"]
group_direct["road_duration_s"] = results_direct["road_duration_s"]
group_direct["distance_method"] = results_direct["distance_method"]
else:
group_direct["road_distance_m"] = None
group_direct["road_duration_s"] = None
group_direct["distance_method"] = None
# Combine the groups.
df_final = pd.concat([group_tier1, group_direct], axis=0)
# Final fallback post-processing: if any row still has a missing road_distance_m,
# compute it using Haversine.
def final_fallback_distance(row):
if pd.isnull(row["road_distance_m"]) or (
isinstance(row["road_distance_m"], float)
and np.isnan(row["road_distance_m"])
):
if pd.notnull(row.get("tier1_car_latitude")):
return haversine(
row["tier1_car_longitude"],
row["tier1_car_latitude"],
row["slaughterhouse_longitude"],
row["slaughterhouse_latitude"],
)
elif pd.notnull(row.get("direct_farm_latitude")):
return haversine(
row["direct_farm_longitude"],
row["direct_farm_latitude"],
row["slaughterhouse_longitude"],
row["slaughterhouse_latitude"],
)
return row["road_distance_m"]
df_final["road_distance_m"] = df_final.apply(final_fallback_distance, axis=1)
# The get_distance logic already forces haversine when difference >30%; update that logic.
df_final["distance_method"] = df_final["distance_method"].fillna(
"great-circle (Haversine)"
)
df_final["road_duration_s"] = df_final["road_distance_m"] / 11.11
# Keep only desired columns:
keep_cols = [
"tier1_car_id",
"tier1_car_property_area",
"tier1_car_latitude",
"tier1_car_longitude",
"tier1_animals_sent",
"tier1_tax_number",
"tier1_geocode",
"direct_farm_car_id",
"direct_farm_latitude",
"direct_farm_longitude",
"direct_animals_sent",
"direct_farm_tax_number",
"direct_farm_geocode",
"slaughterhouse_uni_id",
"slaughterhouse_ref_name",
"slaughterhouse_tax_number",
"slaughterhouse_geocode",
"slaughterhouse_longitude",
"slaughterhouse_latitude",
"tier1_candidate_cars",
"road_distance_m",
"road_duration_s",
"distance_method",
"car_id_flag",
]
df_final = df_final[keep_cols]
write_parquet_for_upload(df_final, output_csv)
print(f"Done. Wrote output to: {output_csv}")
return df_final
if __name__ == "__main__":
input_file = "brazil/logistics/gta/silver/para_full_tier1_to_slaughterhouse_gtas_car_match_flagged.parquet"
output_csv = "brazil/beef/ciff/supply_shed_reviews/in/para_indirect_gta_car_join_2020_filtered_with_distances.parquet"
compute_osrm_distances_para_indirect(
input_csv=input_file,
output_csv=output_csv,
server_url="http://localhost:5000",
chunk_size=50,
)
Animals and distance cost in 2020
In this analysis, we focus on two main aspects for the top 10 slaughterhouses by total animals processed in 2020:
- Animal Supply Aggregation
- Direct Supply:
We aggregated the number of animals delivered directly from suppliers using the fielddirect_farm_animals_sent. - Indirect Supply:
We aggregated the number of animals delivered via tier‑1 (indirect) suppliers to direct ones using the fieldtier1_animals_sent. - The total animal count per slaughterhouse is computed as the sum of direct suppliers (only). The top 10 slaughterhouses were selected for detailed analysis, while the remaining facilities were grouped into an "Others" category. We calculate the "probable" indirect volume by subtracting the indirect animals by direct animals. The result is the fraction that likely came from indirect suppliers via direct ones. Because of the complex dynamics of the cattle supply chain, this is just a rough approximation.
-
Key Findings:
- JSB in Para is the slaughterhouse receiving the highest number of animals in 2020, with a total of approximately 150 million animals.
- JSB also has the highest proportion of indirect supply, with around 5% of its animals delivered via tier‑1 suppliers.
-
Average Road Distance Analysis
- We computed the average road distance (
road_distance_m) from the supply source to the slaughterhouse. - Distances are originally measured in meters and were converted to kilometres for reporting.
- For routes involving tier‑1 suppliers, if the OSRM API did not provide a valid road connection (due to missing waterways or ferries in the Amazon), we used the great‑circle (Haversine) distance as a fallback.
- Key Insight:
On average, cattle delivered via tier‑1 connections travel approximately 200 km from source to slaughterhouse, with a standard deviation of about 75 km.
Visualization Dashboard
The analysis is visualized with two charts:
-
Left Chart:
A horizontal stacked bar chart displaying the total number of animals per slaughterhouse, separated into direct and probable indirect portion of the supply chain by facility. -
Right Chart:
A vertical bar chart showing the average road distance (in kilometres) per slaughterhouse, using the same top 10 facilities plus an "Others" category.
import pandas as pd
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from trase.tools.aws.aws_helpers import read_s3_parquet
import plotly.io as pio
pio.renderers.default = "iframe_connected"
# 1. Load the output CSV file and convert relevant columns to numeric
distance = read_s3_parquet(
"brazil/beef/ciff/supply_shed_reviews/in/para_indirect_gta_car_join_2020_filtered_with_distances.parquet"
)
for col in ["direct_animals_sent", "tier1_animals_sent", "road_distance_m"]:
distance[col] = pd.to_numeric(distance[col], errors="coerce")
# 2. Aggregate animal counts by slaughterhouse
animals_df = (
distance.groupby("slaughterhouse_ref_name")
.agg({"direct_animals_sent": "sum", "tier1_animals_sent": "sum"})
.reset_index()
)
animals_df["total_animals"] = (
animals_df["direct_animals_sent"] + animals_df["tier1_animals_sent"]
)
animals_df = animals_df.sort_values(by="total_animals", ascending=False)
# Select top 10 slaughterhouses and combine the rest as "Others"
top10 = animals_df.head(10).copy()
others = animals_df.iloc[10:]
others_row = {
"slaughterhouse_ref_name": "Others",
"direct_animals_sent": others["direct_animals_sent"].sum(),
"tier1_animals_sent": others["tier1_animals_sent"].sum(),
"total_animals": others["total_animals"].sum(),
}
animals_agg = pd.concat([top10, pd.DataFrame([others_row])], ignore_index=True)
# 3. Compute average road distance (in meters) per slaughterhouse
distance_df = (
distance.groupby("slaughterhouse_ref_name")
.agg({"road_distance_m": "mean"})
.reset_index()
)
avg_distance = pd.merge(
animals_agg[["slaughterhouse_ref_name"]],
distance_df,
on="slaughterhouse_ref_name",
how="left",
)
# For "Others": if not directly available in distance_df, compute its average from those not in top10
if "Others" not in distance_df["slaughterhouse_ref_name"].values:
others_list = animals_df[
~animals_df["slaughterhouse_ref_name"].isin(top10["slaughterhouse_ref_name"])
]["slaughterhouse_ref_name"].unique()
others_avg = distance_df[distance_df["slaughterhouse_ref_name"].isin(others_list)][
"road_distance_m"
].mean()
others_df = pd.DataFrame(
[{"slaughterhouse_ref_name": "Others", "road_distance_m": others_avg}]
)
avg_distance = pd.concat([avg_distance, others_df], ignore_index=True)
# Convert average distance from meters to kilometres
avg_distance["avg_distance_km"] = avg_distance["road_distance_m"] / 1000.0
# Ensure the order of slaughterhouses is consistent in both aggregated tables
order = animals_agg["slaughterhouse_ref_name"].tolist()
animals_agg = animals_agg.set_index("slaughterhouse_ref_name").loc[order].reset_index()
avg_distance = (
avg_distance.set_index("slaughterhouse_ref_name").loc[order].reset_index()
)
# 4. Create a two-panel dashboard using Plotly subplots
fig = make_subplots(
rows=1,
cols=2,
subplot_titles=(
"<b>Animal Counts by Slaughterhouse (Direct vs Indirect)</b>",
"<b>Average Road Distance by Slaughterhouse (km)</b>",
),
horizontal_spacing=0.15,
)
# Left panel: Horizontal stacked bar chart for animal counts
fig.add_trace(
go.Bar(
y=animals_agg["slaughterhouse_ref_name"],
x=animals_agg["direct_animals_sent"],
name="Direct",
orientation="h",
marker_color="#19404C",
),
row=1,
col=1,
)
fig.add_trace(
go.Bar(
y=animals_agg["slaughterhouse_ref_name"],
x=animals_agg["tier1_animals_sent"],
name="Indirect",
orientation="h",
marker_color="#FF6A5F",
),
row=1,
col=1,
)
# Right panel: Vertical bar chart for average road distance (km)
fig.add_trace(
go.Bar(
x=avg_distance["slaughterhouse_ref_name"],
y=avg_distance["avg_distance_km"],
name="Avg Distance (km)",
showlegend=False,
marker_color="#BBFFEC",
),
row=1,
col=2,
)
# 5. Update layout and styling
fig.update_layout(
height=700,
barmode="stack",
showlegend=True,
font=dict(family="DM Sans", color="#839A8C"),
paper_bgcolor="rgba(0,0,0,0)",
plot_bgcolor="rgba(0,0,0,0)",
)
# Axes styling for left chart (Animal Counts)
fig.update_xaxes(
title_text="<b>Number of Animals</b>",
row=1,
col=1,
tickfont=dict(family="DM Sans", color="#839A8C"),
title_font=dict(family="DM Sans", color="#839A8C"),
)
fig.update_yaxes(
title_text="<b>Slaughterhouse</b>",
row=1,
col=1,
tickfont=dict(family="DM Sans", color="#839A8C"),
title_font=dict(family="DM Sans", color="#839A8C"),
)
# Axes styling for right chart (Average Distance)
fig.update_xaxes(
title_text="<b>Slaughterhouse</b>",
row=1,
col=2,
tickfont=dict(family="DM Sans", color="#839A8C"),
title_font=dict(family="DM Sans", color="#839A8C"),
)
fig.update_yaxes(
title_text="<b>Avg Distance (km)</b>",
row=1,
col=2,
tickfont=dict(family="DM Sans", color="#839A8C"),
title_font=dict(family="DM Sans", color="#839A8C"),
)
# Legend styling
fig.update_layout(
legend=dict(
title=dict(
text="<b>Supply Type</b>", font=dict(family="DM Sans", color="#839A8C")
),
font=dict(family="DM Sans", color="#839A8C"),
)
)
# 6. Adjust subplot titles so that the left title is left aligned and the right title is right aligned.
# Here we set the left title's x coordinate to 0.02 and the right title's to 0.98.
for i, annotation in enumerate(fig.layout.annotations):
if i == 0: # Left subplot title
annotation.xanchor = "left"
annotation.x = -0.5
elif i == 1: # Right subplot title
annotation.xanchor = "right"
annotation.x = 0.9
fig.show()
Example of supply shed at CAR scale fr JBS facilities
The map below shows the farms as centroids supplying JBS facilities in Para in 2020. Includes only tier-1, connected to direct, connected to facilities.
import folium
# --- 1. Load and Filter the Processed Data ---
# Filter for the selected facility type or group, e.g. those with slaughterhouse_uni_id containing "JBS"
# (Adjust filtering as needed, here we use equality. If multiple facilities have a similar pattern, you might use .str.contains)
shed_filter = "JBS" # For example, a string that identifies JBS facilities
df_filtered = distance[
(distance["slaughterhouse_ref_name"] == shed_filter)
& (distance["car_id_flag"] == "CAR_ID from tier-1 to facility")
].copy()
# Ensure 'road_distance_m' is numeric
df_filtered["road_distance_m"] = pd.to_numeric(
df_filtered["road_distance_m"], errors="coerce"
)
# --- 2. Create Distance Categories ---
def label_distance(dist_m):
"""
Convert a distance (in meters) to a label based on categories.
"""
km = dist_m / 1000.0
if km < 10:
return "< 10 km"
elif km < 20:
return "20 km"
elif km < 50:
return "50 km"
elif km < 100:
return "100 km"
elif km < 150:
return "150 km"
else:
return "> 150 km"
df_filtered["distance_label"] = df_filtered["road_distance_m"].apply(label_distance)
# --- 3. Define Color Map for Categories ---
color_map = {
"< 10 km": "#8180F2",
"20 km": "#D9B0EB",
"50 km": "#FCE1B3",
"100 km": "#FABF88",
"150 km": "#F45C57",
"> 150 km": "#421D23",
}
# --- 4. Create the Folium Map with Facility Markers ---
# Extract unique facility locations based on facility ID and coordinates.
facilities = df_filtered[
["slaughterhouse_uni_id", "slaughterhouse_latitude", "slaughterhouse_longitude"]
].drop_duplicates()
# Center the map on the first facility
first_facility = facilities.iloc[0]
center_lat = first_facility["slaughterhouse_latitude"]
center_lon = first_facility["slaughterhouse_longitude"]
m = folium.Map(location=[center_lat, center_lon], zoom_start=7)
# Add a marker for each unique facility.
for idx, facility in facilities.iterrows():
folium.Marker(
location=[
facility["slaughterhouse_latitude"],
facility["slaughterhouse_longitude"],
],
popup=f"Facility: {facility['slaughterhouse_uni_id']}",
icon=folium.Icon(color="red", icon="info-sign"),
).add_to(m)
# --- 5. Add Tier-1 Supplier Points ---
for idx, row in df_filtered.iterrows():
supplier_lat = row["tier1_car_latitude"]
supplier_lon = row["tier1_car_longitude"]
label = row["distance_label"]
color = color_map.get(label, "#000000")
folium.CircleMarker(
location=[supplier_lat, supplier_lon],
radius=4,
color=color,
fill=True,
fill_color=color,
fill_opacity=0.7,
popup=(
f"Supplier ID: {row['tier1_car_id']}<br>"
f"Distance: {row['road_distance_m']/1000:.1f} km<br>"
f"Category: {label}"
),
).add_to(m)
# --- 6. Display the Map ---
m
Implementation of Brandão’s Method in Python for Mapping Supply Zones
This notebook implements a method inspired by Brandão et al. (2023) to delineate supply zones for slaughterhouses based on cattle suppliers. Instead of using ArcGIS Pro’s (We don't have a license) Incremental Spatial Autocorrelation (ISA) tool, we replicate the core concepts in Python using the following steps:
Overview
- Data Preparation
-
The input dataset is loaded from a Parquet file that contains supplier–facility connection data. Key fields include:
- Distances:
road_distance_m(in meters) - Weight Variables:
direct_animals_sent(used as a proxy for cattle volume per supplier) - Location IDs: For facilities (
slaughterhouse_uni_id) and supplier identifiers (e.g.,tier1_car_idanddirect_farm_car_id).
- Distances:
-
Filtering
- We select only the rows with a specified set of
car_id_flagvalues that indicate priority supplier–facility connections. -
For each facility, the analysis focuses solely on that facility’s supplier records.
-
Incremental Spatial Autocorrelation (ISA)
- Adaptation from ArcGIS:
In ArcGIS Pro, the ISA tool calculates Global Moran’s I for successively increasing distance bands to find the scale at which suppliers cluster most strongly. In our Python implementation:- We treat the supplier’s
road_distance_mvalues as a 1D surrogate for spatial location. - For each distance threshold (converted from km to meters) in 25‑km increments, we construct a binary weights matrix (neighbors if the absolute difference in
road_distance_mis less than the threshold). - We then compute Global Moran’s I (and its z‑score) using the supplier weights (direct_animals_sent).
- We treat the supplier’s
-
Selection of Optimal Distance:
We then select the distance (in km) with the highest positive z‑score as the “optimal” clustering threshold. This threshold is interpreted as the characteristic radius within which the majority of the slaughterhouse’s suppliers are clustered. -
Flagging Supply Zone Records
-
Once the optimal distance is determined for a facility:
- A new column,
brandao_supply_zone, is added to flag each supplier–facility record as"YES"if its road distance is less than the optimal threshold (converted to meters) and"NO"otherwise. - In addition, a fixed threshold flag (
Fixed_radius_supply_zone) is created: if the distance is within 350 km (350,000 m), it is flagged as"YES"; otherwise,"NO".
- A new column,
-
Adaptations and Simplifications
- Spatial Weights:
The Python code uses PySAL’sDistanceBand(implemented manually using a neighbor dictionary) to mimic ArcGIS’s distance‐based neighbor relationships. - Distance as a 1D Surrogate:
While ArcGIS’s ISA calculates spatial autocorrelation based on 2D coordinates, the code uses theroad_distance_mfield as a 1D proxy for location. This is a computationally efficient simplification, assuming that the road distance approximates supplier dispersion relative to the facility. Note: ArcGIS Pro calculates distances and spatial autocorrelation in the same implementation using 2D road coordinates. In our case we use a 1D distance road field, but the distance was previously calculated based on a 2D road network from OSRM. - Output:
Instead of directly aggregating heavy polygon geometries, the method flags individual supplier–facility connections. These flags (and the optimal distance per facility) can later be used in a spatial database or for further aggregation.
Summary
- Goal: Replicate Brandão’s method of using incremental Moran’s I to determine an optimal supply zone radius.
- Key Adaptations:
- From ArcGIS’s ISA to Python:
Replaced the ArcGIS ISA tool with incremental computation of Global Moran’s I over a series of distance thresholds using PySAL. - Use of a 1D Spatial Proxy:
Instead of full 2D coordinates, the code usesroad_distance_mas a surrogate for supplier spatial distribution. - Threshold Selection:
The optimal distance is chosen based on the maximum positive z‑score. - Simplified Zone Aggregation:
Rather than creating aggregated polygons from supplier boundaries, rows are flagged (withbrandao_supply_zoneandFixed_radius_supply_zone) so that the supply zone can be later constructed within a geospatial database if needed.
This implementation provides a data-driven way to estimate the supply zone radius for each facility, aligning conceptually with Brandão et al. (2023) while leveraging Python’s spatial and statistical libraries.
import math
import pandas as pd
import numpy as np
from pysal.lib import weights
from esda.moran import Moran
from concurrent.futures import ProcessPoolExecutor, as_completed
from trase.tools.aws.aws_helpers import read_s3_parquet
from trase.tools.aws.metadata import write_parquet_for_upload, write_csv_for_upload
# -------------------------------
# FUNCTIONS FOR ISA ANALYSIS
# -------------------------------
def incremental_spatial_autocorrelation(df, weight_col, max_km, step_km=25):
"""
Performs a simplified incremental spatial autocorrelation analysis along a 1D axis
using the 'road_distance_m' field as a surrogate for spatial location.
Two records are considered neighbors if the absolute difference in 'road_distance_m'
is less than the threshold.
Returns a DataFrame with columns: distance_km, moran_I, z_score, and p_value.
"""
values = pd.to_numeric(df[weight_col], errors="coerce").values
coords = df["road_distance_m"].values.reshape(-1, 1) # 1D coordinate in meters
thresholds = [t * 1000 for t in range(step_km, max_km + step_km, step_km)]
results = []
n = len(coords)
for threshold in thresholds:
nbrs = {}
for i in range(n):
# Find neighbors: all j != i with |coords[i] - coords[j]| < threshold
neighbors = [
j
for j in range(n)
if (j != i) and (abs(coords[i][0] - coords[j][0]) < threshold)
]
nbrs[i] = neighbors
try:
w = weights.W(nbrs)
moran = Moran(values, w, two_tailed=False)
results.append(
{
"distance_km": threshold / 1000,
"moran_I": moran.I,
"z_score": moran.z_sim,
"p_value": moran.p_sim,
}
)
except Exception as e:
results.append(
{
"distance_km": threshold / 1000,
"moran_I": np.nan,
"z_score": np.nan,
"p_value": np.nan,
}
)
return pd.DataFrame(results)
def compute_optimal_distance_for_facility(
df, weight_col="direct_animals_sent", step_km=30
):
"""
Computes the optimal distance threshold (in km) based on incremental spatial
autocorrelation using 'road_distance_m' as the 1D spatial variable and supplier weights.
The threshold with the highest positive z_score is selected.
Returns a tuple: (optimal_distance_km, optimal_stats_row).
"""
max_dist = df["road_distance_m"].max() # in meters
max_km = max(50, math.ceil(max_dist / 1000))
isa_df = incremental_spatial_autocorrelation(df, weight_col, max_km, step_km)
# Consider only positive z_scores (indicative of clustering)
isa_df = isa_df[isa_df["z_score"] > 0]
if isa_df.empty:
return 50, {
"distance_km": 50,
"moran_I": np.nan,
"z_score": np.nan,
"p_value": np.nan,
}
optimal_row = isa_df.loc[isa_df["z_score"].idxmax()]
return optimal_row["distance_km"], optimal_row
def assign_supply_zone_flags(df, optimal_km, fixed_radius_km=350):
"""
Add two new columns:
- 'brandao_supply_zone': "YES" if road_distance_m is below optimal_km threshold.
- 'Fixed_radius_supply_zone': "YES" if road_distance_m is below fixed_radius_km; else "NO".
"""
optimal_threshold_m = optimal_km * 1000
fixed_threshold_m = fixed_radius_km * 1000
df["brandao_supply_zone"] = df["road_distance_m"].apply(
lambda x: "YES" if x < optimal_threshold_m else "NO"
)
df["Fixed_radius_supply_zone"] = df["road_distance_m"].apply(
lambda x: "YES" if x < fixed_threshold_m else "NO"
)
return df
# -------------------------------
# FACILITY-LEVEL PROCESSING FUNCTION (to run in parallel)
# -------------------------------
def process_facility(facility_id, df_facility, step_km=25, fixed_radius_km=350):
"""
Processes supplier records for a single facility:
- If the number of records exceeds 10,000, randomly sample 40% of records.
- Computes the optimal distance threshold using incremental ISA.
- Flags records based on the ISA-derived (optimal) distance and a fixed radius of 350 km.
Returns a tuple: (facility_stats_dict, flagged_df)
"""
# If too many records, sample 40%
if len(df_facility) > 5000:
df_facility = df_facility.sample(frac=0.3, random_state=42)
print(
f"Facility {facility_id}: sampling {len(df_facility)} supplier records (30% of >5,000)."
)
if len(df_facility) < 2:
print(
f"Facility {facility_id} has less than 2 supplier records after sampling; skipping ISA."
)
return None, None
optimal_km, optimal_stats = compute_optimal_distance_for_facility(
df_facility, weight_col="direct_animals_sent", step_km=step_km
)
facility_info = df_facility.iloc[0][
["slaughterhouse_uni_id", "slaughterhouse_ref_name"]
].to_dict()
facility_info.update(optimal_stats)
facility_info["optimal_distance_km"] = optimal_km
flagged_df = assign_supply_zone_flags(
df_facility, optimal_km, fixed_radius_km=fixed_radius_km
)
return facility_info, flagged_df
# -------------------------------
# MAIN PROCESSING (Parallel over facilities)
# -------------------------------
if __name__ == "__main__":
# 1. Load the processed distances parquet file
distance = read_s3_parquet(
"brazil/beef/ciff/supply_shed_reviews/in/para_indirect_gta_car_join_2020_filtered_with_distances.parquet"
)
# 2. Filter for rows with the desired car_id_flag values
valid_flags = [
"CAR_ID from tier-1 to facility",
"CAR_ID of direct is Unknown",
"CAR_ID of tier-1 Unknown",
"CAR_ID with no connections to indirect",
]
df_filtered = distance[distance["car_id_flag"].isin(valid_flags)].copy()
df_filtered["road_distance_m"] = pd.to_numeric(
df_filtered["road_distance_m"], errors="coerce"
)
df_filtered["direct_animals_sent"] = pd.to_numeric(
df_filtered["direct_animals_sent"], errors="coerce"
)
# 3. Group by facility using 'slaughterhouse_uni_id'
facility_ids = df_filtered["slaughterhouse_uni_id"].unique()
supply_zone_stats = []
flagged_results = []
# Process facilities in parallel using up to 4 workers
with ProcessPoolExecutor(max_workers=4) as executor:
futures = {}
for fac in facility_ids:
fac_df = df_filtered[df_filtered["slaughterhouse_uni_id"] == fac].copy()
futures[executor.submit(process_facility, fac, fac_df, 25, 350)] = fac
for future in as_completed(futures):
fac = futures[future]
try:
fac_stats, fac_flagged_df = future.result()
if fac_stats is not None and fac_flagged_df is not None:
supply_zone_stats.append(fac_stats)
flagged_results.append(fac_flagged_df)
print(f"Facility {fac} processed.")
else:
print(f"Facility {fac} skipped (insufficient records).")
except Exception as e:
print(f"Error processing facility {fac}: {e}")
if flagged_results:
df_final = pd.concat(flagged_results, ignore_index=True)
else:
df_final = pd.DataFrame()
# 4. Export the facility ISA statistics and flagged records
stats_df = pd.DataFrame(supply_zone_stats)
stats_file = "brazil/beef/ciff/supply_shed_reviews/in/brandao_supply_zone_stats.csv"
write_csv_for_upload(stats_df, key=stats_file, sep=",")
output_file = "brazil/beef/ciff/supply_shed_reviews/in/para_brandao_supply_zone_flagged.parquet"
write_parquet_for_upload(df_final, output_file)
print("Supply zone stats sample:")
print(stats_df.head())
print("\nFlagged supply zone data sample:")
print(df_final.head())