Skip to content

Indonesia Palm Oil Mills Clean

s3://trase-storage/indonesia/palm_oil/logistics/clean/IDN_PO_mills_clean.csv

Dbt path: trase_production.main.indonesia_palm_oil_mills_clean

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/indonesia/palm_oil/logistics/clean/_schema.yml

Model file link: trase/data_pipeline/models/indonesia/palm_oil/logistics/clean/indonesia_palm_oil_mills_clean.py

Calls script: trase/data/indonesia/palm_oil/logistics/mills/clean/mills_clean.py

Dbt test runs & lineage: Test results ยท Lineage

Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)

Tags: mock_model


indonesia_palm_oil_mills_clean

Description

No description


Details

Column Type Description

Models / Seeds

  • model.trase_duckdb.indonesia_palm_oil_mills_original_20250710
  • model.trase_duckdb.indonesia_spatial_boundaries_from_trase_regions_kabupaten
import pandas as pd
import numpy as np
import geopandas as gpd
import json
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.aws_helpers import read_geojson
from trase.tools.aws.metadata import write_csv_for_upload

ORIGINAL_FILE_PATH = (
    "indonesia/palm_oil/logistics/clean/originals/20250710_IDN_PO_Mills.csv"
)
OUTPUT_PATH = "indonesia/palm_oil/logistics/clean/IDN_PO_mills_clean.csv"
KABUPATEN_GEOJSON_PATH = (
    "indonesia/spatial/BOUNDARIES/from_trase_regions/kabupaten.geojson"
)

COLUMNS_TO_EXPORT = [
    "trase_code",
    "uml_id",
    "group",
    "company",
    "mill_name",
    "latitude",
    "longitude",
    "capacity_tonnes_ffb_hour",
    "active",
    "earliest_year_of_existence",
    "earliest_year_of_existence_source",
    "kabupaten_name",
    "kabupaten_trase_id",
    "province_name",
]


def cleanup_data(original_df):
    """Some standard cleaning on the dataframe, such as
    removing 'NA' from numeric columns and splitting joined text."""
    df = original_df.copy()

    # replace "NA" for NaNs in numeric columns. When the CSV is written, these will
    # become the empty string
    columns_to_replace_na = ["cap"]
    df[columns_to_replace_na] = df[columns_to_replace_na].replace("NA", np.nan)

    # map number columns to their meanings
    active_mapping = {1: "Operating", 0: "No longer operating"}
    df["active"] = df["active"].replace(active_mapping)
    earliest_year_of_existence_source_mapping = {
        1: "Establishment year based on dinas perkebunan record of establishment (e.g. Dinas Perkebunan Plantation Statistics report of establishment dates)",
        2: "Establishment year based on other government record of establishment (e.g. Permit database)",
        3: "Establishment year imputed based on Dinas Perkebunan Plantation Statistics reports (e.g. first year a mill appears in a Dinas Perkebunan Plantation Statistics report)",
        4: "Establishment year based off secondary sources (e.g. website, corporate report, etc)",
        5: "Earliest year mill found to exist based on photo-interpretation of high-resolution satellite imagery, manufacturing census data, traceability reports and other secondary sources (or combination of these sources)",
        6: "Mill known to exist in 1999 (source: Taib, Gunatif (2000), Kajian Pengembangan Industri Crude Palm Oil Skala Kecil (Studi Kasus Pengembangan Industri Crude Palm Oil di Sumatera Barat), http://repository.ipb.ac.id/handle/123456789/5044)",
        7: "Mill known to exist in 2004 (source: Golder Associates (2006). Appendix VIII: Summary of Renewable Potential in Indonesia POM, Retrieved October 10, 2018, http://www.adb.org/Documents/Reports/Consulta nt/46557-INO/36557-INO-TACR- AppendixVIII.pdf)",
    }
    df["earliest_yr_exist_source"] = df["earliest_yr_exist_source"].replace(
        earliest_year_of_existence_source_mapping
    )
    assert not any(df["earliest_yr_exist_source"].isna())

    # rename columns for readability
    df = df.rename(
        columns={
            "cap": "capacity_tonnes_ffb_hour",
            "earliest_yr_exist": "earliest_year_of_existence",
            "earliest_yr_exist_source": "earliest_year_of_existence_source",
        },
        errors="raise",
    )
    return df


def match_points_to_kabupatens(original_df):
    """
    Matches lat/long locations to kabupaten polygons via a spatial join, replacing
    and enriching location information.

    For each point defined by latitude and longitude, this function finds the
    kabupaten polygon that contains it and adds the following columns:

        - 'kabupaten_name' (from the polygon's 'name'),
        - 'province_name' (from 'parent_name'),
        - 'kabupaten_trase_id' (from 'trase_id').

    If a point is not contained within any kabupaten polygon, the columns will
    be set to NaN. All rows in the input dataframe are preseved.
    """
    df = original_df.copy()
    gdf = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.longitude, df.latitude))
    gdf.crs = "EPSG:4326"

    print(f"Getting kabupatens geojson from S3: {KABUPATEN_GEOJSON_PATH}")
    kabupatens = read_geojson(KABUPATEN_GEOJSON_PATH, bucket="trase-storage")
    kabupatens.columns = kabupatens.columns.str.lower()

    # spatial join to kabupatens
    print("Performing spatial join between points and kabupatens")
    joined = gpd.sjoin(
        gdf,
        kabupatens[["geometry", "name", "parent_name", "trase_id"]],
        how="left",
        predicate="intersects",
    ).drop(columns=["index_right"])

    # rename columns that came from kabupatens
    joined = joined.rename(
        columns={
            "name": "kabupaten_name",
            "parent_name": "province_name",
            "trase_id": "kabupaten_trase_id",
        }
    )
    return joined


def run_checks(original_df, clean_df):
    print("---> Number of rows")
    print(f"Original data: {len(original_df)}, clean data: {len(clean_df)}")
    if len(original_df) != len(clean_df):
        raise ValueError
    print("---> Rows with missing kabupaten name")
    clean_missing = len(clean_df[clean_df.kabupaten_name.isna()])
    print(f"Original data: {len(original_df)}, clean data: {clean_missing}")


def main():
    print(f"Getting original data from S3: {ORIGINAL_FILE_PATH}")
    mills = get_pandas_df_once(ORIGINAL_FILE_PATH, sep=",", keep_default_na=False)

    print("Cleaning up data")
    mills_clean = cleanup_data(mills)

    print("Matching data to kabupatens geojson")
    mills_clean = match_points_to_kabupatens(mills_clean)

    print("Running checks")
    run_checks(mills, mills_clean)

    print(f"Writing results to S3: {OUTPUT_PATH}")
    write_csv_for_upload(
        df=mills_clean[COLUMNS_TO_EXPORT], key=OUTPUT_PATH, index=False, sep=","
    )


if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, session):
    dbt.config(materialized="external")

    dbt.ref("indonesia_palm_oil_mills_original_20250710")
    dbt.ref("indonesia_spatial_boundaries_from_trase_regions_kabupaten")

    raise NotImplementedError()

    return pd.DataFrame({"hello": ["world"]})