Runbook

View or edit on GitHub

This page is synchronized from trase/models/indonesia/palm_oil/Runbook.ipynb. Last modified on 2025-12-14 23:19 CET by Trase Admin. Please view or edit the original file there; changes should be reflected here after the nightly build (midnight CET), or after manually triggering a build with a GitHub action (link).


# Runbook for the Indonesia palm oil trade pipeline. Each step imports the
# `main` entry point of one module under trase.runbook.indonesia.palm_oil.trade
# and runs it; the steps are executed in the order of their module prefixes
# (c_, d_, e_, ... j_).
# NOTE(review): only steps c and j pass explicit arguments, and they target
# different years (2020 vs 2018) — confirm this is intentional.

# Step c: create the post-embed supply chain (2020 only, 5 threads).
from trase.runbook.indonesia.palm_oil.trade.c_create_post_embed_supply_chain import main

main(only_year=2020, n_threads=5, chunk_size=200001)
# Step d: ingest the percentage traded under zero-deforestation commitments.
from trase.runbook.indonesia.palm_oil.trade.d_ingest_percentage_traded_under_zdc import (
    main,
)

main()
# Step e: create the full national supply chain.
from trase.runbook.indonesia.palm_oil.trade.e_create_full_national_supply_chain import (
    main,
)

main()
# Step f: ingest exports.
from trase.runbook.indonesia.palm_oil.trade.f_ingest_exports import main

main()
# Step g: create the full exports dataset.
from trase.runbook.indonesia.palm_oil.trade.g_create_full_exports import main

main()
# Step h: create the full supply chain.
from trase.runbook.indonesia.palm_oil.trade.h_create_full_supply_chain import main

main()
# Step i: add national data.
from trase.runbook.indonesia.palm_oil.trade.i_add_national_data import main

main()
# Step j: create the simplified supply chain (2018 only, single thread).
from trase.runbook.indonesia.palm_oil.trade.j_create_simplified_supply_chain import main

main(only_year=2018, n_threads=1, chunk_size=100000)
from tqdm import tqdm
from trase.tools.pcs import *
from trase.tools.aws.aws_helpers import get_pandas_df

# Reference dataset holding the Indonesian palm oil node indicators used for
# embedding (and for INDICATORS_REF_ID below).
_PALM_OIL_INDICATORS = "INDONESIA PALM OIL INDICATORS V4 FULL"


def _concession_indicator(node_quant_name, flow_quant_name=None):
    """Return an EmbedNodeQuantFlowQuant for a CONCESSION OF PRODUCTION quantity.

    The node quantity and the production quantity (PALM_OIL_TN) are both read
    from ``_PALM_OIL_INDICATORS``. ``flow_quant_name`` is forwarded only when it
    is given, so omitting it leaves the constructor's own default in place.
    """
    kwargs = {
        "node_role": "CONCESSION OF PRODUCTION",
        "node_quant_name": node_quant_name,
    }
    if flow_quant_name is not None:
        kwargs["flow_quant_name"] = flow_quant_name
    kwargs.update(
        production_node_quant_name="PALM_OIL_TN",
        node_quant_reference=_PALM_OIL_INDICATORS,
        production_node_quant_reference=_PALM_OIL_INDICATORS,
    )
    return EmbedNodeQuantFlowQuant(**kwargs)


# Indicators embedded into every supply-chain chunk by `process`, in this order.
EMBEDDING_PARAMETERS_LIST = [
    # Exporter-level indicator from the traders indicators dataset.
    EmbedNodeIndFlowInd(
        node_role="EXPORTER",
        node_ind_name="FOREST_500_PALM_OIL",
        node_ind_reference="TRADERS INDICATORS V3 FULL",
    ),
    _concession_indicator("OIL_PALM_HA", "LAND_USE"),
    # NOTE(review): unlike the other concession indicators, no flow quant name
    # is given here — confirm the constructor default is what is intended.
    _concession_indicator("SMALLHOLDER_PRODUCTION"),
    _concession_indicator(
        "PALM_OIL_DEFORESTATION_ANNUAL",
        "PALM_OIL_DEFORESTATION_ANNUAL_EXPOSURE",
    ),
    _concession_indicator(
        "PALM_OIL_DEFORESTATION_TOTAL",
        "PALM_OIL_DEFORESTATION_TOTAL_EXPOSURE",
    ),
    _concession_indicator(
        "PALM_OIL_DEFORESTATION_10_YEAR_ANNUAL",
        "PALM_OIL_DEFORESTATION_10_YEAR_ANNUAL_EXPOSURE",
    ),
    _concession_indicator(
        "PALM_OIL_DEFORESTATION_10_YEAR_TOTAL",
        "PALM_OIL_DEFORESTATION_10_YEAR_TOTAL_EXPOSURE",
    ),
]
# Title of the zero-deforestation-commitment reference passed to embed_trader_zdcs.
ZDC_REFERENCE_TITLE = "INDONESIA PALM OIL ZERO DEFORESTATION COMMITMENTS V3"
# NOTE(review): resolved at import time, but in this file only referenced by
# commented-out code.
INDICATORS_REF_ID = get_node_attributes_reference_id(_PALM_OIL_INDICATORS)


@uses_database
def get_mill_groups(cur=None):
    """Build a per-year mapping from mill node id to mill group id.

    Reads ``indonesia/companies/MILL_GROUP_OWNERSHIP.csv``. Each row assigns the
    mill's group to every year in ``range(time_start, time_end)``, so
    ``time_end`` is exclusive. Missing values are filled in as follows:

    * ``group``      -> ``"UNKNOWN AFFILIATION"``
    * ``time_start`` -> 2018
    * ``time_end``   -> 2021 (an open-ended row therefore covers 2018-2020)

    Every year also maps the placeholder mill ``ID-PALM-MILL-X`` to the
    ``UNKNOWN`` group. Later rows override earlier ones for the same mill/year.

    Args:
        cur: database cursor forwarded to every lookup.

    Returns:
        dict: ``{year: {mill_node_id: group_id}}`` with keys 2018, 2019, 2020.

    Raises:
        KeyError: if a row's ownership period covers a year outside 2018-2020.
    """
    df_mills = get_pandas_df("indonesia/companies/MILL_GROUP_OWNERSHIP.csv").fillna(
        "NA"
    )

    # Replace the "NA" placeholders first, so the label cache below is warmed
    # with the group label that is actually looked up ("UNKNOWN AFFILIATION").
    df_mills["group"] = df_mills["group"].mask(
        df_mills["group"] == "NA", "UNKNOWN AFFILIATION"
    )
    df_mills["time_start"] = (
        df_mills["time_start"].mask(df_mills["time_start"] == "NA", 2018).astype(int)
    )
    df_mills["time_end"] = (
        df_mills["time_end"].mask(df_mills["time_end"] == "NA", 2021).astype(int)
    )

    add_trader_label_nodes_to_cache(list(df_mills["group"]), cur=cur)
    add_all_nodes_in_country_to_cache(get_country_id("INDONESIA", cur=cur), cur=cur)

    unknown_mill = find_node_by_trase_id("ID-PALM-MILL-X", cur=cur)
    unknown_group = find_group_by_name("UNKNOWN", cur=cur)
    mill_groups = {year: {unknown_mill: unknown_group} for year in range(2018, 2021)}

    for _, row in df_mills.iterrows():
        years = range(row["time_start"], row["time_end"])
        if not years:
            # Empty ownership period: nothing to assign (and nothing to look up).
            continue
        # Resolve the mill node and its group once per row rather than per year.
        mill_id = find_node_by_trase_id(
            row["mill_id"].replace("M-", "ID-PALM-MILL-"), cur=cur
        )
        group_id = get_trader_group_id(find_label(row["group"], cur=cur), cur=cur)
        for year in years:
            mill_groups[year][mill_id] = group_id

    return mill_groups


@uses_database
def get_refinery_groups(cur=None):
    """Map refinery node ids to refinery group ids.

    Reads ``indonesia/logistics/out/refineries/REFS_14112021.csv``, drops rows
    without a ``trase_id`` and maps each refinery (``R-`` ids rewritten to
    ``ID-PALM-REFINERY-``) to the group of its upper-cased ``comp_name`` label.
    Two placeholder refineries are added:

    * ``ID-PALM-REFINERY-X``    -> ``UNKNOWN`` group
    * ``ID-PALM-REFINERY-0000`` -> ``NOT REFINED`` group

    Args:
        cur: database cursor forwarded to every lookup.

    Returns:
        dict: ``{refinery_node_id: group_id}``.
    """
    df_refineries = get_pandas_df(
        "indonesia/logistics/out/refineries/REFS_14112021.csv"
    ).fillna("UNKNOWN")
    # Rows without a Trase id cannot be matched to a refinery node.
    df_refineries = df_refineries[df_refineries["trase_id"] != "UNKNOWN"]

    # Labels are looked up upper-cased, so warm the cache with the same form.
    company_names = [name.upper() for name in df_refineries["comp_name"]]
    add_trader_label_nodes_to_cache(company_names, cur=cur)
    add_all_nodes_in_country_to_cache(get_country_id("INDONESIA", cur=cur), cur=cur)

    refinery_groups = {
        find_node_by_trase_id(
            trase_id.replace("R-", "ID-PALM-REFINERY-"), cur=cur
        ): get_trader_group_id(find_label(name, cur=cur), cur=cur)
        for trase_id, name in zip(df_refineries["trase_id"], company_names)
    }

    # Placeholder refineries; use the same cursor as every other lookup.
    unknown_refinery = find_node_by_trase_id("ID-PALM-REFINERY-X", cur=cur)
    refinery_groups[unknown_refinery] = find_group_by_name("UNKNOWN", cur=cur)

    not_refined_refinery = find_node_by_trase_id("ID-PALM-REFINERY-0000", cur=cur)
    refinery_groups[not_refined_refinery] = find_group_by_name("NOT REFINED", cur=cur)

    return refinery_groups


# Lookup tables built once at import time with the module-level cursor `CUR`
# (presumably provided by the `trase.tools.pcs` star import — confirm) and read
# by `process` when inserting MILL GROUP / REFINERY GROUP nodes.
MILL_GROUPS = get_mill_groups(CUR)
REFINERY_GROUPS = get_refinery_groups(CUR)


@parallel_enabled
def process(supply_chain, new_ref_id, exports_ref_id):
    """Turn one chunk of the raw supply chain into the full dataset and save it.

    Steps, in order:

    1. Fix traders (``fix_supply_chain_traders(random=True)``), drop labels and
       add KABUPATEN OF PRODUCTION and PROVINCE OF PRODUCTION parent regions.
    2. Embed ``EMBEDDING_PARAMETERS_LIST`` and the trader ZDCs, then remove the
       CONCESSION OF PRODUCTION node.
    3. For flows whose DECISION TREE qualifier contains "LP 2", replace the mill
       with the unknown mill, and the refinery with the unknown refinery
       (REFINED PALM OIL) or the "not refined" refinery (anything else); then
       consolidate.
    4. Per exporter id, stitch the flows onto the exports dataset; flows of
       exporter id 0 instead get importer 0, importer group 0 and INDONESIA as
       country of first import.
    5. Insert MILL GROUP and REFINERY GROUP nodes from ``MILL_GROUPS`` /
       ``REFINERY_GROUPS`` and add the economic bloc.
    6. Relabel flows whose country of production equals the country of first
       import with the "DOMESTIC PROCESSING AND CONSUMPTION" nodes.
    7. Write the supply chain and the traders hierarchy under ``new_ref_id``.

    Args:
        supply_chain: chunk of the raw supply chain; its ``cur`` is reused for
            every database access.
        new_ref_id: flows reference id the result is written to.
        exports_ref_id: flows reference id of the exports dataset to stitch to.
    """
    cur = supply_chain.cur
    cnx = cur.connection

    supply_chain, traders_hierarchy = fix_supply_chain_traders(
        supply_chain, random=True
    )
    supply_chain = remove_supply_chain_labels(supply_chain)
    # Concession -> kabupaten, then kabupaten -> province.
    supply_chain = add_supply_chain_parent_region(
        supply_chain, "CONCESSION OF PRODUCTION", "KABUPATEN OF PRODUCTION", "KABUPATEN"
    )
    supply_chain = add_supply_chain_parent_region(
        supply_chain,
        child_role="KABUPATEN OF PRODUCTION",
        parent_region_role="PROVINCE OF PRODUCTION",
        parent_region_level_name="PROVINCE",
    )
    # Embedding happens before the CONCESSION OF PRODUCTION node is removed
    # below, since most embedding parameters are keyed on that role.
    supply_chain = embed(
        supply_chain=supply_chain, embedding_parameters_list=EMBEDDING_PARAMETERS_LIST
    )
    supply_chain = embed_trader_zdcs(supply_chain, ZDC_REFERENCE_TITLE)

    supply_chain = remove_supply_chain_nodes_by_role(
        supply_chain, ["CONCESSION OF PRODUCTION"]
    )
    # Rebuild from the flows (presumably to refresh node_roles after the
    # removal — TODO confirm).
    supply_chain = SupplyChain(supply_chain.flows, cur=cur)

    unknown_mill = get_node(
        find_node_by_trase_id("ID-PALM-MILL-X", cur=cur), "MILL", cur=cur
    )
    # NOTE(review): `unknown_mill_group` is never used in this function.
    unknown_mill_group = get_node(
        find_group_by_name("UNKNOWN", cur=cur), "MILL GROUP", cur=cur
    )
    unknown_refinery = get_node(
        find_node_by_trase_id("ID-PALM-REFINERY-X", cur=cur), "REFINERY", cur=cur
    )
    not_refined_refinery = get_node(
        find_node_by_trase_id("ID-PALM-REFINERY-0000", cur=cur), "REFINERY", cur=cur
    )
    # Anonymize flows without mill-trader link and domestic flows
    # NOTE(review): hard-coded positions assume path[3] is the MILL and path[4]
    # the REFINERY at this stage — confirm against the raw node roles.
    for flow in supply_chain.flows:
        if "LP 2" in flow.get_qual("DECISION TREE"):
            flow.path[3] = unknown_mill
            flow.path[4] = (
                unknown_refinery
                if flow.commodity_end.name == "REFINED PALM OIL"
                else not_refined_refinery
            )

    supply_chain = consolidate(supply_chain)

    # Distinct, non-null exporter ids of this chunk, in ascending order; the
    # exports dataset is fetched only for these exporters.
    exporter_ids = sorted(
        set(
            [
                flow.path[supply_chain.node_roles.index("EXPORTER")].id
                for flow in supply_chain.flows
                if flow.path[supply_chain.node_roles.index("EXPORTER")].id is not None
            ]
        )
    )
    exports = get_supply_chain(
        exports_ref_id,
        year=supply_chain.time_start.year,
        include_nodes=exporter_ids,
        cur=cur,
    )

    flows = []
    for exporter_id in exporter_ids:
        subchain = supply_chain.get_subchain(exporter_id)

        if exporter_id != 0:
            # NOTE(review): flows that stitch() cannot match (`leftover`) are
            # discarded — confirm this is intended.
            subchain, leftover, _ = stitch(
                subchain, exports, no_commodity_transformation=True, cur=cur
            )
        else:
            # Exporter id 0 (presumably domestic) is not stitched; append
            # importer 0, importer group 0 and INDONESIA as first import.
            domestic_nodes = [
                get_node(0, "IMPORTER", cur=cur),
                get_node(0, "IMPORTER GROUP", cur=cur),
                get_node(
                    get_country_id("INDONESIA", cur=cur),
                    "COUNTRY OF FIRST IMPORT",
                    cur=cur,
                ),
            ]
            for flow in subchain:
                flow.append(domestic_nodes)
            subchain = SupplyChain(subchain.flows, cur=cur)

        flows += subchain.flows

    supply_chain = SupplyChain(flows, cur=cur)

    # Add mill group
    # The MILL is at path[3]; its group is inserted right after it (position 4).
    # Raises KeyError if the year or mill is missing from MILL_GROUPS.
    for flow in supply_chain.flows:
        mill_id = flow.path[3].id
        year = flow.time_start.year
        mill_group_id = MILL_GROUPS[year][mill_id]
        mill_group = get_node(mill_group_id, node_role="MILL GROUP", cur=cur)
        flow.insert(mill_group, position=4, cur=cur)

    # Add refinery group
    # After the insert above the REFINERY sits at path[5]; its group goes to
    # position 6. Raises KeyError if the refinery is missing from REFINERY_GROUPS.
    for flow in supply_chain.flows:
        refinery_id = flow.path[5].id
        refinery_group_id = REFINERY_GROUPS[refinery_id]
        refinery_group = get_node(
            refinery_group_id, node_role="REFINERY GROUP", cur=cur
        )
        flow.insert(refinery_group, position=6, cur=cur)

    supply_chain = SupplyChain(supply_chain.flows, cur=cur)

    supply_chain = add_economic_bloc(supply_chain)

    production_position = supply_chain.node_roles.index("COUNTRY OF PRODUCTION")
    destination_position = supply_chain.node_roles.index("COUNTRY OF FIRST IMPORT")
    exporter_position = supply_chain.node_roles.index("EXPORTER")
    exporter_group_position = supply_chain.node_roles.index("EXPORTER GROUP")
    port_position = supply_chain.node_roles.index("PORT OF EXPORT")
    importer_position = supply_chain.node_roles.index("IMPORTER")
    importer_group_position = supply_chain.node_roles.index("IMPORTER GROUP")

    domestic_exporter = get_node(
        find_trader_by_name("DOMESTIC PROCESSING AND CONSUMPTION", cur=cur),
        "EXPORTER",
        cur=cur,
    )
    domestic_exporter_group = get_node(
        find_group_by_name("DOMESTIC PROCESSING AND CONSUMPTION", cur=cur),
        "EXPORTER GROUP",
        cur=cur,
    )
    domestic_port = get_node(
        check_port(
            "DOMESTIC PROCESSING AND CONSUMPTION",
            parent_id=get_country_id("INDONESIA", cur=cur),
            strict=True,
            cnx=cnx,
            cur=cur,
        ),
        "PORT OF EXPORT",
        cur=cur,
    )
    domestic_importer = get_node(
        find_trader_by_name("DOMESTIC PROCESSING AND CONSUMPTION", cur=cur),
        "IMPORTER",
        cur=cur,
    )
    domestic_importer_group = get_node(
        find_group_by_name("DOMESTIC PROCESSING AND CONSUMPTION", cur=cur),
        "IMPORTER GROUP",
        cur=cur,
    )

    # Flows that never leave the country of production are relabelled with the
    # DOMESTIC PROCESSING AND CONSUMPTION trader, group and port nodes.
    for flow in supply_chain.flows:
        if flow.path[production_position].id == flow.path[destination_position].id:
            flow.path[exporter_position] = domestic_exporter
            flow.path[exporter_group_position] = domestic_exporter_group
            flow.path[port_position] = domestic_port
            flow.path[importer_position] = domestic_importer
            flow.path[importer_group_position] = domestic_importer_group

    supply_chain.to_db(new_ref_id)
    traders_hierarchy.to_db(new_ref_id)


def main(only_year=None, n_threads=None, chunk_size=25001):
    """Create the full Indonesia palm oil dataset under a new child reference.

    A child of the "SEI-PCS INDONESIA PALM OIL V1.2.0 RAW" flows reference is
    inserted (version 1.2.1, title suffix "TEST SINGLE SCRIPT RUNBOOK"), and its
    data for ``only_year`` is deleted (presumably every year when None —
    confirm). The raw supply chain of each year is then read in pages of
    ``chunk_size`` flows, and every page is handed to ``process`` through
    ``run_parallel``, chunked by EXPORTER.

    Args:
        only_year: restrict processing to this year; None processes every year
            of the raw dataset.
        n_threads: worker count forwarded to ``run_parallel``.
        chunk_size: number of flows fetched per page.
    """
    raw_ref_id = get_flows_reference_id("SEI-PCS INDONESIA PALM OIL V1.2.0 RAW")
    target_ref_id = insert_child_flows_reference(
        parent_ref_id=raw_ref_id,
        version="1.2.1",
        title_suffix="TEST SINGLE SCRIPT RUNBOOK",
        version_type=None,
    )
    delete_dataset(target_ref_id, year=only_year)

    exports_ref_id = get_flows_reference_id(
        "SEI-PCS INDONESIA PALM OIL V1.2.1 EXPORTS FULL"
    )

    if only_year is None:
        years = get_flows_dataset_years(raw_ref_id)
    else:
        years = [only_year]

    for year in tqdm(years, "Creating full dataset"):
        page = 0
        while True:
            page_chain = get_supply_chain(
                raw_ref_id, year=year, limit=chunk_size, offset=(page * chunk_size)
            )
            if len(page_chain.flows) == 0:
                break  # ran past the last page of this year
            run_parallel(
                process,
                page_chain,
                target_ref_id,
                exports_ref_id,
                n_threads=n_threads,
                chunks_by="EXPORTER",
            )
            page += 1


#         calculate_percentage_traded_under_zdc(
#             ref_id=ref_id,
#             indicators_ref_id=INDICATORS_REF_ID,
#             node_role="PROVINCE OF PRODUCTION",
#             only_year=year,
#         )
#         calculate_percentage_traded_under_zdc(
#             ref_id=ref_id,
#             indicators_ref_id=INDICATORS_REF_ID,
#             node_role="KABUPATEN OF PRODUCTION",
#             only_year=year,
#         )


if __name__ == "__main__":
    # Running this module directly builds the full dataset for 2019 only,
    # with the default thread count and chunk size.
    main(only_year=2019)