Indonesia Palm Oil Mills Clean
s3://trase-storage/indonesia/palm_oil/logistics/clean/IDN_PO_mills_clean.csv
Dbt path: trase_production.main.indonesia_palm_oil_mills_clean
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/indonesia/palm_oil/logistics/clean/_schema.yml
Model file link: trase/data_pipeline/models/indonesia/palm_oil/logistics/clean/indonesia_palm_oil_mills_clean.py
Calls script: trase/data/indonesia/palm_oil/logistics/mills/clean/mills_clean.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: mock_model
indonesia_palm_oil_mills_clean
Description
Cleaned table of Indonesian palm oil mills, produced from the 2025-07-10 original mills file: "NA" placeholders are replaced with nulls, coded columns (active status, earliest-year source) are mapped to readable labels, columns are renamed, and each mill is matched to its kabupaten and province via a spatial join against the Trase region boundaries.
Details
| Column | Type | Description |
|---|---|---|
| trase_code | text | Trase code for the mill |
| uml_id | text | Universal Mill List (UML) identifier |
| group | text | Corporate group |
| company | text | Operating company |
| mill_name | text | Mill name |
| latitude | double | Latitude (EPSG:4326) |
| longitude | double | Longitude (EPSG:4326) |
| capacity_tonnes_ffb_hour | double | Capacity in tonnes of fresh fruit bunches per hour |
| active | text | "Operating" or "No longer operating" |
| earliest_year_of_existence | bigint | Earliest year the mill is known to have existed |
| earliest_year_of_existence_source | text | Source used to determine the earliest year |
| kabupaten_name | text | Name of the kabupaten containing the mill |
| kabupaten_trase_id | text | Trase ID of that kabupaten |
| province_name | text | Province of that kabupaten |
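A minimal sketch for loading the published CSV straight from S3 (assumes the s3fs package is installed and AWS credentials with read access to the trase-storage bucket; keep_default_na=False mirrors how the cleaning script below reads its input):

import pandas as pd

# pandas hands s3:// URLs to fsspec/s3fs under the hood
mills = pd.read_csv(
    "s3://trase-storage/indonesia/palm_oil/logistics/clean/IDN_PO_mills_clean.csv",
    keep_default_na=False,
)
print(mills[["mill_name", "kabupaten_name", "province_name"]].head())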
Models / Seeds
model.trase_duckdb.indonesia_palm_oil_mills_original_20250710
model.trase_duckdb.indonesia_spatial_boundaries_from_trase_regions_kabupaten
import pandas as pd
import numpy as np
import geopandas as gpd
import json
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.aws_helpers import read_geojson
from trase.tools.aws.metadata import write_csv_for_upload
ORIGINAL_FILE_PATH = (
"indonesia/palm_oil/logistics/clean/originals/20250710_IDN_PO_Mills.csv"
)
OUTPUT_PATH = "indonesia/palm_oil/logistics/clean/IDN_PO_mills_clean.csv"
KABUPATEN_GEOJSON_PATH = (
"indonesia/spatial/BOUNDARIES/from_trase_regions/kabupaten.geojson"
)
COLUMNS_TO_EXPORT = [
"trase_code",
"uml_id",
"group",
"company",
"mill_name",
"latitude",
"longitude",
"capacity_tonnes_ffb_hour",
"active",
"earliest_year_of_existence",
"earliest_year_of_existence_source",
"kabupaten_name",
"kabupaten_trase_id",
"province_name",
]
def cleanup_data(original_df):
"""Some standard cleaning on the dataframe, such as
removing 'NA' from numeric columns and splitting joined text."""
df = original_df.copy()
# replace "NA" for NaNs in numeric columns. When the CSV is written, these will
# become the empty string
columns_to_replace_na = ["cap"]
df[columns_to_replace_na] = df[columns_to_replace_na].replace("NA", np.nan)
# map number columns to their meanings
active_mapping = {1: "Operating", 0: "No longer operating"}
df["active"] = df["active"].replace(active_mapping)
earliest_year_of_existence_source_mapping = {
1: "Establishment year based on dinas perkebunan record of establishment (e.g. Dinas Perkebunan Plantation Statistics report of establishment dates)",
2: "Establishment year based on other government record of establishment (e.g. Permit database)",
3: "Establishment year imputed based on Dinas Perkebunan Plantation Statistics reports (e.g. first year a mill appears in a Dinas Perkebunan Plantation Statistics report)",
4: "Establishment year based off secondary sources (e.g. website, corporate report, etc)",
5: "Earliest year mill found to exist based on photo-interpretation of high-resolution satellite imagery, manufacturing census data, traceability reports and other secondary sources (or combination of these sources)",
6: "Mill known to exist in 1999 (source: Taib, Gunatif (2000), Kajian Pengembangan Industri Crude Palm Oil Skala Kecil (Studi Kasus Pengembangan Industri Crude Palm Oil di Sumatera Barat), http://repository.ipb.ac.id/handle/123456789/5044)",
7: "Mill known to exist in 2004 (source: Golder Associates (2006). Appendix VIII: Summary of Renewable Potential in Indonesia POM, Retrieved October 10, 2018, http://www.adb.org/Documents/Reports/Consulta nt/46557-INO/36557-INO-TACR- AppendixVIII.pdf)",
}
df["earliest_yr_exist_source"] = df["earliest_yr_exist_source"].replace(
earliest_year_of_existence_source_mapping
)
assert not any(df["earliest_yr_exist_source"].isna())
# rename columns for readability
df = df.rename(
columns={
"cap": "capacity_tonnes_ffb_hour",
"earliest_yr_exist": "earliest_year_of_existence",
"earliest_yr_exist_source": "earliest_year_of_existence_source",
},
errors="raise",
)
return df
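# Illustrative round trip on a tiny hypothetical dataframe (not real data):
#   >>> raw = pd.DataFrame({
#   ...     "cap": ["NA", "60"], "active": [1, 0],
#   ...     "earliest_yr_exist": [2001, 1999], "earliest_yr_exist_source": [2, 6],
#   ... })
#   >>> cleaned = cleanup_data(raw)
#   >>> cleaned["active"].tolist()
#   ['Operating', 'No longer operating']
#   >>> cleaned["capacity_tonnes_ffb_hour"].isna().tolist()
#   [True, False]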
def match_points_to_kabupatens(original_df):
"""
Matches lat/long locations to kabupaten polygons via a spatial join, replacing
and enriching location information.
For each point defined by latitude and longitude, this function finds the
kabupaten polygon that contains it and adds the following columns:
- 'kabupaten_name' (from the polygon's 'name'),
- 'province_name' (from 'parent_name'),
- 'kabupaten_trase_id' (from 'trase_id').
    If a point is not contained within any kabupaten polygon, these columns
    are set to NaN. All rows in the input dataframe are preserved.
"""
df = original_df.copy()
    # build point geometries in WGS84 from the lat/long columns
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.points_from_xy(df.longitude, df.latitude), crs="EPSG:4326"
    )
print(f"Getting kabupatens geojson from S3: {KABUPATEN_GEOJSON_PATH}")
kabupatens = read_geojson(KABUPATEN_GEOJSON_PATH, bucket="trase-storage")
kabupatens.columns = kabupatens.columns.str.lower()
# spatial join to kabupatens
print("Performing spatial join between points and kabupatens")
joined = gpd.sjoin(
gdf,
kabupatens[["geometry", "name", "parent_name", "trase_id"]],
how="left",
predicate="intersects",
).drop(columns=["index_right"])
# rename columns that came from kabupatens
joined = joined.rename(
columns={
"name": "kabupaten_name",
"parent_name": "province_name",
"trase_id": "kabupaten_trase_id",
}
)
return joined
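# Minimal illustration of the spatial-join behaviour on toy geometry (the
# polygon and attribute values here are hypothetical, not real kabupatens):
#   >>> from shapely.geometry import Point, Polygon
#   >>> polys = gpd.GeoDataFrame(
#   ...     {"name": ["Kab. A"], "parent_name": ["Prov. X"], "trase_id": ["ID-1"]},
#   ...     geometry=[Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])], crs="EPSG:4326",
#   ... )
#   >>> pts = gpd.GeoDataFrame(geometry=[Point(1, 1)], crs="EPSG:4326")
#   >>> gpd.sjoin(pts, polys, how="left", predicate="intersects")["name"].item()
#   'Kab. A'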
def run_checks(original_df, clean_df):
print("---> Number of rows")
print(f"Original data: {len(original_df)}, clean data: {len(clean_df)}")
    if len(original_df) != len(clean_df):
        raise ValueError("cleaning changed the number of rows")
    print("---> Rows with missing kabupaten name")
    clean_missing = clean_df["kabupaten_name"].isna().sum()
    print(f"Clean data: {clean_missing} of {len(clean_df)} rows missing kabupaten_name")
def main():
print(f"Getting original data from S3: {ORIGINAL_FILE_PATH}")
mills = get_pandas_df_once(ORIGINAL_FILE_PATH, sep=",", keep_default_na=False)
print("Cleaning up data")
mills_clean = cleanup_data(mills)
print("Matching data to kabupatens geojson")
mills_clean = match_points_to_kabupatens(mills_clean)
print("Running checks")
run_checks(mills, mills_clean)
print(f"Writing results to S3: {OUTPUT_PATH}")
write_csv_for_upload(
df=mills_clean[COLUMNS_TO_EXPORT], key=OUTPUT_PATH, index=False, sep=","
)
if __name__ == "__main__":
main()
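# Usage sketch (assumes AWS credentials for the trase-storage bucket are
# configured in the environment):
#   python trase/data/indonesia/palm_oil/logistics/mills/clean/mills_clean.py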
import pandas as pd
def model(dbt, session):
    dbt.config(materialized="external")
    # referenced only so dbt records this model's upstream lineage
    dbt.ref("indonesia_palm_oil_mills_original_20250710")
    dbt.ref("indonesia_spatial_boundaries_from_trase_regions_kabupaten")
    # mock model (see the mock_model tag above): the actual cleaning happens in
    # the standalone mills_clean.py script, so running this model is unsupported
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})  # unreachable placeholder