Skip to content

DBT: Brazil Bol 2021

File location: s3://trase-storage/brazil/trade/bol/2021/BRAZIL_BOL_2021.csv

DBT model name: brazil_bol_2021

Explore on Metabase: Full table; summary statistics

DBT details


Description

This model was auto-generated based off .yml 'lineage' files in S3. The DBT model just raises an error; the actual script that created the data lives elsewhere. The script is located at trase/data/brazil/trade/bol/2021/BRAZIL_BOL_2021.py [permalink]. It was last run by Harry Biddle.


Details

Column Type Description
month VARCHAR
hs6 VARCHAR
hs6_description VARCHAR
hs8 VARCHAR
BL_Description/BL Description VARCHAR
exporter.label VARCHAR
exporter.cnpj VARCHAR
Company_Shipper/Street VARCHAR
exporter.municipality.label VARCHAR
exporter.state.label VARCHAR
exporter.country.label VARCHAR
importer.label VARCHAR
vessel.label VARCHAR
vessel.id VARCHAR
Vessel/Voyage VARCHAR
Place_and_Ports/POR_Name VARCHAR
port_of_export.label VARCHAR
port_of_import.label VARCHAR
Place_and_Ports/POD_Country VARCHAR
Place_and_Ports/POMD_Name VARCHAR
Place_and_Ports/POMD_Country VARCHAR
Place_and_Ports/DEST_Name VARCHAR
country_of_destination.label VARCHAR
vol VARCHAR
WTMT VARCHAR
fob VARCHAR
year VARCHAR
hs4 VARCHAR
hs5 VARCHAR
exporter.type VARCHAR
exporter.state.name VARCHAR
exporter.state.trase_id VARCHAR
port_of_export.name VARCHAR
exporter.municipality.name VARCHAR
exporter.municipality.trase_id VARCHAR
country_of_destination.name VARCHAR
country_of_destination.trase_id VARCHAR
country_of_destination.economic_bloc VARCHAR
importer.trader_id VARCHAR
importer.name VARCHAR
importer.group VARCHAR
exporter.trase_id VARCHAR
exporter.trader_id VARCHAR
exporter.group VARCHAR
exporter.name VARCHAR

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.dataliner_report_stockholm_exp_br_2020_to_2021_version_1
  • model.trase_duckdb.hs2017

Sources

  • ['trase-storage-raw', 'dataliner_report_stockholm_exp_br_2020_to_2021_version_1']
import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf
from psycopg2 import sql

from trase.tools import (
    find_label,
    get_country_id,
    get_label_trader_id,
    get_node_name,
    get_trader_group_id,
    uses_database,
)
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.pandasdb.find import (
    find_default_name_by_node_id,
    find_economic_blocs_by_trase_id,
    find_traders_and_groups_by_label,
    find_traders_and_groups_by_trase_id,
)
from trase.tools.utilities.helpers import clean_string

YEAR = 2021
MISSING_VALUES = ["NAN", "NONE", "NA", ""]


def load_and_rename_data():
    df = get_pandas_df_once(
        "brazil/trade/bol/2021/originals/Dataliner_Report_STOCKHOLM_EXP_BR_2020_TO_2021_Version_1.xlsx",
        encoding="utf8",
        sep=";",
        dtype=str,
        keep_default_na=False,
        xlsx=True,
    )

    # set the first row as header
    raise NotImplementedError(
        "File header adjusted since this code was run - consider updating"
    )
    df.columns = df.iloc[0]
    df = df.drop(df.index[0])

    # rename the columns
    columns = {
        "Period/YYYYMM": "month",
        "Commodity_HS_Datamar/HS6 Code": "hs6",
        "Commodity_HS_Datamar/HS6 English": "hs6_description",
        "Commodity_HS_Datamar/HS8 Code": "hs8",
        # exporter
        "Company_Shipper/City": "exporter.municipality.label",  # seems to be municipality...
        "Company_Shipper/Registration Number": "exporter.cnpj",
        "Company_Shipper/Shipper Name": "exporter.label",
        "Company_Shipper/State Name": "exporter.state.label",
        "Company_Shipper/Country Name": "exporter.country.label",
        # vessel
        "Vessel/Vessel Name": "vessel.label",
        "Vessel/IMO": "vessel.id",
        # ports, country
        "Place_and_Ports/POL_Name": "port_of_export.label",
        "Place_and_Ports/POD_Name": "port_of_import.label",
        "Place_and_Ports/DEST_Country": "country_of_destination.label",
        # importer
        "Company_Consignee/Consignee Name": "importer.label",
        # volume, fob
        "WTKG": "vol",
        "FOB VALUE USD": "fob",
    }
    df = df.rename(columns=columns, errors="raise")

    return df


def clean_time(df):
    """Parse time and do some basic checks"""
    assert (
        sum(df["month"].str.len() != 6) == 0
    ), "Column 'Period/YYYYMM' should only contain six digits."

    df["year"] = df["month"].str[:4]
    df["month"] = df["month"].str[-2:]

    assert sum(df["year"] != str(YEAR)) == 0, f"Year has to be {YEAR}."
    assert (
        df[(df["month"].astype(int) > 12) & (df["month"].astype(int) < 1)].shape[0] == 0
    ), f"Year has to be {YEAR}."
    return df


def clean_hs(df):
    """Do some basic checks of hs codes, and create hs4 and hs5 based on hs6."""
    # check the basic format of the hs code columns
    assert (
        sum(df["hs6"].str.len() != 6) == 0
    ), "Column 'Commodity_HS_Datamar/HS6 Code' should contain 6 digits."
    assert (
        sum(df["hs8"].str.len() != 8) == 0
    ), "Column 'Commodity_HS_Datamar/HS8 Code' should contain 6 digits."

    assert (
        df[~df["hs6"].str.isdigit()].shape[0] == 0
    ), "Column 'Commodity_HS_Datamar/HS6 Code' should only contain digits."

    assert (
        df[~df["hs8"].str.isdigit()].shape[0] == 0
    ), "Column 'Commodity_HS_Datamar/HS8 Code' should only contain digits."

    assert (
        sum(df["hs6"] != df["hs8"].str[:6]) == 0
    ), "Column 'Commodity_HS_Datamar/HS6 Code' does not match 'Commodity_HS_Datamar/HS8 Code'."

    # change the hs6 120100 to 120190 (both refer to general 'raw' soy, but 120190 is the one comtrade uses)
    df.loc[df["hs6"] == "120100", "hs6"] = "120190"

    df["hs4"] = df["hs6"].str.slice(0, 4)
    df["hs5"] = df["hs6"].str.slice(0, 5)

    # check whether the hs4 codes already exist in our dict
    df_hscodes = get_pandas_df_once(
        "world/metadata/codes/hs/HS2017.csv", sep=";", dtype=str, keep_default_na=False
    )
    hs4_list = df_hscodes[df_hscodes["type"] == "hs4"]["code"].to_list()

    df = df[
        df["hs4"].isin(hs4_list)
    ]  # filter out the rows without valid hs4 in our dict, TODO: check whether it is the correct way to do this? do we need to check all the hs4/hs6 codes not included in the 2017 file

    return df


def clean_string_columns(df, column_list):
    # clean the string columns
    for column in column_list:
        df[column] = df[column].apply(clean_string)

    # replace null values to UNKNOWN
    for column in df.columns:
        df.loc[df[column].isin(MISSING_VALUES), column] = "UNKNOWN"

    return df


def clean_cnpjs(df):
    """Clean cnpjs and create a column 'exporter.type' indicating cnpj or cpf."""
    assert (
        df[~df["exporter.cnpj"].str.isdigit()].shape[0] == 0
    ), "Column 'Company_Shipper/Registration Number' should only contain digits."

    cnpj = df["exporter.cnpj"].str.rjust(14, "0")
    cnpj_valid = cnpj.apply(stdnum.br.cnpj.is_valid)

    cpf = df["exporter.cnpj"].str.rjust(11, "0")
    cpf_valid = cpf.apply(stdnum.br.cpf.is_valid)
    # cnpj_valid[cpf.isin(KNOWN_CPFS)] = False

    assert not any(cnpj_valid & cpf_valid)

    df["exporter.type"] = "unknown"
    df.loc[cnpj_valid, "exporter.type"] = "cnpj"
    df.loc[cpf_valid, "exporter.type"] = "cpf"

    df["exporter.cnpj"] = np.where(cnpj_valid, cnpj, df["exporter.cnpj"])
    df["exporter.cnpj"] = np.where(cpf_valid, cpf, df["exporter.cnpj"])
    df.loc[df["exporter.cnpj"] == "0", "exporter.cnpj"] = "0" * 14

    return df


@uses_database
def get_country_labels(cnx=None):
    """Retrieve country name, label, trase_id, and economic bloc"""

    # get name, label, and trase_id
    df = pd.read_sql(
        """
        select distinct 
            name as "country_of_destination.name",
            unnest(synonyms) as "country_of_destination.label",
            coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
        from views.regions where level = 1 and length(trase_id) = 2
        """,
        cnx.cnx,
    )

    # add economic bloc
    df[["country_of_destination.economic_bloc"]] = find_economic_blocs_by_trase_id(
        df.rename(columns={"country_of_destination.trase_id": "trase_id"})[
            ["trase_id"]
        ],
        returning=["economic_bloc_name"],
    )
    assert not any(df["country_of_destination.economic_bloc"].isna())
    assert all(df["country_of_destination.economic_bloc"].str.len() > 3)

    return df


def assert_none_missing(df, column):
    missing = df[df.pop("_merge") != "both"][column].drop_duplicates()
    assert missing.empty, f"Not all {column} found:\n{missing}"


def clean_countries(df):
    """Introduce country name and trase id to the dataframe"""
    df = pd.merge(
        df,
        get_country_labels(),
        on="country_of_destination.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )

    assert_none_missing(df, "country_of_destination.name")
    return df


@uses_database
def get_state_labels(cnx=None):
    """Retrieve state name, label, and trase_id"""
    df = pd.read_sql(
        """
        select distinct 
            name as "exporter.state.name",
            unnest(synonyms) as "exporter.state.label",
            coalesce(trase_id, 'BR-XX') AS "exporter.state.trase_id"
        from views.regions where level = 3 and length(trase_id) = 5 and country = 'BRAZIL'
        """,
        cnx.cnx,
    )
    return df


def clean_states(df):
    """Introduce state name and trase id to the dataframe"""

    # correct wrong states for certain municipalities
    municipality_state_dict = {
        "BUENOS AIRES": "PERNAMBUCO",
        "ALFENAS": "MINAS GERAIS",
    }
    for municipality, state in municipality_state_dict.items():
        df.loc[
            df["exporter.municipality.label"] == municipality, "exporter.state.label"
        ] = state

    df = pd.merge(
        df,
        get_state_labels(),
        on="exporter.state.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )

    assert_none_missing(df, "exporter.state.name")
    return df


@uses_database
def get_port_labels(cnx=None):
    df = pd.read_sql(
        """
        select distinct 
            name as "port_of_export.name", 
            unnest(synonyms) as "port_of_export.label"
        from views.regions where region_type = 'PORT' and country = 'BRAZIL'
        """,
        cnx.cnx,
    )
    df_new_synonyms = pd.DataFrame(
        [
            ["TROMBETAS", "TROMBETAS"],
            ["ALUMAR", "ALUMAR"],
            ["PONTA UBU", "PONTA UBU"],
            ["UNKNOWN", "PLACE_AND_PORTS/POL_NAME"],
            ["JURUTI", "JURUTI"],
            ["FLUMINENSE TERMINAL PORT", "FLUMINENSE TERMINAL PORT"],
            ["FLUMINENSE TERMINAL PORT", "BIJUPIRA SALEMA FIELD"],
        ],
        columns=["port_of_export.name", "port_of_export.label"],
    )

    df_combined = pd.concat([df, df_new_synonyms], ignore_index=True)
    # Filter `df_new_synonyms` to include only those rows that do not exist in `df`
    df_combined = df_combined.drop_duplicates(
        subset=["port_of_export.name", "port_of_export.label"], keep="first"
    )
    return df


def clean_ports(df):
    """Introduce port name and trase id to the dataframe"""
    df = pd.merge(
        df,
        get_port_labels(),
        on="port_of_export.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )

    assert_none_missing(df, "port_of_export.name")
    return df


@uses_database
def get_municipality_labels(cnx=None):
    df_municipalities = pd.read_sql(
        f"""
        select distinct
            name as "exporter.municipality.name",
            unnest(synonyms) as "exporter.municipality.label",
            trase_id as "exporter.municipality.trase_id",
            substr(trase_id, 0, 6) as "exporter.state.trase_id"
        from views.regions
        where country = 'BRAZIL' and region_type = 'MUNICIPALITY'
        """,
        cnx.cnx,
    )
    return df_municipalities


def clean_municipalities(df):
    """Introduce municipality name and trase id to the dataframe"""

    # correct some exporter countries
    municipality_external = {"JOUNIEH": "LEBANON", "ROTTERDAM": "NETHERLANDS"}
    for municipality, country in municipality_external.items():
        df.loc[
            df["exporter.municipality.label"] == municipality, "exporter.country.label"
        ] = country

    # correct some synonyms of municipalities
    municipality_synonyms = {
        "BALMONTE": "BELMONTE",
        "CAPAO GRANDE": "VARZEA GRANDE",  # CAPAO GRANDE belongs to VARZEA GRANDE municipality
        "IDROLANDIA": "SIDROLANDIA",
        "ALTA FLORESTA D OESTE": "ALTA FLORESTA D'OESTE",
    }
    for synonym, municipality in municipality_synonyms.items():
        df.loc[
            df["exporter.municipality.label"] == synonym, "exporter.municipality.label"
        ] = municipality

    # one special case where the municipality and state are both Mato Grosso
    condition1 = df["Company_Shipper/Street"] == "AV MARECHAL RONDON JARDIM PARAISO"
    condition2 = df["exporter.cnpj"] == "0" * 14
    condition3 = df["exporter.label"] == "JBS SA"
    condition4 = df["exporter.municipality.label"] == "MATO GROSSO"
    condition5 = df["exporter.state.label"] == "MATO GROSSO"

    df.loc[
        condition1 & condition2 & condition3 & condition4 & condition5,
        "exporter.municipality.label",
    ] = "CACERES"

    # split df based on the exporter countries
    df_brazil = df[df["exporter.country.label"] == "BRAZIL"]
    df_no_brazil = df[df["exporter.country.label"] != "BRAZIL"]

    # merge with municipality information in DB
    df_brazil = pd.merge(
        df_brazil,
        get_municipality_labels(),
        on=["exporter.municipality.label", "exporter.state.trase_id"],
        validate="many_to_one",
        how="left",
        indicator=True,
    )
    assert_none_missing(df_brazil, "exporter.municipality.name")

    # replace municipality names and trase ids of foreign exporters to unknown
    df_no_brazil["exporter.municipality.name"] = "UNKNOWN MUNICIPALITY"
    df_no_brazil["exporter.municipality.trase_id"] = "BR-XXXXXXX"

    df = pd.concat([df_brazil, df_no_brazil])

    return df


def check_numerical(df, columns):
    for c in columns:
        num_array = df[c].str.lstrip(".").copy()
        assert (
            num_array.apply(pd.to_numeric, errors="coerce").notnull().all()
        ), f"Column {c} contains non-numerical value."


@uses_database
def clean_importers(df, cur=None, cnx=None):
    df_importers = df[["importer.label"]].drop_duplicates()

    # clean importer names
    df_importers[["importer.trader_id", "importer.name", "importer.group", "count"]] = (
        find_traders_and_groups_by_label(
            df_importers.rename(columns={"importer.label": "trader_label"}),
            returning=["trader_id", "trader_name", "group_name", "count"],
            year=sql.Literal(YEAR),
            cur=cur,
            cnx=cnx,
        )
    )

    # special case for UNKNOWN CUSTOMER (there are two!)
    is_unknown = (df_importers["count"] != 1) & (
        df_importers["importer.label"] == "UNKNOWN CUSTOMER"
    )
    if any(is_unknown):
        brazil_id = get_country_id("BRAZIL", cur=cur)
        label_id = find_label("UNKNOWN CUSTOMER", cur=cur)
        trader_id = get_label_trader_id(label_id, brazil_id)
        trader_name = get_node_name(trader_id, cur=cur)
        group_id = get_trader_group_id(trader_id, cur=cur)
        group_name = get_node_name(group_id, cur=cur)
        df_importers.loc[is_unknown, "importer.trader_id"] = trader_id
        df_importers.loc[is_unknown, "importer.name"] = trader_name
        df_importers.loc[is_unknown, "importer.group"] = group_name
        df_importers.loc[is_unknown, "count"] = 1

    # we should have found one unique node for every importer
    bad = df_importers.pop("count") != 1
    if any(bad):
        raise ValueError(f"Missing some importers:\n{df_importers[bad]}")

    # merge back into result
    df = pd.merge(
        df,
        df_importers,
        on=["importer.label"],
        how="left",
        validate="many_to_one",
        indicator=True,
    )
    merge = df.pop("_merge")
    assert all(merge == "both")
    return df


@uses_database
def clean_exporters_and_add_group(df, cur=None, cnx=None):
    """
    This function adds two columns:

        exporter.name - the default name of the exporter from the database
        exporter.group - the group name from the database

    It does this using the following algorithm:

     1. Construct a Trase ID from exporter.cnpj and use this to perform a lookup in the
        database
     2. If a unique name + group cannot be found through that method, use exporter.label
        to perform a lookup among trader labels in the database

    TODO: try to do this more concisely / in fewer lines of code
    """
    trase_ids = "BR-TRADER-" + df["exporter.cnpj"].str.slice(0, 8)
    trase_ids = trase_ids.replace({"BR-TRADER-00000000": None})
    df = df.assign(**{"exporter.trase_id": trase_ids})
    df_exporters = df[["exporter.label", "exporter.trase_id"]].drop_duplicates()

    # clean exporter names using trase id
    df_exporters[["exporter.trader_id", "exporter.group", "count"]] = (
        find_traders_and_groups_by_trase_id(
            df_exporters.rename(columns={"exporter.trase_id": "trase_id"})[
                ["trase_id"]
            ],
            returning=["trader_id", "group_name", "count"],
            year=sql.Literal(YEAR),
            cur=cur,
            cnx=cnx,
        )
    )
    counts = df_exporters.pop("count")
    assert all(counts.isin([0, 1]))
    not_found_by_trase_id = counts == 0
    print(
        f"{sum(~not_found_by_trase_id)} exporters were found by Trase ID and "
        f"{sum(not_found_by_trase_id)} were not"
    )
    df_found_by_trase_id = df_exporters[~not_found_by_trase_id]
    df_missing = df_exporters[not_found_by_trase_id].copy()

    # if not found by Trase ID, then look by name
    labels = df_missing["exporter.label"].drop_duplicates()
    df_labels = pd.DataFrame(labels)
    df_labels[["exporter.trader_id", "exporter.group", "count"]] = (
        find_traders_and_groups_by_label(
            df_labels.rename(columns={"exporter.label": "trader_label"}),
            returning=["trader_id", "group_name", "count"],
            year=sql.Literal(YEAR),
        )
    )

    # special case for UNKNOWN CUSTOMER
    is_unknown = (df_labels["count"] != 1) & (
        df_labels["exporter.label"] == "UNKNOWN CUSTOMER"
    )
    if any(is_unknown):
        brazil_id = get_country_id("BRAZIL", cur=cur)
        label_id = find_label("UNKNOWN CUSTOMER", cur=cur)
        trader_id = get_label_trader_id(label_id, brazil_id)
        group_id = get_trader_group_id(trader_id, cur=cur)
        group_name = get_node_name(group_id, cur=cur)
        df_labels.loc[is_unknown, "exporter.trader_id"] = trader_id
        df_labels.loc[is_unknown, "exporter.group"] = group_name
        df_labels.loc[is_unknown, "count"] = 1

    # we should have found one unique node for every importer
    bad = df_labels.pop("count") != 1
    if any(bad):
        raise ValueError(f"Missing some exporters:\n{df_labels[bad]}")

    # merge exporters found by trase id back into results
    right = df_found_by_trase_id[
        ["exporter.trase_id", "exporter.trader_id", "exporter.group"]
    ].drop_duplicates()
    df1 = pd.merge(
        df,
        right,
        on=["exporter.trase_id"],
        how="left",
        validate="many_to_one",
        indicator=True,
    )
    merge = df1.pop("_merge")
    df_solved1 = df1[merge == "both"]

    # merge exporters found by label back into results
    df_unsolved = df1[merge != "both"]
    df_unsolved = df_unsolved.drop(
        columns=["exporter.trader_id", "exporter.group"], errors="raise"
    )

    right = df_labels[
        ["exporter.label", "exporter.trader_id", "exporter.group"]
    ].drop_duplicates()
    df_solved2 = pd.merge(
        df_unsolved,
        right,
        on=["exporter.label"],
        how="left",
        validate="many_to_one",
        indicator=True,
    )
    merge = df_solved2.pop("_merge")
    assert all(merge == "both")

    # combine the two
    expected_columns = list(set(df.columns) | {"exporter.trader_id", "exporter.group"})
    assert sorted(df_solved2.columns) == sorted(expected_columns)
    assert sorted(df_solved1.columns) == sorted(expected_columns)
    df_final = pd.concat([df_solved1, df_solved2]).reset_index(drop=True)

    # guarantee that we didn't change the original data
    a = df.sort_values(list(df.columns)).reset_index(drop=True)
    b = df_final[df.columns].sort_values(list(df.columns)).reset_index(drop=True)
    b.columns.name = a.columns.name  # needed for assert equal but don't know what it is
    pd.testing.assert_frame_equal(a, b)

    # add exporter names
    df_final = df_final.astype({"exporter.trader_id": int})
    df_final[["exporter.name"]] = find_default_name_by_node_id(
        df_final[["exporter.trader_id"]].rename(
            columns={"exporter.trader_id": "node_id"}
        ),
        returning=["name"],
        cnx=cnx,
        cur=cur,
    )

    return df_final


def clean_fob_outliers(df):
    """
    This function cleans outliers in the 'fob' column of the DataFrame,
    based on the cost_per_kg of each record. It uses the modified z_score
    related to the cost_per_kg within each hs6 group.
    For records that have a z_score greater than 5, the 'fob' value is replaced
    based in the average cost_per_kg of the corresponding hs6 group.
    Also, records that have 'fob' = 0 , get its value imputed based on its 'vol'.
    This method adjusts the values for 6119 records.
    """

    def modified_z_score(series):
        """
        Calculate the modified z score of a series of values. The modified z-score
        is useful in identifying outliers in a dataset. It relies in the median
        absolute deviation (MAD) instead of the standard deviation, making it less
        sensitive to outliers.
        """
        median = series.median()
        mad = (series - median).abs().median()
        return 0.6745 * (series - median) / mad

    df["fob"] = df["fob"].astype(float)
    df["vol"] = df["vol"].astype(float)

    # Do calculations with rows where 'fob' > 0 and 'vol' > 0 (there are 1876 records with fob=0)
    df_filtered = df[(df["fob"] > 0) & (df["vol"] > 0)].copy()

    # Create a 'cost_per_kg' column
    df_filtered.loc[:, "cost_per_kg"] = df_filtered["fob"] / df_filtered["vol"]

    # Group the DataFrame by 'hs6'
    grouped = df_filtered.groupby("hs6")

    # # Save summary statistics of the z score if wanting to inspect them
    # # Dictionary to store summary statistics
    # summary_stats = {}

    # Loop through each hs group, calcuate the z score within them,
    # and for outliers with z_score > 5, replace the 'fob' value based on the average 'cost_per_kg'
    # hs groups with less than 40 records are not considered (if considered they would probably
    # need other method - for example sampling with replacement to smooth the distribution a bit)
    for hs6, group in grouped:
        if len(group) > 40:
            # stats = group['cost_per_kg'].describe()
            # summary_stats[hs6] = stats

            # Calculate the modified z-score
            group = group.copy()
            group["fob_z_score"] = modified_z_score(group["cost_per_kg"])
            df_filtered.loc[group.index, "fob_z_score"] = group["fob_z_score"]

            # Replace 'fob' values for records with fob_z_score > 5
            non_outliers = group[group["fob_z_score"].abs() <= 5]
            avg_cost_per_kg = non_outliers["cost_per_kg"].mean()
            outlier_indices = group[group["fob_z_score"].abs() > 5].index
            df_filtered.loc[outlier_indices, "fob"] = (
                avg_cost_per_kg * df_filtered.loc[outlier_indices, "vol"]
            )

    # Update the fob values in the original DataFrame
    df.update(df_filtered[["fob"]])

    # Merge the fob_z_score column into the original DataFrame where applicable
    df = df.merge(
        df_filtered[["fob_z_score"]], how="left", left_index=True, right_index=True
    )

    # Replace 'fob' values for records where 'fob' is 0 based on the average 'cost_per_kg' of the corresponding 'hs6'
    for hs6, group in df.groupby("hs6"):
        avg_cost_per_kg = df_filtered[df_filtered["hs6"] == hs6]["cost_per_kg"].mean()
        zero_fob_indices = group[(group["fob"] == 0) & (group["vol"] > 0)].index
        df.loc[zero_fob_indices, "fob"] = (
            avg_cost_per_kg * df.loc[zero_fob_indices, "vol"]
        )

    # There are 3 records where 'fob' was 0, but there weren't any records with an fob value so to take an average
    # (for hs6 180600 and 010229). So the above operation converted them to null. We will replace them with 0.
    df["fob"] = df["fob"].fillna(0)

    # Convert summary statistics to DataFrame and sort by 'max' value from the summary statistics descending
    # summary_df = pd.DataFrame(summary_stats).transpose()
    # summary_df = summary_df.sort_values(by='max', ascending=False)
    # print(summary_df)

    df.drop(columns=["fob_z_score"], inplace=True)

    return df


def main():
    df = load_and_rename_data()

    # clean time, hs codes, and cnpjs
    df = clean_time(df)
    df = clean_hs(df)
    df = clean_cnpjs(df)

    # clean string columns
    string_columns = [
        c for c in df.columns.to_list() if c not in ["vol", "WTMT", "fob"]
    ]
    df = clean_string_columns(df, string_columns)
    df = clean_states(df)
    df = clean_ports(df)
    df = clean_municipalities(df)
    df = clean_countries(df)
    df = clean_importers(df)
    df = clean_exporters_and_add_group(df)
    df = clean_fob_outliers(df)

    # check numerical columns
    # num_column_list = ["vol", "fob", "WTMT"]
    # Removing 'vol' and 'fob' as they are float types and will fail the check (which expects strings)
    num_column_list = ["WTMT"]
    check_numerical(df, num_column_list)

    # save to csv
    write_csv_for_upload(df, "brazil/trade/bol/2021/BRAZIL_BOL_2021.csv")


if __name__ == "__main__":
    main()
import pandas as pd

BEEF_HS4 = [
    "0102",  # Bovine animals; live
    "0201",  # Meat of bovine animals; fresh or chilled
    "0202",  # Meat of bovine animals; frozen
    "0206",  # Edible offal of bovine + other animals; fresh, chilled or frozen
    "0210",  # Meat and edible meat offal; salted/brine/etc. (does not exist in BoL)
    "0504",  # Guts, bladders and stomachs of animals (does not exist in BoL)
    "1602",  # Prepared or preserved meat, meat offal or blood
]


def model(dbt, cursor):
    dbt.source(
        "trase-storage-raw",
        "dataliner_report_stockholm_exp_br_2020_to_2021_version_1",
    )
    dbt.ref("hs2017")

    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})