Preprocess bolivia soy exports 2018 2021.py

View or edit on GitHub

This page is synchronized from trase/models/bolivia/soy/archive/preprocess_bolivia_soy_exports_2018_2021.py.ipynb. Last modified on 2025-12-13 00:30 CET by Trase Admin. Please view or edit the original file there; changes should be reflected here after a midnight build (CET time), or manually triggering it with a GitHub action (link).

import re
import pandas as pd
from trase.tools import *
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sps import clean_string


UNIT_DICT = {
    "TY": "TY-TANK OR CYLINDRICAL TANK",
    "TB": "TB-FOOD CONTAINER",
    "VL": "VL-BULK LIQUID",
    "CT": "CT-CARDBOARD BOX",
    "VY": "VY-SOLID / FINE PARTICLES POWDERS",
    "BG": "BG-BAG, SACK OR PACKAGING",
    "SA": "SA-SACO",
    "VR": "VR-SOLID PARTICLES GRAINS",
}

YEAR = [2021]


def main():
    for year in YEAR:
        df = load_data(year)
        df = column_rename(df, year)
        df = preprocess(df, year)
        df = drop_columns(df, year)
        save_results(df, year)


def load_data(year):
    """
    Load csv file(s) from S3, return pandas dataframe
    """
    path = "bolivia/trade/cd/export/soy/originals/"
    file_name = {
        # 2018: [
        # "BOLIVIA_EXPORT_1507_JAN18_DEC18.csv",
        #  "BOLIVIA_EXPORT_2304_JAN18_DEC18.csv",
        # ],
        # 2019: ["BOLIVA_EXPORT_1507_2304_JAN19_DEC19.csv"],
        # 2020: [
        #   "BOLIVA_EXPORT_12019000000_12081000000_15071000000_15079090000_23040000000_YEAR2020.csv"
        # ],
        2021: ["BOLIVIA_EXPORT_120190_120810_150710_150790_230250_230400_YEAR2021.csv"],
    }

    df_list = []
    for fn in file_name[year]:
        df = get_pandas_df_once(
            f"{path}{fn}",
            encoding="utf8",
            dtype=str,
            keep_default_na=False,
        )
        df_list.append(df)
    df = pd.concat(df_list, ignore_index=True) if len(df_list) > 1 else df_list[0]

    return df


def column_rename(df, year):
    """
    Rename columns based on year
    """
    if year == 2018:
        columns = {
            "Exporter_Code": "exporter.id",
            "Exporter Name": "exporter.label",
            "Importer": "importer.label",
            "weight ": "vol",
            "Destination Country_EN": "country_of_destination.label",
            "puerto": "port_of_import.label",
            "country_proced": "country_proceed",
            "itm_fob_us_first": "item_fob_usd_first",
            "itm_fob_us_last": "item_fob_usd_last",
            "total_fob_us_first": "total_fob_usd_first",
            "total_fob_us_last": "total_fob_usd_last",
            "taxes_first_tot_bs": "taxes_first_total_bs",
            "modalidad": "modality",
            "regimen ": "regime",
            "canal ": "channel",
            "Declarant_Name": "declarant",
            "no_register": "declaration_no",
            "commercial_description": "product_description",
            "commercial_description_EN": "product_description_en",
            "HS Code": "hs",
            "enm_orig": "amend_orig",
        }
    else:
        country_column = (
            "DESTINATION_COUNTRY" if year == 2021 else "DESTINATION _COUNTRY"
        )
        columns = {
            "EXPORTER_CODE": "exporter.id",
            "EXPORTER": "exporter.label",
            "WEIGHT": "vol",
            country_column: "country_of_destination.label",
            "PORT_OF_DISCHARGE": "port_of_import.label",
            "BUYER": "importer.label",
            "BUYER_ADDRESS": "importer.address",
            "HS_CODE": "hs",
            "ENM_ORIG": "amend_orig",
        }

    df = df.rename(columns=columns, errors="raise")

    df = df.rename(
        columns=lambda x: x.strip()
        .replace(" ", "_")
        .lower()
        .replace("_itm_", "_item_")
        .replace("_bob", "_bs"),
        errors="raise",
    )

    return df


def preprocess(df, year):
    """
    Clean date, numerical and categorical columns
    """
    df = clean_numerical_columns(df)
    df = clean_names(df)
    df = clean_port(df)
    df = clean_time(df, year)
    df["hs6"] = df["hs"].str[0:6]
    df["unit_en"] = df.unit.str[0:2].map(UNIT_DICT).fillna("UNKNOWN")
    df.loc[df["amend_orig"] == "ENMIENDA", "amend_orig"] = "AMENDMENT"

    return df


def drop_columns(df, year):
    """
    Drop columns that are not needed
    """
    if year == 2018:
        # Check null
        check_columns = ["custom_port", "item_value", "patron"]
        for c in check_columns:
            assert (df[c] == "").all()
        columns = [
            "country_of_destination.label",
            "product_description_en",
            "custom_port",
            "item_value",
            "patron",
        ]
    else:
        columns = [
            "date",
            "heading",
            "chapter",
            "sub_heading",
        ]

    unnecessary_columns = [
        "user",
        "taxes_first_total_bs",
        "taxes_last_total_bs",
        "taxes_last_item_bs",
        "taxes_first_item_bs",
    ]
    df.drop(
        columns + unnecessary_columns,
        axis=1,
        inplace=True,
    )

    return df


def save_results(df, year):
    """
    Save dataframe to S3
    """
    df.to_csv(
        df,
        f"bolivia_soy_export_{year}.csv",
        sep=";",
        encoding="utf8",
    )


def clean_country_name(country_name):
    """
    Clean country name based on database
    """
    country_id = check_country(country_name)
    return get_node_name(country_id)


def clean_trader_name(trader_name):
    """
    Clean trader names, mostly about country names and suffix, like changing S.A. to SA
    """
    trader_name = re.sub("[,./]", " ", trader_name)
    trader_name = " ".join(trader_name.split())
    synonyms = {
        " SA": [" S A"],
        " SPA": [" SP A", " S PA", " S P A"],
        " SRL": [" S RL", " SR L", " S R L"],
        " SAA": [" SA A"],
        " SAC": [" SA C"],
        " EIRL": [" E I R L"],
        " & ": [" Y "],
        " BOLIVIA ": [" BOLVIA "],
        " AMERICAS ": [" AMEERICAS "],
        " AGRITRADE ": [" AGTRITRADE ", " AGRITADE "],
        "CARGILL AMERICAS INC": ["CARGILL AMERICA INC"],
    }

    for name, labels in synonyms.items():
        for label in labels:
            trader_name = trader_name.replace(label, name)
    return trader_name


def clean_time(df, year):
    """
    Check and Clean time columns
    """
    time_columns = ["channel_assignment_date", "month", "year"]

    df["channel_assignment_date"] = df["channel_assignment_date"].str.zfill(9)
    if "date" in df.columns:
        time_columns.append("date")
        df["date"] = df["date"].str.replace("/", "-").str.zfill(11)

    assure_time(df, year)
    df["year"] = str(year)
    df["month"] = df.channel_assignment_date.str[3:-3]

    for t in time_columns:
        df[t] = df[t].str.upper()

    return df


def assure_time(df, year):
    """
    Check whether years match in time columns
    """
    if "date" in df.columns:
        # print out rows with different years in column date and channel_assignment_date
        if (df.date.str[-2:] != df.channel_assignment_date.str[-2:]).any():
            df_mismatch = df[df.date.str[-2:] != df.channel_assignment_date.str[-2:]][
                [
                    "date",
                    "channel_assignment_date",
                    "month",
                    "year",
                    "exporter.label",
                    "country_of_destination.name",
                    "vol",
                ]
            ]
            m = f"There are {df_mismatch.shape[0]} rows with different years in column 'date' and 'channel_assignment_date': \n"
            m += tabulate(df_mismatch, headers="keys", tablefmt="psql")
        assert (df.date.str[-2:] == df.channel_assignment_date.str[-2:]).all(), m
        # assert df.date.str.replace("-20", "-").equals(df.channel_assignment_date)
        # assert (df.month.str[:3] == df.date.str[3:6]).all()

    # print out rows with different years in channel_assignment_date and input year
    if (df["channel_assignment_date"].str[-2:] != str(year)[-2:]).any():
        df_mismatch = df[df["channel_assignment_date"].str[-2:] != str(year)[-2:]][
            [
                "date",
                "channel_assignment_date",
                "month",
                "year",
                "exporter.label",
                "country_of_destination.name",
                "vol",
            ]
        ]
        m = f"There are {df_mismatch.shape[0]} rows with years in 'channel_assignment_date' different than {year}: \n"
        m += tabulate(df_mismatch, headers="keys", tablefmt="psql")
    assert (df["channel_assignment_date"].str[-2:] == str(year)[-2:]).all(), m


def clean_numerical_columns(df):
    """
    Clean numerical columns and convert them to float
    """
    numerical_columns = [
        "vol",
        "total_fob_usd_first",
        "total_fob_usd_last",
        "item_fob_usd_first",
        "item_fob_usd_last",
    ]
    for c in numerical_columns:
        df[c] = df[c].str.replace(",", ".")
        df.loc[df[c] == "", c] = "0"  # replace missing value to 0
        df[c] = df[c].astype(float)
    return df


def clean_names(df):
    """
    Clean country and trader names
    """
    name_columns = [
        "country_of_destination.label",
        "importer.label",
        "exporter.label",
    ]
    for c in name_columns:
        df[c] = df[c].apply(clean_string).str.upper()

    df["country_of_destination.name"] = (
        df["country_of_destination.label"].apply(fix_encoding).apply(clean_country_name)
    )
    df["importer.label"] = df["importer.label"].apply(clean_trader_name)
    df["exporter.label"] = df["exporter.label"].apply(clean_trader_name)
    return df


def load_port():
    df_port = get_pandas_df_once(
        "bolivia/soy/assets/bolivia_assets.csv",
        sep=";",
        encoding="utf8",
        dtype=str,
        keep_default_na=False,
    )
    df_port = df_port[df_port["ASSET_TYPE"] == "port"][
        ["UNIQUE_ID", "LOGISTICS_HUB_NAME"]
    ]
    df_port["UNIQUE_ID"] = df_port["UNIQUE_ID"].str[-3:]
    port_map = dict(zip(df_port.UNIQUE_ID, df_port.LOGISTICS_HUB_NAME))
    return port_map


def clean_port(df):
    """
    Create port of export column and clean port columns (port of export + port of import)
    """
    # Create port of export column
    port_map = load_port()
    df["port_code"] = df["declaration_no"].str[8:11]
    df["port_of_export.label"] = df["port_code"].map(port_map)

    # Clean string
    df["port_of_export.label"] = (
        df["port_of_export.label"].apply(clean_string).str.upper()
    )
    df["port_of_import.label"] = (
        df["port_of_import.label"].apply(clean_string).str.upper()
    )
    return df


def fix_encoding(name):
    """
    Fix some encoding problem in country labels.
    """
    name = name.replace('"', "")
    if name == "TURQUAA":
        return "TURKEY"
    elif name == "PERAS":
        return "PERU"
    elif name == "USMIA-MIAMIA -FLORIDA":
        return "USMIA-MIAMI-FLORIDA"
    return name


if __name__ == "__main__":
    main()