View or edit on GitHub
This page is synchronized from trase/models/brazil/customs_2019/estimate_model_performance.ipynb. Last modified on 2025-12-14 23:19 CET by Trase Admin.
Please view or edit the original file there; changes should be reflected here after a midnight build (CET time),
or by manually triggering the corresponding GitHub Action.
How well does the solver perform in 2015, when it does succeed?
import math
import numpy as np
import pandas as pd
import scipy.spatial.distance
from trase.models.brazil.customs_2019.constants import BUCKETS, UNKNOWNS
from trase.tools import sps
# Load the ground-truth customs declarations and the 2015 solver output.
# All columns are read as str with NA inference disabled (codes keep leading
# zeros and empty strings stay empty); only the volume column is numeric.
csv_options = dict(dtype=str, sep=";", na_filter=False)

df_actual = pd.read_csv("/tmp/customs_declaration.csv", **csv_options).astype(
    {"vol": float}
)

# The result file name is reused below in plot titles.
file = "results_02.csv"
df_solved = pd.read_csv(f"2015/results/{file}", **csv_options).astype({"vol": float})

# Dimensions along which actual and solved flows are compared.
columns = [
    "country_of_destination.name",
    "exporter.municipality.trase_id",
    "hs4",
    "hs6",
    # "hs8",
    # "month",
    "port.name",
    "state.trase_id",
    "via",
]
###################################################
# Success rate
print("------------------------------------------")
###################################################
print("How much volume successfully solved in 2015")
sps.print_report_by_attribute(df_solved, "vol", ["success"])
print("What caused the failures?")
# "success" was read with dtype=str, so compare against the string "False".
sps.print_report_by_attribute(df_solved[df_solved["success"] == "False"], "vol", ["message"])
# now only keep solved flows (practically speaking, a bucket will either be solved OR unsolved)
# pop() drops the "success" column while its values still build the mask.
df_solved = df_solved[df_solved.pop("success") == "True"]
print("Note: from now on all figures relate to solved volume only!")
# only keep the bucket (HS4 + country) combinations that we solved for, so
# actual and solved volumes are compared over the same universe of flows
solved_buckets = df_solved[BUCKETS].drop_duplicates()
print(solved_buckets)
df_actual = pd.merge(df_actual, solved_buckets)
# Align the two datasets: consolidate each to one row per combination of the
# comparison dimensions, then outer-join so a flow present in only one side
# still appears (its missing volume is filled with 0).
consolidated_actual = sps.consolidate(df_actual, ["vol"], columns)
consolidated_solved = sps.consolidate(df_solved, ["vol"], columns)
df = pd.merge(
    consolidated_actual,
    consolidated_solved,
    on=columns,
    how="outer",
    suffixes=("_actual", "_solved"),
).fillna({"vol_actual": 0, "vol_solved": 0})
actual = df["vol_actual"]
solved = df["vol_solved"]
total_volume = actual.sum()
# NOTE(review): the totals are printed rather than asserted equal — they can
# legitimately differ slightly, hence the disabled assertion below.
print(total_volume, solved.sum())
# assert np.isclose(total_volume, solved.sum())
assert all(solved >= 0) and all(actual >= 0)
###################################################
# report of unknown flows
print("------------------------------------------")
###################################################
def unknowns(row):
    """Label a flow by which of its dimensions are "unknown".

    Returns a sorted, space-separated list of the columns in *row* whose
    value equals that column's placeholder in the UNKNOWNS mapping
    (e.g. "port.name via "), or "(none)" when no dimension is unknown.
    """
    # Series.iteritems() was removed in pandas 2.0; items() is the
    # long-standing equivalent, so this stays compatible with both eras.
    flagged = sorted(
        column + " "
        for column, value in row.items()
        if column in UNKNOWNS and value == UNKNOWNS[column]
    )
    return "".join(flagged) or "(none)"
# Tag every matched flow with the set of dimensions that are "unknown", then
# report how much actual vs solved volume carries each tag.
d = df.assign(unknowns=df.apply(unknowns, axis=1))
print("Volume that flows through unknowns")
for title, volume_column in (("Actual", "vol_actual"), ("Solved", "vol_solved")):
    print(title)
    sps.print_report_by_attribute(d, volume_column, ["unknowns"])
###################################################
# compare states
print("------------------------------------------")
###################################################
print("Volume where state is known and equal")
# A flow's state is consistent when it is not the unknown placeholder and it
# matches the first five characters (the state prefix) of the exporter
# municipality trase id.
state = df["state.trase_id"]
is_known = state != UNKNOWNS["state.trase_id"]
matches_municipality = state == df["exporter.municipality.trase_id"].str[:5]
d = df[is_known & matches_municipality]
print(f'Actual: {100 * d["vol_actual"].sum() / total_volume:.2f}%')
print(f'Solved: {100 * d["vol_solved"].sum() / total_volume:.2f}%')
###################################################
# overall L1, L2, cosine norm
print("------------------------------------------")
###################################################
# Aggregate distance between the actual and the solved volume vectors.
residual = actual - solved
l1_norm = np.linalg.norm(residual, ord=1)
print(f"L1 norm: {sps.format_float(l1_norm, 3)} ({100 * l1_norm / total_volume:.2f}%)")
l2_norm = np.linalg.norm(residual, ord=2)
print(f"L2 norm: {sps.format_float(l2_norm, 3)} ({100 * l2_norm / total_volume:.2f}%)")
# scipy's cosine() returns 1 - cos(angle) — a dissimilarity in [0, 2], NOT an
# angle in radians — so the original "rad"/math.degrees(cosine) labelling was
# wrong. Recover the actual angle with acos (clamped against float round-off).
cosine = scipy.spatial.distance.cosine(actual, solved)
angle = math.acos(max(-1.0, min(1.0, 1 - cosine)))
print(f"Cosine distance: {cosine:.2f} ({math.degrees(angle):.2f}°)")
###################################################
# relative error
print("------------------------------------------")
###################################################
# Symmetric relative error per flow, in [-1, 1]. Flows where both volumes are
# (near) zero are treated as exactly correct (error 0), which also replaces
# the NaN produced by the 0/0 division.
total_per_flow = solved + actual
correctly_zero = (solved <= 1e-1) & (actual <= 1e-1)
relative_error = ((solved - actual) / total_per_flow).mask(correctly_zero, 0)
# Weight each flow by its share of the total volume so the histogram reads as
# "percentage of volume", not "number of flows".
ax = (100 * np.abs(relative_error)).hist(
    bins=10, weights=100 * total_per_flow / total_volume / 2
)
ax.set_title(f"Accuracy of MDIC disaggregation\n{file}")
ax.set_xlabel("Relative error: 100 * |b - a| / (b + a)")
ax.set_ylabel("Percentage of total volume")
For how much volume does the solver succeed in 2019?
Overall success rate in 2019:
# Solver output for 2019, read with the same conventions as the 2015 data
# (all columns as str, no NA inference, volume cast to float afterwards).
df_2019 = pd.read_csv("2019/results/results.csv", dtype=str, sep=";", na_filter=False)
df_2019 = df_2019.astype({"vol": float})
sps.print_report_by_attribute(df_2019, "vol", ["success"])
Why the failures occur:
sps.print_report_by_attribute(df_2019[df_2019["success"] == "False"], "vol", ["success", "message"])
Looking at solver results for individual buckets (country + HS4)
# Read the 2015 bill-of-lading data (same CSV conventions as above).
df_bol = pd.read_csv(
    "2015/prepared/bill_of_lading.csv", dtype=str, sep=";", na_filter=False
).astype({"vol": float})


# Score every bucket (country + HS4) by the L2 distance between its actual
# and solved volumes, sorted smallest-error first.
def _bucket_error(group):
    return np.linalg.norm(group["vol_actual"] - group["vol_solved"], ord=2)


df_norms = df.groupby(BUCKETS).apply(_bucket_error).rename("error").sort_values()
# Sample a single bucket, weighted by its error, so high-error buckets are
# the most likely to be picked for inspection.
random_bucket = df_norms.sample(weights=df_norms)
def filter(df):
    # Keep only the rows of *df* belonging to the sampled bucket: inner-join
    # against random_bucket's (country + HS4) index.
    # NOTE(review): this shadows the builtin `filter`; the name is kept
    # because the cells below call it by this name.
    return pd.merge(df, random_bucket, left_on=BUCKETS, right_index=True)
# Restrict both the matched flows and the bill of lading to the sampled bucket.
df_ = filter(df)
df_bol_ = filter(df_bol)
# Dimensions shared with the bill-of-lading data.
bol_columns = [
    'country_of_destination.name',
    'via',
    'port.name',
    'hs6',
    'hs4',
    'exporter.municipality.trase_id',
]
df2_ = sps.consolidate(df_, ["vol_actual", "vol_solved"], bol_columns)
# Arbitrary categorical labels for the x-axis of the comparison plots.
df_["x"] = [f"v{i}" for i in range(len(df_))]
df_vs = pd.concat([df2_, df_bol_[bol_columns]], sort=False).drop_duplicates()
df_vs["x"] = [f"w{i}" for i in range(len(df_vs))]
def assign(df):
    """Attach the shared x-axis label from df_vs to *df*.

    Left-joins on the bill-of-lading dimensions and verifies that every row
    found a label (the merge indicator must be "both" everywhere).
    """
    merged = pd.merge(df, df_vs, on=bol_columns, indicator=True, how="left")
    indicator = merged.pop("_merge")
    assert all(indicator == "both")
    return merged
# Label both consolidated views with the shared x-axis ids from df_vs.
df2_ = assign(df2_)
df_bol_ = assign(df_bol_)
# Drop rows where either the solved or the actual volume is zero, so the
# comparison below only looks at flows present on both sides.
# (The original line was a SyntaxError: a stray `0e` literal and a missing
# closing bracket.)
df_ = df_[(df_["vol_solved"] > 0) & (df_["vol_actual"] > 0)]
# Build a long-format frame with a single "vol" column and a "source" tag so
# solved and actual volumes can be plotted side by side per flow.
def _one_side(keep, drop):
    # Index the full frame, restrict to the sampled bucket, keep one volume
    # column under the common name "vol".
    indexed = df.assign(i=range(len(df)))
    return filter(indexed).drop(columns=drop).rename(columns={keep: "vol"})


data = pd.concat(
    [
        # filter(df_bol).assign(source="bol"),
        _one_side("vol_solved", "vol_actual").assign(source="solved"),
        _one_side("vol_actual", "vol_solved").assign(source="actual"),
    ],
    sort=False,
)
import plotly.express as px

# The prefix forces plotly to treat the flow index as categorical, not numeric.
data["i"] = "foo" + data["i"].astype(str)
px.bar(
    data,
    y="vol",
    x="i",
    color="source",
    barmode="group",
)
# TODO compare Bol of solution with solved & actual consolidated over BOL columns
# Scratch/inspection cells: show the column sets and a combined route label.
df_.columns
df_bol_.columns
df["port.name"] + " - " + df["country_of_destination.name"] + "-" + df["via"]
# NOTE(review): the bare name `hs4` was never defined anywhere in this
# notebook and raised a NameError when the cell ran; commented out so the
# notebook runs end to end.
# hs4
help(sps.print_report_by_attribute)