
Brazil soy 2017 - Quality assurance

This notebook compares, for 2017, the results of the new model (based on bills of lading and MDIC port data) with those of the old model (based on customs declarations combined with full MDIC).

Because the country of destination can differ for the same shipment between customs declarations and bills of lading, comparisons are made per CNPJ-port-product, by looking at which logistics hubs each model found.
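As a minimal toy illustration of that comparison unit (hypothetical values, not taken from the model outputs): the hubs each model assigns to one CNPJ-port-product are collected and compared as sets.

# Toy illustration with hypothetical hub IDs: the unit of comparison is one CNPJ-port-product,
# and what is compared is the set of logistics hubs each model attaches to it.
key = ("00000000000000", "SANTOS", "SOYBEANS")   # (CNPJ, port, product), hypothetical
old_hubs = {"BR-5107925", "BR-5102637"}          # hubs found by the old model (hypothetical)
new_hubs = {"BR-5107925"}                        # hubs found by the new model (hypothetical)
print(key, "same hubs:", old_hubs == new_hubs)   # False: the two models disagree on this flow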

import pandas as pd
from trase.tools.aws.aws_helpers import get_pandas_df
from trase.tools.pcs import *

old = (
    get_pandas_df(
        "brazil/soy/sei_pcs/v2.5.2/SEIPCS_BRAZIL_SOY_2017_old_transportation_matrix.csv"
    )
    .astype({"EXPORTER_TAX_ID": str, "PRODUCT": str})
    .rename(
        columns={
            "EXPORTER_TAX_ID": "CNPJ",
            "COUNTRY_OF_DESTINATION": "COUNTRY",
            "PORT_OF_EXPORT": "PORT",
        }
    )
)

# load data from s3 to make sure latest results are used (latest run may not have been on deforestationfree)
new = (
    get_pandas_df(
        "brazil/soy/sei_pcs/v2.6.0/SEIPCS_BRAZIL_SOY_2017_stickiness_50.csv"
        # "brazil/soy/sei_pcs/v2.6.0/SEIPCS_BRAZIL_SOY_2017_stickiness_50_&_SG_CN.csv"
        # "brazil/soy/sei_pcs/v2.6.0/SEIPCS_BRAZIL_SOY_2017_stickiness_50_&_SG_CN_&_JP_CN_&_KR_CN_&_ID_CN.csv"
    )
    .astype({"CNPJ": str, "PRODUCT": str})
    .rename(columns={"COUNTRY_OF_FIRST_IMPORT": "COUNTRY", "PORT_OF_EXPORT": "PORT"})
)

old["PRODUCT"] = old["PRODUCT"].apply(
    lambda hs_code: {
        "230400": "SOYBEAN CAKE",
        "120190": "SOYBEANS",
        "150710": "SOYBEAN OIL",
    }[hs_code]
)
old["COUNTRY"] = old["COUNTRY"].apply(
    lambda country_name: get_node_name(get_country_id(country_name))
)

cnpj_tax_muns = (
    old[["CNPJ", "EXPORTER_GEOCODE"]]
    .drop_duplicates()
    .rename(columns={"EXPORTER_GEOCODE": "TAX_MUN"})
)

new_cnpj_port_volumes = (
    new[["CNPJ", "PORT", "PRODUCT", "VOLUME_RAW"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT"])["VOLUME_RAW"]
    .sum()
    .reset_index()
)

old_cnpj_port_countries = (
    old[["CNPJ", "PORT", "COUNTRY", "PRODUCT"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT"])["COUNTRY"]
    .apply(lambda x: sorted(set(x)))
    .reset_index()
    .rename(columns={"COUNTRY": "OLD_COUNTRIES"})
)

old_cnpj_port_hubs = (
    old[["CNPJ", "PORT", "LVL6_TRASE_ID_LH", "PRODUCT"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT"])["LVL6_TRASE_ID_LH"]
    .apply(lambda x: sorted(set(x)))
    .reset_index()
    .rename(columns={"LVL6_TRASE_ID_LH": "OLD_HUBS"})
)

old_cnpj_port_branches = (
    old[["CNPJ", "PORT", "BRANCH", "PRODUCT"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT"])["BRANCH"]
    .apply(lambda x: sorted(set(x)))
    .reset_index()
    .rename(columns={"BRANCH": "OLD_BRANCHES"})
)

new_cnpj_port_countries = (
    new[["CNPJ", "PORT", "COUNTRY", "PRODUCT"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT"])["COUNTRY"]
    .apply(lambda x: sorted(set(x)))
    .reset_index()
    .rename(columns={"COUNTRY": "NEW_COUNTRIES"})
)

new_cnpj_port_hubs = (
    new[["CNPJ", "PORT", "LVL6_TRASE_ID_LH", "PRODUCT"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT"])["LVL6_TRASE_ID_LH"]
    .apply(lambda x: sorted(set(x)))
    .reset_index()
    .rename(columns={"LVL6_TRASE_ID_LH": "NEW_HUBS"})
)

new_cnpj_port_branches = (
    new[["CNPJ", "PORT", "BRANCH", "PRODUCT"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT"])["BRANCH"]
    .apply(lambda x: sorted(set(x)))
    .reset_index()
    .rename(columns={"BRANCH": "NEW_BRANCHES"})
)

df = pd.merge(old_cnpj_port_hubs, new_cnpj_port_hubs, on=["CNPJ", "PORT", "PRODUCT"])
df = pd.merge(df, old_cnpj_port_branches, on=["CNPJ", "PORT", "PRODUCT"])
df = pd.merge(df, new_cnpj_port_branches, on=["CNPJ", "PORT", "PRODUCT"])
df = pd.merge(df, old_cnpj_port_countries, on=["CNPJ", "PORT", "PRODUCT"])
df = pd.merge(df, new_cnpj_port_countries, on=["CNPJ", "PORT", "PRODUCT"])
df = pd.merge(df, cnpj_tax_muns, on="CNPJ")

df.to_csv("df.csv")

CNPJ-port-product flows are classified into categories based on the differences between the models.

The main criterion for assigning a confidence level to the new model for a CNPJ-port-product is whether the countries are consistent between the customs declarations and the bills of lading.

Given that the customs declarations cover more shipments than the bills of lading, the countries are considered to match when every country of destination in the bills of lading is also present in the customs declarations.

Inconsistent countries reduce the confidence level because the new model is heavily constrained by MDIC volumes per state-port-country; the countries in MDIC port data match those in the customs declarations but may differ from those in the bills of lading.
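In code, the matching criterion described above amounts to a subset check; a minimal sketch with toy values (the loop below expresses the same condition with all(...)):

# Countries "match" when every destination in the bills of lading (new model) also appears
# in the customs declarations (old model), i.e. the new countries are a subset of the old ones.
old_countries = ["CHINA", "SPAIN", "IRAN"]   # toy values
new_countries = ["CHINA", "SPAIN"]           # toy values
same_countries = set(new_countries) <= set(old_countries)
print(same_countries)  # True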

status = {}

for _, row in df.iterrows():
    cnpj = row["CNPJ"]
    port = row["PORT"]
    product = row["PRODUCT"]

    old_hubs = row["OLD_HUBS"]
    old_branches = row["OLD_BRANCHES"]
    old_countries = row["OLD_COUNTRIES"]
    new_hubs = row["NEW_HUBS"]
    new_branches = row["NEW_BRANCHES"]
    new_countries = row["NEW_COUNTRIES"]

    same_countries = all(country in old_countries for country in new_countries)
    old_states = {hub[:5] for hub in old_hubs}
    same_states = all(hub[:5] in old_states for hub in new_hubs)

    # --------------------------------------------------------------
    # fully explained cases: no need to investigate

    if old_hubs == new_hubs:
        status[(cnpj, port, product)] = "SAME HUBS"
        continue

    if new_hubs == ["BR-XXXXXXX"]:
        status[(cnpj, port, product)] = "NEW UNKNOWN"
        continue

    if same_countries:
        # tax municipality was accepted as a hub in old model
        # but the state-port-country-product volume exceeded MDIC,
        # so sourcing has to come from other states
        if (
            ("1.1" in old_branches or "2.1" in old_branches)
            and "1.1 - farm/silo" in new_branches
            and not same_states
        ):
            status[(cnpj, port, product)] = "FARM/SILO EXCEEDED MDIC"
            continue

        # tax municipality was accepted as a hub in old model but tax municipality
        # was not in the state of production
        if (
            "1.2" in old_branches
            or "2.2" in old_branches
            or ("3.1" in old_branches and not same_states)
        ):
            status[(cnpj, port, product)] = "OLD HUB IGNORED STATE OF PRODUCTION"
            continue

        # new model split flows between all possible states according to MDIC
        # but old model had single state
        if "3.2" in old_branches and not same_states:
            status[(cnpj, port, product)] = (
                "OLD HUB KNEW RIGHT STATE AND NEW MODEL SPLIT"
            )
            continue

        if "1.2 - large exporter" in new_branches:
            status[(cnpj, port, product)] = "NEW LARGE EXPORTER"
            continue

        if "1.3 - large producer" in new_branches:
            status[(cnpj, port, product)] = "NEW LARGE PRODUCER"
            continue

        if "3.6" in old_branches or "4.2" in old_branches:
            status[(cnpj, port, product)] = "OLD UNKNOWN"
            continue

        # old model and new model found different hubs
        if (
            "3.3.1" in old_branches
            or "3.3.2" in old_branches
            or "3.3.3" in old_branches
        ):
            if same_states:
                status[(cnpj, port, product)] = "DIFFERENT HUBS IN SAME STATE"
                continue
            else:
                status[(cnpj, port, product)] = "DIFFERENT HUBS IN DIFFERENT STATE"
                # continue

    # countries attached to the CNPJ-port-product are different in old model (customs declarations)
    # and new model (bills of lading), which means that the MDIC volume constraints may be wrong
    # in the new model
    else:
        if (
            ("1.1" in old_branches or "2.1" in old_branches)
            and "1.1 - farm/silo" in new_branches
            and not same_states
        ):
            status[(cnpj, port, product)] = (
                "FARM/SILO EXCEEDED MDIC - DIFFERENT COUNTRIES"
            )
            continue

        if (
            "1.2" in old_branches
            or "2.2" in old_branches
            or ("3.1" in old_branches and not same_states)
        ):
            status[(cnpj, port, product)] = (
                "OLD HUB IGNORED STATE OF PRODUCTION - DIFFERENT COUNTRIES"
            )
            continue

        if "3.2" in old_branches and not same_states:
            status[(cnpj, port, product)] = (
                "OLD HUB KNEW RIGHT STATE AND NEW MODEL SPLIT - DIFFERENT COUNTRIES"
            )
            continue

        if "1.2 - large exporter" in new_branches:
            status[(cnpj, port, product)] = "NEW LARGE EXPORTER - DIFFERENT COUNTRIES"
            continue

        if "1.3 - large producer" in new_branches:
            status[(cnpj, port, product)] = "NEW LARGE PRODUCER - DIFFERENT COUNTRIES"
            continue

        if "3.6" in old_branches or "4.2" in old_branches:
            status[(cnpj, port, product)] = "OLD UNKNOWN - DIFFERENT COUNTRIES"
            continue

        if (
            "3.3.1" in old_branches
            or "3.3.2" in old_branches
            or "3.3.3" in old_branches
        ):
            if same_states:
                status[(cnpj, port, product)] = (
                    "DIFFERENT HUBS IN SAME STATE - DIFFERENT COUNTRIES"
                )
                continue
            else:
                status[(cnpj, port, product)] = (
                    "DIFFERENT HUBS IN DIFFERENT STATE - DIFFERENT COUNTRIES"
                )
                continue

    print(80 * "_")
    print()
    print("cnpj \t\t port \t\t product")
    print(f"{cnpj} \t {port} \t {product}")
    print()

    try:
        print(status[(cnpj, port, product)])
        print()
    except KeyError:
        # no status was assigned to this flow in the checks above
        pass

    if not same_countries:
        print("DIFFERENT COUNTRIES")
        print(old_countries)
        print(new_countries)
        print()

    if not same_states:
        print("DIFFERENT STATES")
        print()

    tax_mun = row["TAX_MUN"]
    print("tax mun:")
    try:
        print(
            get_node(find_node_by_trase_id(f"BR-{tax_mun}")),
            f"[BR-{tax_mun}]",
            "-",
            get_node(find_node_by_trase_id(f"BR-{tax_mun}"[:5])),
        )
    except Exception:
        # the tax municipality may not resolve to a known node
        pass
    print()
    print("old hubs: branch", ", ".join(old_branches))
    for old_hub in old_hubs:
        print(
            f"{get_node(find_node_by_trase_id(old_hub))} [{old_hub}] - {get_node(find_node_by_trase_id(old_hub[:5]))}"
        )
    print()
    print("new hubs: branch", ", ".join(new_branches))
    for new_hub in new_hubs:
        print(
            f"{get_node(find_node_by_trase_id(new_hub))} [{new_hub}] - {get_node(find_node_by_trase_id(new_hub[:5]))}"
        )
    print()

Categories are then grouped into confidence levels, and volumes are summed per level.

df_compare = pd.DataFrame(
    columns=["CNPJ", "PORT", "PRODUCT", "STATUS"],
    data=[
        (cnpj, port, product, value) for (cnpj, port, product), value in status.items()
    ],
)
df_compare = pd.merge(df_compare, new_cnpj_port_volumes, on=["CNPJ", "PORT", "PRODUCT"])

vol_sum = df_compare["VOLUME_RAW"].sum()

results = df_compare.groupby("STATUS")["VOLUME_RAW"].sum().reset_index()
results = results.sort_values("VOLUME_RAW", ascending=False)
results["VOLUME_RAW"] *= 100 / vol_sum
results["VOLUME_RAW"] = results["VOLUME_RAW"].round(1)

very_high_confidence = [
    "FARM/SILO EXCEEDED MDIC",
    "OLD HUB IGNORED STATE OF PRODUCTION",
]

high_confidence = ["SAME HUBS"]

medium_confidence = [
    "NEW LARGE EXPORTER",
    "NEW LARGE PRODUCER",
    "DIFFERENT HUBS IN SAME STATE",
]

low_confidence = [
    "OLD UNKNOWN",
    "NEW UNKNOWN",
    "FARM/SILO EXCEEDED MDIC - DIFFERENT COUNTRIES",
    "NEW LARGE PRODUCER - DIFFERENT COUNTRIES",
    "OLD HUB IGNORED STATE OF PRODUCTION - DIFFERENT COUNTRIES",
    "OLD UNKNOWN - DIFFERENT COUNTRIES",
    "NEW LARGE EXPORTER - DIFFERENT COUNTRIES",
    "DIFFERENT HUBS IN DIFFERENT STATE - DIFFERENT COUNTRIES",
    "DIFFERENT HUBS IN DIFFERENT STATE",
    "DIFFERENT HUBS IN SAME STATE - DIFFERENT COUNTRIES",
    "OLD HUB KNEW RIGHT STATE AND NEW MODEL SPLIT - DIFFERENT COUNTRIES",
    "OLD HUB KNEW RIGHT STATE AND NEW MODEL SPLIT",
]

results["CONFIDENCE"] = "4. LOW"
results["CONFIDENCE"] = results["CONFIDENCE"].mask(
    results["STATUS"].isin(very_high_confidence), "1. VERY HIGH"
)
results["CONFIDENCE"] = results["CONFIDENCE"].mask(
    results["STATUS"].isin(high_confidence), "2. HIGH"
)
results["CONFIDENCE"] = results["CONFIDENCE"].mask(
    results["STATUS"].isin(medium_confidence), "3. MEDIUM"
)

results = results.groupby("CONFIDENCE")["VOLUME_RAW"].sum().reset_index()
results = results.sort_values("CONFIDENCE")

results["VOLUME_RAW"] = results["VOLUME_RAW"].apply(int).astype(str) + "%"

results

Compare volumes per CNPJ-port-product-country to identify pairs of countries that are substituted between customs declarations and bills of lading (an obvious one: SOUTH KOREA -> CHINA).

new_volumes = (
    new[["CNPJ", "PORT", "PRODUCT", "COUNTRY", "VOLUME_RAW"]][
        new["CNPJ"] != "XXXXXXXXXXXXXX"
    ]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT", "COUNTRY"])["VOLUME_RAW"]
    .sum()
    .reset_index()
    .rename(columns={"VOLUME_RAW": "VOLUME_NEW"})
    .astype({"CNPJ": int})
    .astype({"CNPJ": str})
)

old_volumes = (
    old[["CNPJ", "PORT", "PRODUCT", "COUNTRY", "VOLUME_RAW"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT", "COUNTRY"])["VOLUME_RAW"]
    .sum()
    .reset_index()
    .rename(columns={"VOLUME_RAW": "VOLUME_OLD"})
    .astype({"CNPJ": int})
    .astype({"CNPJ": str})
)

cnpjs_new = (
    new_volumes.groupby(["CNPJ", "PORT", "PRODUCT"])["VOLUME_NEW"].sum().reset_index()
)
cnpjs_new = cnpjs_new[cnpjs_new["VOLUME_NEW"] > 0][["CNPJ", "PORT", "PRODUCT"]]

cnpjs_old = (
    old_volumes.groupby(["CNPJ", "PORT", "PRODUCT"])["VOLUME_OLD"].sum().reset_index()
)
cnpjs_old = cnpjs_old[cnpjs_old["VOLUME_OLD"] > 0][["CNPJ", "PORT", "PRODUCT"]]

countries = pd.merge(
    new_volumes, old_volumes, on=["CNPJ", "PORT", "PRODUCT", "COUNTRY"], how="outer"
).fillna(0)
countries = pd.merge(countries, cnpjs_new, on=["CNPJ", "PORT", "PRODUCT"])
countries = pd.merge(countries, cnpjs_old, on=["CNPJ", "PORT", "PRODUCT"])

countries["DIFF"] = countries["VOLUME_OLD"] - countries["VOLUME_NEW"]
countries = countries.sort_values(["CNPJ", "PORT", "PRODUCT", "COUNTRY", "DIFF"])
countries = countries[abs(countries["DIFF"]) > 10]
countries.to_csv("countries.csv")
countries
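As a follow-up sketch (not part of the original notebook), one way to surface substituted country pairs from the countries table built above: for each CNPJ-port-product, pair the country that lost the most volume in the new model with the country that gained the most, then count how often each pair occurs. This assumes the countries DataFrame from the previous cell and a pandas version with DataFrame.value_counts (>= 1.1).

# Sketch: for each CNPJ-port-product, take the country with the largest positive DIFF
# (volume present in customs declarations but not in bills of lading) and the country with
# the largest negative DIFF (the reverse), and count the resulting (FROM, TO) pairs.
def substitution_pair(group):
    return pd.Series(
        {
            "FROM": group.loc[group["DIFF"].idxmax(), "COUNTRY"],  # lost volume in new model
            "TO": group.loc[group["DIFF"].idxmin(), "COUNTRY"],    # gained volume in new model
        }
    )

# flows where only one country survives the > 10 t filter yield FROM == TO and can be ignored
pairs = (
    countries.groupby(["CNPJ", "PORT", "PRODUCT"])
    .apply(substitution_pair)
    .value_counts()
    .head(10)
)
pairs  # pairs such as (SOUTH KOREA, CHINA) are expected to rank high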
df_compare
new_cnpj_port_country_volumes = (
    new[["CNPJ", "PORT", "PRODUCT", "COUNTRY", "VOLUME_RAW"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT", "COUNTRY"])["VOLUME_RAW"]
    .sum()
    .reset_index()
)

old_cnpj_port_country_hubs = (
    old[["CNPJ", "PORT", "LVL6_TRASE_ID_LH", "PRODUCT", "COUNTRY"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT", "COUNTRY"])["LVL6_TRASE_ID_LH"]
    .apply(lambda x: sorted(set(x)))
    .reset_index()
    .rename(columns={"LVL6_TRASE_ID_LH": "OLD_HUBS"})
)

old_cnpj_port_country_branches = (
    old[["CNPJ", "PORT", "BRANCH", "PRODUCT", "COUNTRY"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT", "COUNTRY"])["BRANCH"]
    .apply(lambda x: sorted(set(x)))
    .reset_index()
    .rename(columns={"BRANCH": "OLD_BRANCHES"})
)

new_cnpj_port_country_hubs = (
    new[["CNPJ", "PORT", "LVL6_TRASE_ID_LH", "PRODUCT", "COUNTRY"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT", "COUNTRY"])["LVL6_TRASE_ID_LH"]
    .apply(lambda x: sorted(set(x)))
    .reset_index()
    .rename(columns={"LVL6_TRASE_ID_LH": "NEW_HUBS"})
)

new_cnpj_port_country_branches = (
    new[["CNPJ", "PORT", "BRANCH", "PRODUCT", "COUNTRY"]]
    .drop_duplicates()
    .groupby(["CNPJ", "PORT", "PRODUCT", "COUNTRY"])["BRANCH"]
    .apply(lambda x: sorted(set(x)))
    .reset_index()
    .rename(columns={"BRANCH": "NEW_BRANCHES"})
)

df2 = pd.merge(
    old_cnpj_port_country_hubs,
    new_cnpj_port_country_hubs,
    on=["CNPJ", "PORT", "PRODUCT", "COUNTRY"],
    how="outer",
).fillna("NA")
df2 = pd.merge(
    df2,
    old_cnpj_port_country_branches,
    on=["CNPJ", "PORT", "PRODUCT", "COUNTRY"],
    how="outer",
).fillna("NA")
df2 = pd.merge(
    df2,
    new_cnpj_port_country_branches,
    on=["CNPJ", "PORT", "PRODUCT", "COUNTRY"],
    how="outer",
).fillna("NA")
df2 = pd.merge(df2, cnpj_tax_muns, on="CNPJ")

df2["QA_BRANCH"] = None
df2["QA_BRANCH"] = df2["QA_BRANCH"].mask(df2["OLD_HUBS"] == df2["NEW_HUBS"])

df2

df2.to_csv("df2.csv")