NewRunbook
View or edit on GitHub
This page is synchronized from trase/models/indonesia/palm_oil/NewRunbook.ipynb. Last modified on 2025-12-14 23:19 CET by Trase Admin.
Please view or edit the original file there; changes are reflected here after the nightly build at midnight (CET),
or after manually triggering the build with a GitHub Action (link).
from tqdm import tqdm
from trase.tools.pcs import *
from trase.tools.aws.aws_helpers import get_pandas_df
# Indicator embeddings applied to every flow by ``embed`` inside ``process``.
# All concession-level quantities share the same node role, production
# quantity (PALM_OIL_TN) and reference dataset, so only the per-indicator
# names are listed in the table below.
EMBEDDING_PARAMETERS_LIST = [
    EmbedNodeIndFlowInd(
        node_role="EXPORTER",
        node_ind_name="FOREST_500_PALM_OIL",
        node_ind_reference="TRADERS INDICATORS V3 FULL",
    ),
    *(
        EmbedNodeQuantFlowQuant(
            node_role="CONCESSION OF PRODUCTION",
            node_quant_name=node_quant_name,
            production_node_quant_name="PALM_OIL_TN",
            node_quant_reference="INDONESIA PALM OIL INDICATORS V4 FULL",
            production_node_quant_reference="INDONESIA PALM OIL INDICATORS V4 FULL",
            **flow_quant_kwargs,
        )
        for node_quant_name, flow_quant_kwargs in (
            ("OIL_PALM_HA", {"flow_quant_name": "LAND_USE"}),
            # No flow_quant_name is passed here, so the class default applies.
            ("SMALLHOLDER_PRODUCTION", {}),
            (
                "PALM_OIL_DEFORESTATION_ANNUAL",
                {"flow_quant_name": "PALM_OIL_DEFORESTATION_ANNUAL_EXPOSURE"},
            ),
            (
                "PALM_OIL_DEFORESTATION_TOTAL",
                {"flow_quant_name": "PALM_OIL_DEFORESTATION_TOTAL_EXPOSURE"},
            ),
            (
                "PALM_OIL_DEFORESTATION_10_YEAR_ANNUAL",
                {"flow_quant_name": "PALM_OIL_DEFORESTATION_10_YEAR_ANNUAL_EXPOSURE"},
            ),
            (
                "PALM_OIL_DEFORESTATION_10_YEAR_TOTAL",
                {"flow_quant_name": "PALM_OIL_DEFORESTATION_10_YEAR_TOTAL_EXPOSURE"},
            ),
        )
    ),
]
# Title of the reference holding trader zero-deforestation commitments;
# consumed by ``embed_trader_zdcs`` in ``process``.
ZDC_REFERENCE_TITLE = "INDONESIA PALM OIL ZERO DEFORESTATION COMMITMENTS V3"

# Id of the node-attribute (indicators) reference, resolved at import time.
# Only the currently disabled ZDC-percentage calculation in ``main`` uses it.
INDICATORS_REF_ID = get_node_attributes_reference_id(
    "INDONESIA PALM OIL INDICATORS V4 FULL"
)
@uses_database
def get_mill_groups(cur=None):
    """Return a per-year lookup from mill node id to mill group id.

    Ownership rows come from ``indonesia/companies/MILL_GROUP_OWNERSHIP.csv``.
    Each row maps its mill to its group for every year in
    ``range(time_start, time_end)`` (``time_end`` is exclusive). Missing
    ``group`` values become "UNKNOWN AFFILIATION"; missing ``time_start`` /
    ``time_end`` default to 2018 / 2021. Every year's mapping also sends the
    placeholder mill ``ID-PALM-MILL-X`` to the "UNKNOWN" group.

    Args:
        cur: database cursor. NOTE(review): when omitted it is presumably
            supplied by ``@uses_database`` — confirm.

    Returns:
        dict: ``{year: {mill_node_id: group_id}}``. Years 2018-2020 are always
        present; other years appear only when an ownership row covers them.
    """
    df_mills = get_pandas_df("indonesia/companies/MILL_GROUP_OWNERSHIP.csv").fillna(
        "NA"
    )
    # Rename missing affiliations *before* priming the label cache, so the
    # cache contains "UNKNOWN AFFILIATION" (the label looked up below) rather
    # than the "NA" fill value.
    df_mills["group"] = df_mills["group"].mask(
        df_mills["group"] == "NA", "UNKNOWN AFFILIATION"
    )
    add_trader_label_nodes_to_cache(list(df_mills["group"]), cur=cur)
    add_all_nodes_in_country_to_cache(get_country_id("INDONESIA", cur=cur), cur=cur)
    df_mills["time_start"] = (
        df_mills["time_start"].mask(df_mills["time_start"] == "NA", 2018).astype(int)
    )
    df_mills["time_end"] = (
        df_mills["time_end"].mask(df_mills["time_end"] == "NA", 2021).astype(int)
    )

    unknown_mill = find_node_by_trase_id("ID-PALM-MILL-X", cur=cur)
    unknown_group = find_group_by_name("UNKNOWN", cur=cur)
    mill_groups = {
        year: {unknown_mill: unknown_group} for year in (2018, 2019, 2020)
    }
    for _, row in df_mills.iterrows():
        years = range(row["time_start"], row["time_end"])
        if not years:
            # Empty ownership period: nothing to record, so skip the lookups.
            continue
        # Resolve the mill and its group once per row, not once per year.
        mill = find_node_by_trase_id(
            row["mill_id"].replace("M-", "ID-PALM-MILL-"), cur=cur
        )
        group = get_trader_group_id(find_label(row["group"], cur=cur), cur=cur)
        for year in years:
            # Years outside 2018-2020 used to raise KeyError; seed them with
            # the same placeholder mapping instead.
            mill_groups.setdefault(year, {unknown_mill: unknown_group})[mill] = group
    return mill_groups
@uses_database
def get_refinery_groups(cur=None):
    """Return a lookup from refinery node id to refinery group id.

    Rows come from ``indonesia/logistics/out/refineries/REFS_14112021.csv``.
    Missing values are filled with "UNKNOWN", and rows without a ``trase_id``
    are dropped. Each remaining refinery maps to the trader group of its
    upper-cased ``comp_name`` label. Two placeholders are added:
    ``ID-PALM-REFINERY-X`` -> "UNKNOWN" and ``ID-PALM-REFINERY-0000`` ->
    "NOT REFINED".

    Args:
        cur: database cursor, passed to every lookup. NOTE(review): when
            omitted it is presumably supplied by ``@uses_database`` — confirm.

    Returns:
        dict: ``{refinery_node_id: group_id}``.
    """
    df_refineries = get_pandas_df(
        "indonesia/logistics/out/refineries/REFS_14112021.csv"
    ).fillna("UNKNOWN")
    df_refineries = df_refineries[df_refineries["trase_id"] != "UNKNOWN"]
    # Labels are looked up upper-cased, so prime the cache with that same form.
    company_names = [name.upper() for name in df_refineries["comp_name"]]
    add_trader_label_nodes_to_cache(company_names, cur=cur)
    add_all_nodes_in_country_to_cache(get_country_id("INDONESIA", cur=cur), cur=cur)
    refinery_groups = {
        find_node_by_trase_id(
            trase_id.replace("R-", "ID-PALM-REFINERY-"), cur=cur
        ): get_trader_group_id(find_label(company_name, cur=cur), cur=cur)
        for trase_id, company_name in zip(df_refineries["trase_id"], company_names)
    }
    unknown_refinery = find_node_by_trase_id("ID-PALM-REFINERY-X", cur=cur)
    refinery_groups[unknown_refinery] = find_group_by_name("UNKNOWN", cur=cur)
    not_refined_refinery = find_node_by_trase_id("ID-PALM-REFINERY-0000", cur=cur)
    refinery_groups[not_refined_refinery] = find_group_by_name("NOT REFINED", cur=cur)
    return refinery_groups
# Lookup tables built once at import time with the module-level cursor;
# ``process`` reads them when inserting the MILL GROUP / REFINERY GROUP nodes.
MILL_GROUPS, REFINERY_GROUPS = get_mill_groups(CUR), get_refinery_groups(CUR)
def _is_unknown_exporter(node_id):
    """Return True for exporter ids treated as "no exporter" (``None`` or ``0``)."""
    return node_id is None or node_id == 0


def _sorted_node_ids(supply_chain, role):
    """Return the distinct node ids at ``role``, sorted, with ``None`` first.

    A plain ``sorted`` raises ``TypeError`` as soon as ``None`` is mixed with
    other ids. For the non-None ids, the key yields the same order as a plain
    sort.
    """
    position = supply_chain.node_roles.index(role)
    node_ids = {flow.path[position].id for flow in supply_chain.flows}
    return sorted(node_ids, key=lambda node_id: (node_id is not None, node_id))


def _resolve_exporters(supply_chain, new_ref_id, cur):
    """Resolve exporter labels, or anonymize the whole chunk to exporter 0.

    Resolving means running ``fix_supply_chain_traders`` (random=True),
    stripping labels, and writing the trader hierarchy to ``new_ref_id``.
    Otherwise every flow gets exporter node 0 plus an EXPORTER GROUP node 0
    inserted right after it.
    """
    exporter_label_ids = _sorted_node_ids(supply_chain, "EXPORTER")
    # NOTE(review): this reproduces the original `if not None in ids or 0 in ids`,
    # which Python parses as `(None not in ids) or (0 in ids)`. If the intent
    # was "no None/0 exporter in this chunk", it should read
    # `None not in ids and 0 not in ids` — confirm before changing.
    if None not in exporter_label_ids or 0 in exporter_label_ids:
        supply_chain, traders_hierarchy = fix_supply_chain_traders(
            supply_chain, random=True
        )
        supply_chain = remove_supply_chain_labels(supply_chain)
        traders_hierarchy.to_db(new_ref_id)
    else:
        exporter_position = supply_chain.node_roles.index("EXPORTER")
        for flow in supply_chain:
            flow.path[exporter_position] = get_node(0, "EXPORTER", cur=cur)
            flow.insert(get_node(0, "EXPORTER GROUP", cur=cur), exporter_position + 1)
    # Rebuild so node_roles reflects any inserted EXPORTER GROUP column.
    return SupplyChain(supply_chain.flows, cur=cur)


def _add_regions_and_indicators(supply_chain, cur):
    """Add kabupaten/province parents, embed indicators and ZDCs, drop concessions."""
    supply_chain = add_supply_chain_parent_region(
        supply_chain, "CONCESSION OF PRODUCTION", "KABUPATEN OF PRODUCTION", "KABUPATEN"
    )
    supply_chain = add_supply_chain_parent_region(
        supply_chain,
        child_role="KABUPATEN OF PRODUCTION",
        parent_region_role="PROVINCE OF PRODUCTION",
        parent_region_level_name="PROVINCE",
    )
    supply_chain = embed(
        supply_chain=supply_chain, embedding_parameters_list=EMBEDDING_PARAMETERS_LIST
    )
    try:
        supply_chain = embed_trader_zdcs(supply_chain, ZDC_REFERENCE_TITLE)
    except ValueError:
        # Original note: raised for domestic flows; ZDCs are then skipped.
        pass
    supply_chain = remove_supply_chain_nodes_by_role(
        supply_chain, ["CONCESSION OF PRODUCTION"]
    )
    return SupplyChain(supply_chain.flows, cur=cur)


def _anonymize_unlinked_mills(supply_chain, cur):
    """Replace mill/refinery on "LP 2" flows with placeholders, then consolidate.

    Per the original note, these are flows without a mill-trader link and
    domestic flows. Refined palm oil gets the unknown refinery; anything else
    gets the "not refined" refinery.
    """
    unknown_mill = get_node(
        find_node_by_trase_id("ID-PALM-MILL-X", cur=cur), "MILL", cur=cur
    )
    unknown_refinery = get_node(
        find_node_by_trase_id("ID-PALM-REFINERY-X", cur=cur), "REFINERY", cur=cur
    )
    not_refined_refinery = get_node(
        find_node_by_trase_id("ID-PALM-REFINERY-0000", cur=cur), "REFINERY", cur=cur
    )
    for flow in supply_chain.flows:
        if "LP 2" in flow.get_qual("DECISION TREE"):
            # NOTE(review): assumes path[3] is MILL and path[4] is REFINERY at
            # this stage — confirm against the raw dataset's node roles.
            flow.path[3] = unknown_mill
            flow.path[4] = (
                unknown_refinery
                if flow.commodity_end.name == "REFINED PALM OIL"
                else not_refined_refinery
            )
    return consolidate(supply_chain)


def _build_domestic_subchain(supply_chain, exporter_position, cur):
    """Collect flows with an unknown exporter and extend them with domestic nodes.

    Each such flow gets exporter node 0, followed by importer 0, importer
    group 0 and Indonesia as country of first import.
    """
    subchain = SupplyChain(
        [
            flow
            for flow in supply_chain
            if _is_unknown_exporter(flow.path[exporter_position].id)
        ]
    )
    domestic_nodes = [
        get_node(0, "IMPORTER", cur=cur),
        get_node(0, "IMPORTER GROUP", cur=cur),
        get_node(
            get_country_id("INDONESIA", cur=cur),
            "COUNTRY OF FIRST IMPORT",
            cur=cur,
        ),
    ]
    placeholder_exporter = get_node(0, "EXPORTER", cur=cur)
    for flow in subchain:
        flow.path[exporter_position] = placeholder_exporter
        flow.append(domestic_nodes)
    return SupplyChain(subchain.flows, cur=cur)


def _stitch_exports(supply_chain, exports_ref_id, cur):
    """Stitch each known exporter's flows onto the exports dataset.

    Flows whose exporter is unknown (``None`` or ``0``) are handled exactly
    once by ``_build_domestic_subchain``. Previously flows with exporter 0
    were dropped, because only ``None`` was collected.

    Returns:
        tuple: (rebuilt supply chain, sorted exporter ids of the input).
    """
    exporter_position = supply_chain.node_roles.index("EXPORTER")
    exporter_ids = _sorted_node_ids(supply_chain, "EXPORTER")
    exports = get_supply_chain(
        exports_ref_id,
        year=supply_chain.time_start.year,
        include_nodes=exporter_ids,
        cur=cur,
    )
    flows = []
    unknown_handled = False
    for exporter_id in exporter_ids:
        if not _is_unknown_exporter(exporter_id):
            subchain = supply_chain.get_subchain(exporter_id)
            subchain, _leftover, _ = stitch(
                subchain, exports, no_commodity_transformation=True, cur=cur
            )
        elif unknown_handled:
            # None and 0 share one bucket; never emit those flows twice.
            continue
        else:
            unknown_handled = True
            subchain = _build_domestic_subchain(supply_chain, exporter_position, cur)
        flows += subchain.flows
    return SupplyChain(flows, cur=cur), exporter_ids


def _add_mill_and_refinery_groups(supply_chain, cur):
    """Insert MILL GROUP (position 4) and REFINERY GROUP (position 6) nodes.

    Groups come from the module-level MILL_GROUPS (keyed by flow year and
    path[3] id) and REFINERY_GROUPS (keyed by path[5] id, read after the mill
    group insert).
    """
    for flow in supply_chain.flows:
        mill_group_id = MILL_GROUPS[flow.time_start.year][flow.path[3].id]
        mill_group = get_node(mill_group_id, node_role="MILL GROUP", cur=cur)
        flow.insert(mill_group, position=4, cur=cur)
    for flow in supply_chain.flows:
        refinery_group_id = REFINERY_GROUPS[flow.path[5].id]
        refinery_group = get_node(
            refinery_group_id, node_role="REFINERY GROUP", cur=cur
        )
        flow.insert(refinery_group, position=6, cur=cur)
    return SupplyChain(supply_chain.flows, cur=cur)


def _assign_domestic_nodes(supply_chain, cur):
    """Relabel flows whose country of production equals the first-import country.

    Such flows get the "DOMESTIC PROCESSING AND CONSUMPTION" exporter,
    exporter group, port of export, importer and importer group. Flow paths
    are mutated in place.
    """
    cnx = cur.connection
    roles = supply_chain.node_roles
    production_position = roles.index("COUNTRY OF PRODUCTION")
    destination_position = roles.index("COUNTRY OF FIRST IMPORT")
    replacements = [
        (
            roles.index("EXPORTER"),
            get_node(
                find_trader_by_name("DOMESTIC PROCESSING AND CONSUMPTION", cur=cur),
                "EXPORTER",
                cur=cur,
            ),
        ),
        (
            roles.index("EXPORTER GROUP"),
            get_node(
                find_group_by_name("DOMESTIC PROCESSING AND CONSUMPTION", cur=cur),
                "EXPORTER GROUP",
                cur=cur,
            ),
        ),
        (
            roles.index("PORT OF EXPORT"),
            get_node(
                check_port(
                    "DOMESTIC PROCESSING AND CONSUMPTION",
                    parent_id=get_country_id("INDONESIA", cur=cur),
                    strict=True,
                    cnx=cnx,
                    cur=cur,
                ),
                "PORT OF EXPORT",
                cur=cur,
            ),
        ),
        (
            roles.index("IMPORTER"),
            get_node(
                find_trader_by_name("DOMESTIC PROCESSING AND CONSUMPTION", cur=cur),
                "IMPORTER",
                cur=cur,
            ),
        ),
        (
            roles.index("IMPORTER GROUP"),
            get_node(
                find_group_by_name("DOMESTIC PROCESSING AND CONSUMPTION", cur=cur),
                "IMPORTER GROUP",
                cur=cur,
            ),
        ),
    ]
    for flow in supply_chain.flows:
        if flow.path[production_position].id == flow.path[destination_position].id:
            for position, node in replacements:
                flow.path[position] = node


@parallel_enabled
def process(supply_chain, new_ref_id, exports_ref_id):
    """Transform one chunk of raw flows and write it to ``new_ref_id``.

    Steps:
      1. Resolve or anonymize exporters.
      2. Add kabupaten/province regions, embed indicators and ZDCs, and drop
         concessions.
      3. Anonymize "LP 2" flows.
      4. Stitch onto the exports dataset ``exports_ref_id``.
      5. Add mill/refinery groups and economic blocs.
      6. Relabel domestic flows, then write to ``new_ref_id``.

    Raises:
        ValueError: if ``add_economic_bloc`` fails (original error chained).
    """
    cur = supply_chain.cur
    supply_chain = _resolve_exporters(supply_chain, new_ref_id, cur)
    supply_chain = _add_regions_and_indicators(supply_chain, cur)
    supply_chain = _anonymize_unlinked_mills(supply_chain, cur)
    supply_chain, exporter_ids = _stitch_exports(supply_chain, exports_ref_id, cur)
    supply_chain = _add_mill_and_refinery_groups(supply_chain, cur)
    try:
        supply_chain = add_economic_bloc(supply_chain)
    except Exception as err:
        # Dump context for debugging, then re-raise with the cause attached
        # (the original bare `except:` also swallowed KeyboardInterrupt).
        print(exporter_ids)
        print(supply_chain)
        if supply_chain.flows:
            print(repr(supply_chain.flows[0]))
        raise ValueError(
            f"add_economic_bloc failed for exporters {exporter_ids}"
        ) from err
    _assign_domestic_nodes(supply_chain, cur)
    supply_chain.to_db(new_ref_id)
def main(only_year=None, n_threads=None, chunk_size=25001):
    """Build the single-script Indonesia palm oil dataset from the raw flows.

    Creates child reference version 1.2.2 of
    "SEI-PCS INDONESIA PALM OIL V1.2.0 RAW" and calls ``delete_dataset`` on
    it for ``only_year``. It then pages through each selected year's raw
    flows, ``chunk_size`` flows at a time, until an empty page is returned.
    Every page is handed to ``process`` via ``run_parallel``, chunked by
    EXPORTER.

    Args:
        only_year: restrict processing to this year; when None, every year
            returned by ``get_flows_dataset_years`` for the raw reference.
        n_threads: forwarded to ``run_parallel``.
        chunk_size: number of raw flows fetched per page.
    """
    raw_ref_id = get_flows_reference_id("SEI-PCS INDONESIA PALM OIL V1.2.0 RAW")
    new_ref_id = insert_child_flows_reference(
        parent_ref_id=raw_ref_id,
        version="1.2.2",
        title_suffix="TEST SINGLE SCRIPT RUNBOOK",
        version_type=None,
    )
    delete_dataset(new_ref_id, year=only_year)
    exports_ref_id = get_flows_reference_id(
        "SEI-PCS INDONESIA PALM OIL V1.2.1 EXPORTS FULL"
    )

    if only_year is None:
        years = get_flows_dataset_years(raw_ref_id)
    else:
        years = [only_year]

    for year in tqdm(years, "Creating full dataset"):
        offset = 0
        while True:
            page = get_supply_chain(
                raw_ref_id, year=year, limit=chunk_size, offset=offset
            )
            if len(page.flows) == 0:
                break
            run_parallel(
                process,
                page,
                new_ref_id,
                exports_ref_id,
                n_threads=n_threads,
                chunks_by="EXPORTER",
            )
            offset += chunk_size
        # Disabled step: calculate_percentage_traded_under_zdc(ref_id=...,
        # indicators_ref_id=INDICATORS_REF_ID, node_role=..., only_year=year)
        # was previously run here for "PROVINCE OF PRODUCTION" and
        # "KABUPATEN OF PRODUCTION".
if __name__ == "__main__":
    # Running the script directly processes every available year with the
    # default thread count and chunk size.
    main()