Preprocess bolivia soy exports 2018 2021.py
This page is synchronized from trase/models/bolivia/soy/archive/preprocess_bolivia_soy_exports_2018_2021.py.ipynb. Last modified on 2025-12-13 00:30 CET by Trase Admin.
Please view or edit the original file there; changes are reflected here after the nightly build (midnight CET) or after manually triggering the rebuild with a GitHub Action.
import re
import pandas as pd
from tabulate import tabulate  # used in assure_time() to format the mismatch report
from trase.tools import *
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sps import clean_string
UNIT_DICT = {
"TY": "TY-TANK OR CYLINDRICAL TANK",
"TB": "TB-FOOD CONTAINER",
"VL": "VL-BULK LIQUID",
"CT": "CT-CARDBOARD BOX",
"VY": "VY-SOLID / FINE PARTICLES POWDERS",
"BG": "BG-BAG, SACK OR PACKAGING",
"SA": "SA-SACO",
"VR": "VR-SOLID PARTICLES GRAINS",
}
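# UNIT_DICT keys are the first two characters of the raw "unit" column; preprocess()
# maps them to descriptive packaging labels and falls back to "UNKNOWN" for codes
# not listed here.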
YEAR = [2021]
def main():
for year in YEAR:
df = load_data(year)
df = column_rename(df, year)
df = preprocess(df, year)
df = drop_columns(df, year)
save_results(df, year)
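# To reprocess an earlier year, add it to YEAR and restore the matching entry in the
# file_name dict inside load_data() (the 2018-2020 inputs are kept there, commented out).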
def load_data(year):
"""
Load CSV file(s) from S3 and return a pandas DataFrame
"""
path = "bolivia/trade/cd/export/soy/originals/"
file_name = {
# 2018: [
# "BOLIVIA_EXPORT_1507_JAN18_DEC18.csv",
# "BOLIVIA_EXPORT_2304_JAN18_DEC18.csv",
# ],
# 2019: ["BOLIVA_EXPORT_1507_2304_JAN19_DEC19.csv"],
# 2020: [
# "BOLIVA_EXPORT_12019000000_12081000000_15071000000_15079090000_23040000000_YEAR2020.csv"
# ],
2021: ["BOLIVIA_EXPORT_120190_120810_150710_150790_230250_230400_YEAR2021.csv"],
}
df_list = []
for fn in file_name[year]:
df = get_pandas_df_once(
f"{path}{fn}",
encoding="utf8",
dtype=str,
keep_default_na=False,
)
df_list.append(df)
df = pd.concat(df_list, ignore_index=True) if len(df_list) > 1 else df_list[0]
return df
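# Note: every column is read as a string (dtype=str, keep_default_na=False) so that
# codes with leading zeros and empty cells survive intact; numeric columns are only
# converted to float later, in clean_numerical_columns().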
def column_rename(df, year):
"""
Rename columns based on year
"""
if year == 2018:
columns = {
"Exporter_Code": "exporter.id",
"Exporter Name": "exporter.label",
"Importer": "importer.label",
"weight ": "vol",
"Destination Country_EN": "country_of_destination.label",
"puerto": "port_of_import.label",
"country_proced": "country_proceed",
"itm_fob_us_first": "item_fob_usd_first",
"itm_fob_us_last": "item_fob_usd_last",
"total_fob_us_first": "total_fob_usd_first",
"total_fob_us_last": "total_fob_usd_last",
"taxes_first_tot_bs": "taxes_first_total_bs",
"modalidad": "modality",
"regimen ": "regime",
"canal ": "channel",
"Declarant_Name": "declarant",
"no_register": "declaration_no",
"commercial_description": "product_description",
"commercial_description_EN": "product_description_en",
"HS Code": "hs",
"enm_orig": "amend_orig",
}
else:
country_column = (
"DESTINATION_COUNTRY" if year == 2021 else "DESTINATION _COUNTRY"
)
columns = {
"EXPORTER_CODE": "exporter.id",
"EXPORTER": "exporter.label",
"WEIGHT": "vol",
country_column: "country_of_destination.label",
"PORT_OF_DISCHARGE": "port_of_import.label",
"BUYER": "importer.label",
"BUYER_ADDRESS": "importer.address",
"HS_CODE": "hs",
"ENM_ORIG": "amend_orig",
}
df = df.rename(columns=columns, errors="raise")
df = df.rename(
columns=lambda x: x.strip()
.replace(" ", "_")
.lower()
.replace("_itm_", "_item_")
.replace("_bob", "_bs"),
errors="raise",
)
return df
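# The second rename pass normalises the remaining raw headers, e.g. a hypothetical
# header "TAXES_FIRST_ITM_BOB" becomes "taxes_first_item_bs" (strip, spaces to
# underscores, lower-case, then the _itm_ -> _item_ and _bob -> _bs substitutions).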
def preprocess(df, year):
"""
Clean date, numerical and categorical columns
"""
df = clean_numerical_columns(df)
df = clean_names(df)
df = clean_port(df)
df = clean_time(df, year)
df["hs6"] = df["hs"].str[0:6]
df["unit_en"] = df.unit.str[0:2].map(UNIT_DICT).fillna("UNKNOWN")
df.loc[df["amend_orig"] == "ENMIENDA", "amend_orig"] = "AMENDMENT"
return df
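# "hs6" keeps the 6-digit HS subheading, e.g. "230400" from a longer national code
# such as "2304000000".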
def drop_columns(df, year):
"""
Drop columns that are not needed
"""
if year == 2018:
# Check null
check_columns = ["custom_port", "item_value", "patron"]
for c in check_columns:
assert (df[c] == "").all()
columns = [
"country_of_destination.label",
"product_description_en",
"custom_port",
"item_value",
"patron",
]
else:
columns = [
"date",
"heading",
"chapter",
"sub_heading",
]
unnecessary_columns = [
"user",
"taxes_first_total_bs",
"taxes_last_total_bs",
"taxes_last_item_bs",
"taxes_first_item_bs",
]
df.drop(
columns + unnecessary_columns,
axis=1,
inplace=True,
)
return df
def save_results(df, year):
"""
Save dataframe to S3
"""
# write_csv_for_upload (imported above from trase.tools.aws.metadata) is assumed
# here to take the dataframe, the target file name and pandas.to_csv-style keyword
# arguments, and to handle the S3 upload.
write_csv_for_upload(
df,
f"bolivia_soy_export_{year}.csv",
sep=";",
encoding="utf8",
)
def clean_country_name(country_name):
"""
Resolve a country label to its canonical name in the node database
"""
country_id = check_country(country_name)
return get_node_name(country_id)
def clean_trader_name(trader_name):
"""
Clean trader names: normalise punctuation, company suffixes (e.g. changing S.A. to SA) and common misspellings
"""
trader_name = re.sub("[,./]", " ", trader_name)
trader_name = " ".join(trader_name.split())
synonyms = {
" SA": [" S A"],
" SPA": [" SP A", " S PA", " S P A"],
" SRL": [" S RL", " SR L", " S R L"],
" SAA": [" SA A"],
" SAC": [" SA C"],
" EIRL": [" E I R L"],
" & ": [" Y "],
" BOLIVIA ": [" BOLVIA "],
" AMERICAS ": [" AMEERICAS "],
" AGRITRADE ": [" AGTRITRADE ", " AGRITADE "],
"CARGILL AMERICAS INC": ["CARGILL AMERICA INC"],
}
for name, labels in synonyms.items():
for label in labels:
trader_name = trader_name.replace(label, name)
return trader_name
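# Example (hypothetical input): "ADM S.A." -> punctuation stripped -> "ADM S A"
# -> suffix synonym applied -> "ADM SA".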
def clean_time(df, year):
"""
Check and clean time columns
"""
time_columns = ["channel_assignment_date", "month", "year"]
df["channel_assignment_date"] = df["channel_assignment_date"].str.zfill(9)
if "date" in df.columns:
time_columns.append("date")
df["date"] = df["date"].str.replace("/", "-").str.zfill(11)
assure_time(df, year)
df["year"] = str(year)
df["month"] = df.channel_assignment_date.str[3:-3]
for t in time_columns:
df[t] = df[t].str.upper()
return df
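# channel_assignment_date appears to use a "DD-MMM-YY" layout; zero-padding it to
# 9 characters (e.g. "1-JAN-21" -> "01-JAN-21") keeps the month abbreviation at a
# fixed position, so str[3:-3] extracts it and str[-2:] gives the 2-digit year.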
def assure_time(df, year):
"""
Check whether years match in time columns
"""
if "date" in df.columns:
# report rows whose year differs between 'date' and 'channel_assignment_date'
if (df.date.str[-2:] != df.channel_assignment_date.str[-2:]).any():
df_mismatch = df[df.date.str[-2:] != df.channel_assignment_date.str[-2:]][
[
"date",
"channel_assignment_date",
"month",
"year",
"exporter.label",
"country_of_destination.name",
"vol",
]
]
m = f"There are {df_mismatch.shape[0]} rows with different years in column 'date' and 'channel_assignment_date': \n"
m += tabulate(df_mismatch, headers="keys", tablefmt="psql")
assert (df.date.str[-2:] == df.channel_assignment_date.str[-2:]).all(), m
# assert df.date.str.replace("-20", "-").equals(df.channel_assignment_date)
# assert (df.month.str[:3] == df.date.str[3:6]).all()
# report rows whose 'channel_assignment_date' year differs from the input year
if (df["channel_assignment_date"].str[-2:] != str(year)[-2:]).any():
df_mismatch = df[df["channel_assignment_date"].str[-2:] != str(year)[-2:]][
[
"date",
"channel_assignment_date",
"month",
"year",
"exporter.label",
"country_of_destination.name",
"vol",
]
]
m = f"There are {df_mismatch.shape[0]} rows with years in 'channel_assignment_date' different than {year}: \n"
m += tabulate(df_mismatch, headers="keys", tablefmt="psql")
assert (df["channel_assignment_date"].str[-2:] == str(year)[-2:]).all(), m
def clean_numerical_columns(df):
"""
Clean numerical columns and convert them to float
"""
numerical_columns = [
"vol",
"total_fob_usd_first",
"total_fob_usd_last",
"item_fob_usd_first",
"item_fob_usd_last",
]
for c in numerical_columns:
df[c] = df[c].str.replace(",", ".")
df.loc[df[c] == "", c] = "0" # replace missing value to 0
df[c] = df[c].astype(float)
return df
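# Raw figures appear to use a decimal comma (e.g. "12,5" -> "12.5"); empty strings
# are treated as 0 before the float conversion.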
def clean_names(df):
"""
Clean country and trader names
"""
name_columns = [
"country_of_destination.label",
"importer.label",
"exporter.label",
]
for c in name_columns:
df[c] = df[c].apply(clean_string).str.upper()
df["country_of_destination.name"] = (
df["country_of_destination.label"].apply(fix_encoding).apply(clean_country_name)
)
df["importer.label"] = df["importer.label"].apply(clean_trader_name)
df["exporter.label"] = df["exporter.label"].apply(clean_trader_name)
return df
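# "country_of_destination.label" keeps the cleaned raw string from the customs data,
# while "country_of_destination.name" is the canonical name resolved through the node
# database (check_country / get_node_name).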
def load_port():
df_port = get_pandas_df_once(
"bolivia/soy/assets/bolivia_assets.csv",
sep=";",
encoding="utf8",
dtype=str,
keep_default_na=False,
)
df_port = df_port[df_port["ASSET_TYPE"] == "port"][
["UNIQUE_ID", "LOGISTICS_HUB_NAME"]
]
df_port["UNIQUE_ID"] = df_port["UNIQUE_ID"].str[-3:]
port_map = dict(zip(df_port.UNIQUE_ID, df_port.LOGISTICS_HUB_NAME))
return port_map
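# port_map maps the last three characters of a port asset's UNIQUE_ID to its
# LOGISTICS_HUB_NAME; clean_port() looks up the same three-character code taken
# from positions 8-10 of declaration_no.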
def clean_port(df):
"""
Create port of export column and clean port columns (port of export + port of import)
"""
# Create port of export column
port_map = load_port()
df["port_code"] = df["declaration_no"].str[8:11]
df["port_of_export.label"] = df["port_code"].map(port_map)
# Clean string
df["port_of_export.label"] = (
df["port_of_export.label"].apply(clean_string).str.upper()
)
df["port_of_import.label"] = (
df["port_of_import.label"].apply(clean_string).str.upper()
)
return df
def fix_encoding(name):
"""
Fix some encoding problem in country labels.
"""
name = name.replace('"', "")
if name == "TURQUAA":
return "TURKEY"
elif name == "PERAS":
return "PERU"
elif name == "USMIA-MIAMIA -FLORIDA":
return "USMIA-MIAMI-FLORIDA"
return name
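# "TURQUAA" and "PERAS" are apparently mis-encoded forms of the Spanish labels
# TURQUIA and PERU, with the accented characters mangled in the source file.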
if __name__ == "__main__":
main()