Post processing

This page is synchronized from trase/models/brazil/soy/post_processing.ipynb. Last modified on 2025-12-14 23:19 CET by Trase Admin. Please view or edit the original file there; changes should be reflected here after the midnight (CET) build, or after manually triggering a build with a GitHub action.

Post-Processing

This notebook is a proof-of-concept showing how we can post-process the SEI-PCS model results without requiring an ingest into the database. Specifically:

  • Cleaning all columns containing node labels (country, port), except trader columns, for which cleaning has not yet been implemented
  • Checking that all nodes in the file exist in the database
  • Adding parent columns (biome, state, economic bloc)

Approach used

When we look up nodes in the database, we have to compare (a) data in memory, in the form of a Pandas dataframe, to (b) tables in the PostgreSQL database. This requires either loading all candidate data from the database into the Python process or loading the lookup data from Python into PostgreSQL. Since the search space is generally quite large (we have hundreds of thousands of traders) and fetching data from the database is relatively slow, we opt to first insert the lookup data into a temporary table in PostgreSQL. This also lets us use optimized SQL queries, and the compute available on the server, to perform the actual lookup.
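
The temporary-table approach can be sketched in a few lines. This is a minimal illustration only: it uses an in-memory SQLite database as a stand-in for PostgreSQL so that it runs without a server, and the `node_names` and `lookup` tables and their columns are hypothetical rather than the actual Trase schema.

```python
import sqlite3

# SQLite stands in for PostgreSQL so the sketch runs without a server.
# The table and column names below are hypothetical, not Trase's schema.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    create table node_names (node_id integer, name text, is_default integer);
    insert into node_names values
        (1, 'CHINA', 1),
        (1, 'CHINA (MAINLAND)', 0),
        (2, 'NETHERLANDS', 1),
        (2, 'HOLLAND', 0);
    """
)

# the names we want to look up live in Python memory...
lookup_names = ["HOLLAND", "CHINA (MAINLAND)", "ATLANTIS"]

# ...so we push them into a temporary table, recording their position
conn.execute("create temp table lookup (position integer, name text)")
conn.executemany(
    "insert into lookup values (?, ?)", list(enumerate(lookup_names))
)

# the database does the matching; left joins keep unmatched rows as NULL
rows = conn.execute(
    """
    select lookup.name, defaults.name
    from lookup
    left join node_names aliases on aliases.name = lookup.name
    left join node_names defaults
        on defaults.node_id = aliases.node_id and defaults.is_default = 1
    order by lookup.position
    """
).fetchall()
```

Only the three lookup names cross the connection, and the result comes back in the same order as the input, with `None` marking the name that could not be matched.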

Not yet implemented

  • Cleaning trader names and adding trader groups
  • Applying equivalence factors

Possible future improvements

  • Using the ingest metadata to generate the cleaning steps. Some initial work in this direction has been done in "Initial add of new ingest code" (#3441).
  • Streaming the entire dataframe to the database, performing all lookups there, and then streaming the results back, rather than working column by column. This may be faster but might also result in more complex code.
  • De-duplicating values before performing each lookup, which should be faster but would result in more complex code.
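
The de-duplication idea in the last item could look roughly like the sketch below. It is an assumption about how such a step might be written, not existing Trase code: `slow_lookup` is a hypothetical stand-in for a database lookup that returns one result per input value.

```python
import pandas as pd


def slow_lookup(names):
    """Hypothetical stand-in for a database lookup: one result per input."""
    canonical = {"HOLLAND": "NETHERLANDS", "NETHERLANDS": "NETHERLANDS"}
    return [canonical.get(name) for name in names]


column = pd.Series(["HOLLAND", "NETHERLANDS", "HOLLAND", "ATLANTIS", "HOLLAND"])

# look up each distinct value only once...
unique_names = column.drop_duplicates().tolist()
mapping = dict(zip(unique_names, slow_lookup(unique_names)))

# ...then broadcast the results back onto the full column
cleaned = column.map(mapping)
```

Here five rows require only three lookups. The extra complexity comes from keeping the mapping step in sync with lookups that depend on more than one column, such as name and year together.
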
import pandas as pd
from psycopg2 import sql

from trase.tools import sps
from trase.tools.pcs import (
    get_country_id,
    get_qual_id,
    get_node_attributes_reference_id,
)
from trase.tools.pandasdb.find import find_nodes_by_name, find_nodes_by_trase_id
from trase.tools.pandasdb.query import query_with_dataframe

df = sps.get_pandas_df_once(
    "brazil/soy/sei_pcs/v2.6.0/SEIPCS_BRAZIL_SOY_2016.csv",
    dtype=str,
    sep=";",
    na_filter=False,
).astype({"YEAR": int, "VOLUME_RAW": float, "VOLUME_PRODUCT": float, "FOB": float})

Step 1: Clean/validate COUNTRY_OF_FIRST_IMPORT and get Node IDs for later

df[["COUNTRY_OF_FIRST_IMPORT_CLEAN", "COUNTRY_OF_FIRST_IMPORT_ID"]] = (
    find_nodes_by_name(
        df,
        returning=["default_name", "node_id"],
        name=sql.Identifier("COUNTRY_OF_FIRST_IMPORT"),
        level=sql.Literal(1),
        sub_type_id=sql.Literal(8),
        year=sql.Identifier("YEAR"),
        on_extra_columns="ignore",
    )
)
assert not df["COUNTRY_OF_FIRST_IMPORT_CLEAN"].isna().any()
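
A bare assertion like the one above fails without saying which values could not be matched. As a possible aid when debugging, the sketch below uses a hypothetical helper, `unmatched_values`, to list the offending raw values; it runs on toy data rather than on the real model output.

```python
import pandas as pd


def unmatched_values(df, raw_column, clean_column):
    """Return the distinct raw values whose cleaned counterpart is missing."""
    missing = df[clean_column].isna()
    return sorted(df.loc[missing, raw_column].unique())


# toy data: the second country failed to match
example = pd.DataFrame(
    {
        "COUNTRY_OF_FIRST_IMPORT": ["CHINA", "ATLANTIS", "CHINA"],
        "COUNTRY_OF_FIRST_IMPORT_CLEAN": ["CHINA", None, "CHINA"],
    }
)
problems = unmatched_values(
    example, "COUNTRY_OF_FIRST_IMPORT", "COUNTRY_OF_FIRST_IMPORT_CLEAN"
)
```

The returned list can be placed in the assertion message, for example `assert not problems, problems`.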

Step 2: Clean/validate COUNTRY_SYNACOMEX

df[["COUNTRY_SYNACOMEX_CLEAN"]] = find_nodes_by_name(
    df,
    returning=["default_name"],
    name=sql.Identifier("COUNTRY_SYNACOMEX"),
    level=sql.Literal(1),
    sub_type_id=sql.Literal(8),
    year=sql.Identifier("YEAR"),
    on_extra_columns="ignore",
)
assert not df["COUNTRY_SYNACOMEX_CLEAN"].isna().any()

Step 3: Clean/validate port

brazil_id = get_country_id("BRAZIL")
df[["PORT_OF_EXPORT_CLEAN"]] = find_nodes_by_name(
    df,
    returning=["default_name"],
    name=sql.Identifier("PORT_OF_EXPORT"),
    parent_id=sql.Literal(brazil_id),  # 27 = Brazil
    sub_type_id=sql.Literal(10),
    year=sql.Identifier("YEAR"),
    on_extra_columns="ignore",
)

# not sure how to handle NONE port here...
assert all(~df["PORT_OF_EXPORT_CLEAN"].isna() | (df["PORT_OF_EXPORT"] == "NONE"))

Step 4: Validate MUNICIPALITY OF PRODUCTION Trase IDs and fetch node IDs for later

df[["LVL6_NODE_ID_PROD"]] = find_nodes_by_trase_id(
    df[["LVL6_TRASE_ID_PROD"]],
    returning=["node_id"],
    trase_id=sql.Identifier("LVL6_TRASE_ID_PROD"),
)
assert not df["LVL6_NODE_ID_PROD"].isna().any()

Step 4b: Validate LOGISTICS HUB Trase IDs

number_of_matches = find_nodes_by_trase_id(
    df[["LVL6_TRASE_ID_LH"]],
    returning=["count"],
    trase_id=sql.Identifier("LVL6_TRASE_ID_LH"),
)
assert all(n == (1,) for n in number_of_matches)

Step 5: Insert STATE OF PRODUCTION

df[["LVL3_TRASE_ID_PROD", "LVL3_NAME_PROD"]] = query_with_dataframe(
    df["LVL6_NODE_ID_PROD"].rename("id"),
    """
    select n3.trase_id, name
    from (
        select row_number() over (), * from df
    ) data
    left join main.nodes n0 on data.id = n0.id
    left join main.nodes n1 on n0.parent_id = n1.id
    left join main.nodes n2 on n1.parent_id = n2.id
    left join main.nodes n3 on n2.parent_id = n3.id
    -- filter on is_default inside the join so unmatched rows are kept
    left join main.node_names on n3.id = node_id and is_default
    order by row_number
    """,
)
assert not df["LVL3_TRASE_ID_PROD"].isna().any()
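
The query above combines two techniques: numbering the input rows so that the result can be returned in the original order, and walking `parent_id` three times to climb from level 6 to level 3. The following sketch reproduces both on a toy hierarchy. It uses in-memory SQLite instead of PostgreSQL, and the `nodes` contents and Trase IDs are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    -- hypothetical hierarchy: two level-3 nodes, each with one chain below it
    create table nodes (id integer primary key, parent_id integer, trase_id text);
    insert into nodes values
        (1, null, 'L3-A'), (2, 1, 'L4-A'), (3, 2, 'L5-A'), (4, 3, 'L6-1'),
        (10, null, 'L3-B'), (11, 10, 'L4-B'), (12, 11, 'L5-B'), (13, 12, 'L6-2');

    -- stand-in for the dataframe column sent to the database
    create temp table df (id integer);
    insert into df values (13), (4), (13), (99);
    """
)

# row_number() over () numbers rows in scan order (insertion order here);
# an explicit position column would make that ordering guarantee firmer
rows = conn.execute(
    """
    select data.id, n3.trase_id
    from (
        select row_number() over () as row_number, * from df
    ) data
    left join nodes n0 on data.id = n0.id
    left join nodes n1 on n0.parent_id = n1.id
    left join nodes n2 on n1.parent_id = n2.id
    left join nodes n3 on n2.parent_id = n3.id
    order by data.row_number
    """
).fetchall()
```

Because every join is a left join, the unknown ID 99 still produces a row, with `None` as its level-3 Trase ID. That keeps the result the same length as the input, so a later `isna()` check can catch it.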

Step 6: Insert economic bloc

economic_bloc_qual_id = get_qual_id("ECONOMIC BLOC")
economic_bloc = query_with_dataframe(
    df["COUNTRY_OF_FIRST_IMPORT_ID"].rename("node_id"),
    f"""
    select data.node_id, value
    from ( 
        select row_number() over (), * from df 
    ) data
    left join ( 
        select node_id, value
        from node_quals 
        where qual_id = {economic_bloc_qual_id}
    ) blocs using (node_id)
    order by row_number
    """,
)
assert len(economic_bloc) == len(df)

# default to country name when economic bloc not found
df["ECONOMIC_BLOC"] = economic_bloc["value"].combine_first(
    df["COUNTRY_OF_FIRST_IMPORT_CLEAN"]
)
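
The fallback to the country name relies on `Series.combine_first`, which keeps the caller's value where it is present and fills the gaps from the other series. A minimal sketch on toy values, which are not taken from the model output:

```python
import pandas as pd

# toy stand-ins for the query result and the cleaned country column
bloc = pd.Series(["EUROPEAN UNION", None, "EUROPEAN UNION"])
country = pd.Series(["NETHERLANDS", "CHINA", "SPAIN"])

# combine_first keeps the bloc where present and falls back to the country
economic_bloc = bloc.combine_first(country)
```

Note that `combine_first` aligns the two series on their index, so this pattern assumes the query result and `df` share the same index, as they do when both use the default range index.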

Step 7: Insert biome

TODO: also take into account year

qual_id = get_qual_id("BIOME")
ref_id = get_node_attributes_reference_id("BRAZIL INDICATORS V3 FULL")
biomes = query_with_dataframe(
    df["LVL6_NODE_ID_PROD"].rename("node_id"),
    f"""
    select value from (
        select row_number() over (), * from df
    ) data
    left join (
        select node_id, value
        from node_quals
        where qual_id = {qual_id} and ref_id = {ref_id}
    ) biomes using (node_id)
    order by row_number
    """,
)["value"]
assert not biomes.isnull().any()
assert len(df) == len(biomes)
df["BIOME"] = biomes

Closing out: demonstrate sample rows

df.sample(3).T