CD Disaggregated Beef 2023

s3://trase-storage/brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_2023.parquet

Dbt path: trase_production.main_brazil.cd_disaggregated_beef_2023

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/beef/trade/cd/disaggregated/_schema_cd_disaggregated_beef.yml

Model file link: trase/data_pipeline/models/brazil/beef/trade/cd/disaggregated/cd_disaggregated_beef_2023.py

Calls script: trase/data/brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_201X.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: beef, brazil, cd, disaggregated, trade


cd_disaggregated_beef_2023

Description

No description


Details

Column Type Description
index_mdic VARCHAR
index_bol VARCHAR
vol VARCHAR
vol_mdic VARCHAR
matching_stage VARCHAR
state.trase_id VARCHAR
message VARCHAR
via VARCHAR
hs4 VARCHAR
hs5 VARCHAR
hs6 VARCHAR
hs8 VARCHAR
exporter.cnpj VARCHAR
exporter.label VARCHAR
port_of_export.name VARCHAR
port_of_export.group VARCHAR
exporter.type VARCHAR
exporter.municipality.trase_id VARCHAR
importer.label VARCHAR
country_of_destination.name VARCHAR
country_of_destination.trase_id VARCHAR
country_of_destination.group VARCHAR
matched VARCHAR
fob VARCHAR
year BIGINT
exporter_geocode VARCHAR
state_of_production VARCHAR

Models / Seeds

  • model.trase_duckdb.brazil_bol_2023_gold
  • model.trase_duckdb.brazil_mdic_disaggregated_2023_beef
  • model.trase_duckdb.brazil_mdic_port_2023
  • model.trase_duckdb.postgres_regions_without_geometry

Script: CD_DISAGGREGATED_BEEF_201X.py

import numpy as np
import pandas as pd

from trase.tools.aws.aws_helpers import read_s3_parquet
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION
from trase.tools.utilities.helpers import clean_string
from trase.tools.matching.fair_allocation import direct_sourcing
from trase.tools.sei_pcs.pandas_utilities import (
    Compare,
    compare_dataframes_single,
    rename,
)

PORT_GROUPS = {
    # for the matching, we consider the following ports to be equivalent
    "BELEM": "BARCARENA",
    "PECEM": "FORTALEZA",  # Pecem is a terminal 60km from Fortaleza
}
COUNTRY_GROUPS = {
    # for the matching, we consider the following countries to be equivalent
    "CHINA (HONG KONG)": "CHINA",
    "CHINA (MAINLAND)": "CHINA",
    "CONGO DEMOCRATIC REPUBLIC OF THE": "CONGO",
}
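# Both maps are applied later with an identity fallback via dict.get, e.g.:
#
#     >>> PORT_GROUPS.get("PECEM", "PECEM")
#     'FORTALEZA'
#     >>> PORT_GROUPS.get("SANTOS", "SANTOS")  # unmapped ports group to themselves
#     'SANTOS'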
UNKNOWNS = {  # the first item is considered to be the "canonical" unknown value
    "cnpj": ["0"],
    "country_of_destination.label": ["UNKNOWN COUNTRY"],
    "country_of_destination.name": ["UNKNOWN COUNTRY"],
    "country_of_destination.group": ["UNKNOWN COUNTRY GROUP"],
    "country_of_destination.trase_id": ["XX"],
    "exporter.cnpj": ["0"],
    "exporter.label": ["UNKNOWN COMPANY"],
    "exporter.municipality.trase_id": ["BR-XXXXXXX", ""],
    "exporter.type": ["UNKNOWN"],
    "hs4": ["XXXX"],
    "hs5": ["XXXXX"],
    "hs6": ["XXXXXX"],
    "hs8": ["XXXXXXXX"],
    "importer.label": ["UNKNOWN COMPANY"],
    "month": [-1],
    "port_of_export.name": ["UNKNOWN PORT BRAZIL", "UNKNOWN PORT"],
    "port_of_export.group": ["UNKNOWN PORT GROUP BRAZIL"],
    "state.trase_id": ["BR-XX"],
    "matching_stage": ["UNMATCHED"],
    "via": ["XX"],
    "message": [""],
    "success": ["N/A"],
}
BEEF_HS4 = [
    "0102",  # Bovine animals; live
    "0201",  # Meat of bovine animals; fresh or chilled
    "0202",  # Meat of bovine animals; frozen
    "0206",  # Edible offal of bovine + other animals; fresh, chilled or frozen
    "0210",  # Meat and edible meat offal; salted/brine/etc. (does not exist in BoL)
    "0504",  # Guts, bladders and stomachs of animals (does not exist in BoL)
    "1602",  # Prepared or preserved meat, meat offal or blood
]
COLUMNS_TO_KEEP_FROM_BOL = [
    "via",
    "hs4",
    "hs5",
    "hs6",
    "hs8",
    "exporter.cnpj",
    "exporter.label",
    "port_of_export.name",
    "port_of_export.group",
    "exporter.type",
    "exporter.municipality.trase_id",
    "importer.label",
    "country_of_destination.name",
    "country_of_destination.trase_id",
    "country_of_destination.group",
]
COLUMNS_TO_KEEP_FROM_MDIC = [
    "state.trase_id",
    "success",
    "message",
]


def add_interpolated_fob_values(df, df_final, year):
    # get price (=FOB / volume) from the original MDIC (Port) file
    df = df[["hs4", "hs6", "fob", "vol"]]

    # abnormally small volumes can result in extremely high prices
    df = df[(df["vol"].astype(float) > 1) & (df["fob"].astype(float) > 0)].copy()

    # use float, not int64: the raw columns are strings and may contain decimals
    df["price"] = df["fob"].astype(float) / df["vol"].astype(float)
    df["hs5"] = df["hs6"].str.slice(0, 5)

    # get average prices per HS6/5/4
    df_hs6 = df.groupby("hs6")["price"].mean().rename("price_hs6").reset_index()
    df_hs5 = df.groupby("hs5")["price"].mean().rename("price_hs5").reset_index()
    df_hs4 = df.groupby("hs4")["price"].mean().rename("price_hs4").reset_index()

    # merge in to df_final
    df_final = pd.merge(df_final, df_hs6, on="hs6", how="left")
    df_final = pd.merge(df_final, df_hs5, on="hs5", how="left")
    df_final = pd.merge(df_final, df_hs4, on="hs4", how="left")

    # take the price for HS6 if it exists, else for HS5, else for HS4
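    # (combine_first keeps the caller's non-NaN values and fills the gaps from
    # its argument, e.g. pd.Series([None, 2.0]).combine_first(pd.Series([1.0, 9.0]))
    # gives [1.0, 2.0], so the chain below prefers the most specific price)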
    price = df_final.pop("price_hs5").combine_first(df_final.pop("price_hs4"))
    price = df_final.pop("price_hs6").combine_first(price)
    assert not any(price.isna())

    # reconstruct FOB as volume * price
    return df_final.assign(fob=df_final["vol"] * price)


def replace_nan_with_unknown_value(df_final):
    df_final = df_final.fillna(
        {column: unknowns[0] for column, unknowns in UNKNOWNS.items()}
    )
    assert not df_final.isna().any().any()
    return df_final


def concat(*dfs):
    assert len(set(tuple(sorted(df.columns)) for df in dfs)) == 1
    return pd.concat(dfs, sort=False, ignore_index=True)


def pad_with_remaining_bol(df_solved, df_bol_remaining, has_hs8):
    columns_to_keep_from_bol = [*COLUMNS_TO_KEEP_FROM_BOL]
    if not has_hs8:
        columns_to_keep_from_bol.remove("hs8")

    df_padding = df_bol_remaining[[*columns_to_keep_from_bol, "vol"]]
    df_padding = df_padding.rename(columns={"vol": "vol_bol"})
    df_padding = df_padding.assign(
        **{column: UNKNOWNS[column][0] for column in COLUMNS_TO_KEEP_FROM_MDIC}
    )
    df_padding = df_padding.assign(
        matched=False,
        matching_stage="N/A",
        index_mdic=-1,
        index_bol=df_padding.index,
        vol_mdic=0,
    )
    df_final = concat(df_solved.assign(matched=True), df_padding)
    df_final = replace_nan_with_unknown_value(df_final)
    df_final = df_final.rename(columns={"vol_bol": "vol"}, errors="raise")
    return df_final


def pad_to_mdic_volumes(df_final, df_bol_remaining_, df_mdic_remaining_):
    """
    At this point our dataframe (df_final) is equal to the BoL dataset. In practice
    the total volume is usually less than MDIC's. Since MDIC represents official
    government statistics, we would like to "pad" our dataframe with rows to bring up
    the volume.

    These padding rows will of course contain largely unknowns. We try our best to at
    least preserve the HS6 column. There is one edge case where we can use the unmatched
    rows from MDIC directly.

    For simplicity we use the remaining BoL and MDIC under the following assumptions:

        df_final = bol matched  + bol remaining
        df_mdic  = mdic matched + mdic remaining
        bol matched = mdic matched

    And therefore that

        df_mdic - df_final = mdic remaining - bol remaining

    This assumption doesn't quite hold since (a) volumes in MDIC and BoL are different
    and (b) at the time of writing, this function ran after discard_tiny_flows. However
    it's close enough!
    """

    print("Padding to MDIC volume")
    # unknowns assumed to be filled
    assert not df_final.isna().any().any()
    assert not df_bol_remaining_.isna().any().any()
    assert not df_mdic_remaining_.isna().any().any()

    # gather all HS codes
    df_hs_ = pd.concat(
        [
            df_final[["hs4", "hs6"]],
            df_bol_remaining_[["hs4", "hs6"]],
            df_mdic_remaining_[["hs4", "hs6"]],
        ]
    ).drop_duplicates()
    unknown_hs6 = UNKNOWNS["hs6"][0]
    unknown_hs4 = UNKNOWNS["hs4"][0]
    padding_dataframes = []
    for hs4, df_hs in df_hs_.groupby("hs4"):
        if hs4 == unknown_hs4:
            continue

        df = df_bol_remaining_[df_bol_remaining_["hs4"] == hs4]

        # look to see if there is significant volume in an unknown HS6 code. If this
        # is the case then padding per HS6 is not really possible to do. For example,
        # suppose we have the following:
        #
        #    df_bol_remaining:        df_mdic_remaining:
        #
        #    | hs6    | vol |         | hs6    | vol |
        #    |--------|-----|         |--------|-----|
        #    | 0102XX |  50 |         | 010200 |  50 |
        #                             | 010299 |  50 |
        #
        # We know that df_final needs 50 volume, but we don't know which HS6 codes are
        # already "part" of the unknown 0102XX code. The best we can do is to add 50
        # tons to 0102XX, i.e. pad to an HS4 level
        volume_in_unknown_hs6 = df[df["hs6"] == unknown_hs6]["vol"].sum()
        total_volume = df["vol"].sum()
        p = volume_in_unknown_hs6 / total_volume
        if p > 0.1:  # i.e. over 10% of total volume
            print(f"\tNot padding {hs4} as unknown HS6 is {100 * p:.0f}% of volume")
            # TODO consider padding to HS4 level
            continue

        for hs6 in df_hs["hs6"]:
            if hs6 == unknown_hs6:
                continue

            df_bol_remaining = df_bol_remaining_[df_bol_remaining_["hs6"] == hs6]
            df_mdic_remaining = df_mdic_remaining_[df_mdic_remaining_["hs6"] == hs6]

            mdic_volume = df_mdic_remaining["vol"].sum()
            bol_volume = df_bol_remaining["vol"].sum()
            if bol_volume > mdic_volume:
                with np.errstate(divide="ignore"):
                    p = 100 * np.divide(bol_volume - mdic_volume, mdic_volume)
                print(f"\tWarning: BoL volume for {hs6} exceeds MDIC by {p:,.1f}%")
                continue

            # if this HS6 code is entirely missing from df_bol_remaining then we can
            # simply pad using the MDIC data
            if df_bol_remaining.empty:
                df_padding = df_mdic_remaining.assign(
                    index_bol=-1,
                    index_mdic=df_mdic_remaining.index,
                    matched=False,
                    matching_stage="Padding",
                    vol_mdic=df_mdic_remaining["vol"],
                )

            # otherwise we can only pad with unknowns, since we do not know which rows
            # from the unmatched MDIC are present in the unmatched BoL
            else:
                missing_volume = mdic_volume - bol_volume
                df_padding = pd.DataFrame(
                    [
                        dict(
                            hs4=hs6[0:4],
                            hs5=hs6[0:5],
                            hs6=hs6,
                            index_bol=-1,
                            index_mdic=-1,
                            matched=False,
                            matching_stage="Padding",
                            vol=missing_volume,
                            vol_mdic=missing_volume,
                        )
                    ]
                )

            # fill any missing columns with unknowns
            missing_columns = set(df_final.columns) - set(df_padding.columns)
            df_padding = df_padding.assign(
                **{column: UNKNOWNS[column][0] for column in missing_columns}
            )

            # discard any extra columns
            df_padding = df_padding[df_final.columns]

            # add to list of padding dataframes
            padding_dataframes.append(df_padding)

    return pd.concat([df_final, *padding_dataframes], sort=False)


def discard_tiny_flows(df_final):
    """
    Discard flows smaller than 150 kg
    """
    # threshold_options = [1] + list(range(10, 251, 10))
    # flows_discarded = []
    # vol_discarded = []
    # for threshold in threshold_options:
    #     print(f"When Threshold is {threshold}")
    #     # discard flows which are less than 10 kg
    #     is_tiny = df_final["vol"] < threshold
    #     df_tiny = df_final[is_tiny]
    #     flows_discarded.append(100 * sum(is_tiny) / len(is_tiny))
    #     vol_discarded.append(100 * df_tiny["vol"].sum() / df_final["vol"].sum())
    #     print(
    #         f"Discarding {100 * sum(is_tiny) / len(is_tiny):.1f}% of rows with less "
    #         f"than {threshold} kg. This represents "
    #         f"{100 * df_tiny['vol'].sum() / df_final['vol'].sum():.1f}% of volume"
    #     )
    #
    # import matplotlib.pyplot as plt
    # import seaborn as sns
    #
    # sns.set()
    # # plt.title("Selecting threshold: Proportion of rows discarded")
    # plt.plot(threshold_options, flows_discarded)
    # # plt.xlabel("Threshold")
    # # plt.ylabel("% Rows Discarded")
    # plt.show()
    #
    # sns.set()
    # # plt.title("Selecting threshold: Proportion of volume discarded")
    # plt.plot(threshold_options, vol_discarded)
    # # plt.xlabel("Threshold")
    # # plt.ylabel("% Volume Discarded")
    # plt.show()

    is_tiny = df_final["vol"] < 150
    df_tiny = df_final[is_tiny]
    print(
        f"Discarding {100 * sum(is_tiny) / len(is_tiny):.1f}% of rows with less "
        f"than 150 kg. This represents "
        f"{100 * df_tiny['vol'].sum() / df_final['vol'].sum():.1f}% of volume"
    )
    return df_final[~is_tiny]


def replace_unknowns_with_nan(df, has_hs8):
    if has_hs8:
        df["hs8"] = df["hs8"].mask(df["hs8"].str.endswith("X"), "X" * 8)
    df["hs6"] = df["hs6"].mask(df["hs6"].str.endswith("X"), "X" * 6)
    df["hs5"] = df["hs5"].mask(df["hs5"].str.endswith("X"), "X" * 5)

    if has_hs8:
        assert all((df["hs8"] == UNKNOWNS["hs8"][0]) | df["hs8"].str.isdigit())
    assert all((df["hs6"] == UNKNOWNS["hs6"][0]) | df["hs6"].str.isdigit())
    assert all((df["hs5"] == UNKNOWNS["hs5"][0]) | df["hs5"].str.isdigit())
    assert all(df["hs4"].str.isdigit())
    return df.replace(
        {column: {value: None for value in UNKNOWNS[column]} for column in UNKNOWNS}
    )


def set_index(df, prefix):
    df.index = pd.Index((f"{prefix}{i}" for i in range(len(df))), name="index")
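    # e.g. set_index(df, "bol-") -> Index(["bol-0", "bol-1", ...], name="index")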


def clean_bol(df_secomex, df_bol, year, has_municipality, has_hs8):
    # From mid 2021, SECOMEX is not available
    if year <= 2020:
        df_secomex = df_secomex[["cnpj", "municipality.trase_id"]]
        df_secomex = df_secomex.rename(
            columns={
                "municipality.trase_id": "exporter.municipality.trase_id",
                "cnpj": "exporter.cnpj",
            },
            errors="raise",
        )
        df_secomex = df_secomex.drop_duplicates()

    # rename BoL columns for 2023 onwards to match the expected names
    if year >= 2023:
        df_bol = df_bol.rename(
            columns={
                "exporter_cnpj": "exporter.cnpj",
                "exporter_label": "exporter.label",
                "exporter_municipality_trase_id": "exporter.municipality.trase_id",
                "exporter_type": "exporter.type",
                "port_of_export_name": "port_of_export.name",
                "importer_label": "importer.label",
                "country_of_destination_label": "country_of_destination.label",
                "country_of_destination_name": "country_of_destination.name",
                "country_of_destination_trase_id": "country_of_destination.trase_id",
                "net_weight_kg": "vol",
            },
            errors="raise",
        )

    df_bol = df_bol[
        [
            "hs4",
            "hs6",
            "hs5",
            *(["hs8"] if has_hs8 else []),
            "month",
            "exporter.cnpj",
            "exporter.label",
            *(["exporter.municipality.trase_id"] if has_municipality else []),
            "exporter.type",
            "port_of_export.name",
            "importer.label",
            "country_of_destination.label",
            "country_of_destination.name",
            "country_of_destination.trase_id",
            "vol",
        ]
    ]
    df_bol = df_bol[df_bol["hs4"].isin(BEEF_HS4)]
    df_bol = df_bol.astype({"vol": float, "month": int})

    if year == 2018:
        month = df_bol["month"].astype(int)
        assert sorted(month.unique()) == list(range(1, 13))
        df_bol = df_bol[month >= 5]

    # Get the municipality from SECOMEX until 2020
    if year <= 2020:
        df_bol = pd.merge(
            df_bol,
            df_secomex,
            on="exporter.cnpj",
            validate="many_to_one",
            how="left",
            indicator=True,
            suffixes=("", "_secomex"),
        )
        matched = df_bol.pop("_merge") == "both"
        print(
            f"Matched BoL with SECOMEX for {100 * sum(matched) / len(matched):.1f}% of rows"
            f" / {100 * df_bol[matched]['vol'].sum() / df_bol['vol'].sum():.1f}% of volume"
        )

        if has_municipality:
            # take municipality from SECOMEX where it was found, otherwise fall back on
            # the BOL.
            df_bol["exporter.municipality.trase_id"] = np.where(
                matched,
                df_bol.pop("exporter.municipality.trase_id_secomex"),
                df_bol.pop("exporter.municipality.trase_id"),
            )

    # Raise an error if year >= 2021 and municipality is not available.
    # As of the 2023 update there are no such cases - if any appear in the future,
    # we could take the municipality from the CNPJ database
    if year >= 2021 and not has_municipality:
        raise ValueError(
            "Municipality is not available for year >= 2021 in the BOL or SECOMEX. "
            "Adjust the logic to take this from RFB's CNPJ reference data."
        )

    # fill missing municipality with the canonical unknown value
    df_bol = df_bol.fillna({"exporter.municipality.trase_id": "BR-XXXXXXX"})

    df_bol["port_of_export.group"] = df_bol["port_of_export.name"].apply(
        lambda port: PORT_GROUPS.get(port, port)
    )
    df_bol["country_of_destination.group"] = df_bol[
        "country_of_destination.name"
    ].apply(lambda country: COUNTRY_GROUPS.get(country, country))

    # consider BR-51XXXXX etc. to be unknown
    df_bol["exporter.municipality.trase_id"] = df_bol[
        "exporter.municipality.trase_id"
    ].mask(df_bol["exporter.municipality.trase_id"].str.contains("X"), "BR-XXXXXXX")

    # fill unknowns with nan - important to exclude from matching
    df_bol = replace_unknowns_with_nan(df_bol, has_hs8)

    # all trade is maritime
    df_bol["via"] = "01"

    # we set a custom and unique index on each dataset: this means that they won't
    # overlap with MDIC and so bugs will be more obvious
    set_index(df_bol, "bol-")
    return df_bol


def clean_mdic(df_mdic, year, has_hs8):
    if year == 2018:
        month = df_mdic["month"].astype(int)
        assert sorted(month.unique()) == list(range(5, 13))
        df_mdic = df_mdic[month >= 5]

    df_mdic["hs5"] = df_mdic["hs6"].str.slice(0, 5)

    # keep only the beef-related HS4 codes
    df_mdic = df_mdic[df_mdic["hs4"].isin(BEEF_HS4)]
    df_mdic = rename(df_mdic, {"port.name": "port_of_export.name"})
    df_mdic = df_mdic.astype({"vol": "float", "month": "int"})

    df_mdic = df_mdic[df_mdic["vol"] > 1]

    # align some port names
    df_mdic["port_of_export.group"] = df_mdic["port_of_export.name"].apply(
        lambda country: PORT_GROUPS.get(country, country)
    )
    df_mdic["country_of_destination.group"] = df_mdic[
        "country_of_destination.name"
    ].apply(lambda country: COUNTRY_GROUPS.get(country, country))

    # fill unknowns with nan - important to exclude from matching
    df_mdic = replace_unknowns_with_nan(df_mdic, has_hs8)

    # we set a custom and unique index on each dataset: this means that they won't
    # overlap with BoL and so bugs will be more obvious
    set_index(df_mdic, "mdic-")
    return df_mdic


def full_merge(*args, **kwargs):
    """Merge two dataframes, asserting that every row found a partner."""
    df = pd.merge(*args, indicator=True, **kwargs)
    assert all(df.pop("_merge") == "both")
    return df


def add_bol_and_mdic_columns(df_solved, df_bol, df_mdic, has_hs8):
    """
    At this point, `df_solved` looks like this:

    ```nohighlight
    | index_mdic | index_bol | vol_bol | vol_mdic |
    |------------|-----------|---------|----------|
    |     mdic-7 | bol-10512 |   11557 |    11557 |
    |   mdic-132 |  bol-1831 |   25074 |    25074 |
    |   mdic-177 |  bol-2281 |    2846 |     2846 |
    |       ...  |       ... |     ... |      ... |
    ```

    We merge back in the original columns from MDIC and BoL using the indexes.
    """

    def left_merge(df_left, df_right, left_on):
        return full_merge(
            df_left,
            df_right,
            how="left",
            left_on=left_on,
            right_index=True,
            validate="many_to_one",
        )

    # bring in the state of origin from MDIC
    df_solved = left_merge(df_solved, df_mdic[COLUMNS_TO_KEEP_FROM_MDIC], "index_mdic")

    # bring in all other columns from the BoL
    columns_to_keep_from_bol = [*COLUMNS_TO_KEEP_FROM_BOL]
    if not has_hs8:
        columns_to_keep_from_bol.remove("hs8")

    df_solved = left_merge(
        df_solved,
        df_bol[columns_to_keep_from_bol],
        "index_bol",
    )
    return df_solved


def subtract_maximum_matched_volume(df_matches, df_bol, df_mdic):
    def subtract(df, index, volume):
        df = pd.merge(
            df,
            df_matches.groupby(index)[volume].sum(),
            left_index=True,
            right_index=True,
            how="left",
            validate="one_to_one",
        )
        df["vol"] -= df.pop(volume).fillna(0)
        return df[df["vol"] > 0]

    df_bol_remaining = subtract(df_bol, "index_bol", "vol_bol")
    df_mdic_remaining = subtract(df_mdic, "index_mdic", "vol_mdic")
    return df_matches, df_bol_remaining, df_mdic_remaining


def run_matching(indexer, df_bol, df_mdic):
    # construct a list of matching pairs
    pairs = indexer.index(df_mdic, df_bol)
    if pairs.get_level_values(0).size > 0:
        # fairly distribute volume which can be assigned among these pairs,
        allocation_mdic, allocation_bol = direct_sourcing(
            pairs,
            df_mdic.loc[pairs.get_level_values(0).drop_duplicates()]["vol"],
            df_bol.loc[pairs.get_level_values(1).drop_duplicates()]["vol"],
            relative_tolerance=0.1,  # allow for 10% reduction in BoL volumes
        )

        # subtract the solved volume from the original data
        df_matches = pd.DataFrame(
            {
                "index_mdic": allocation_mdic.index.get_level_values(0),
                "index_bol": allocation_bol.index.get_level_values(1),
                "vol_bol": allocation_bol,
                "vol_mdic": allocation_mdic,
            }
        )
        (
            df_matches,
            df_bol_remaining,
            df_mdic_remaining,
        ) = subtract_maximum_matched_volume(df_matches, df_bol, df_mdic)
    else:
        df_matches = pd.DataFrame(
            {
                # both index columns hold the string labels set by set_index
                # ("mdic-*" / "bol-*"), so the empty frame uses str for both
                "index_mdic": pd.Series(dtype="str"),
                "index_bol": pd.Series(dtype="str"),
                "vol_bol": pd.Series(dtype="float"),
                "vol_mdic": pd.Series(dtype="float"),
            }
        )
        df_bol_remaining = df_bol
        df_mdic_remaining = df_mdic
    return df_matches, df_bol_remaining, df_mdic_remaining


def step1(df_bol, df_mdic, has_hs8):
    import recordlinkage as rl

    indexer = rl.Index()
    indexer.block(
        [
            "country_of_destination.name",
            "exporter.municipality.trase_id",
            "hs4",
            # note: HS5 and HS6 not included here because they can be unknown in the
            # sense that they are not official UN codes; yet HS8 (a Brazilian government
            # construct) may match
            *(["hs8"] if has_hs8 else []),
            "month",
            "port_of_export.group",
            "via",
        ]
    )
    return run_matching(indexer, df_bol, df_mdic)
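

# A minimal sketch of the blocking used above (hypothetical frames). recordlinkage
# returns a MultiIndex of candidate (left, right) pairs; rows whose blocking keys
# are NaN are left out, which is why unknowns are replaced with NaN beforehand:
#
#     import pandas as pd
#     import recordlinkage as rl
#
#     df_a = pd.DataFrame({"hs4": ["0201", "0202"]}, index=["mdic-0", "mdic-1"])
#     df_b = pd.DataFrame({"hs4": ["0201", "0201"]}, index=["bol-0", "bol-1"])
#     indexer = rl.Index()
#     indexer.block("hs4")
#     indexer.index(df_a, df_b)  # -> [("mdic-0", "bol-0"), ("mdic-0", "bol-1")]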


def step2(df_bol, df_mdic):
    import recordlinkage as rl

    indexer = rl.Index()
    indexer.sortedneighbourhood(
        "month",
        window=3,
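        # window=3 spans a three-month neighbourhood after sorting on "month"
        # (the month itself plus one either side), matching the "within three
        # months" stage label used downstream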
        block_on=[
            "country_of_destination.group",
            "exporter.municipality.trase_id",
            "hs4",
            "hs6",
            "port_of_export.group",
            "via",
        ],
    )
    return run_matching(indexer, df_bol, df_mdic)


def step3(df_bol, df_mdic):
    import recordlinkage as rl

    indexer = rl.Index()
    indexer.block(
        [
            "country_of_destination.group",
            "exporter.municipality.trase_id",
            "hs4",
            "port_of_export.group",
            "via",
        ],
    )
    return run_matching(indexer, df_bol, df_mdic)


def assert_equality_with_bol(df_final, df_bol, has_hs8):
    columns_to_keep_from_bol = [*COLUMNS_TO_KEEP_FROM_BOL]
    if not has_hs8:
        columns_to_keep_from_bol.remove("hs8")

    # quick check that volume was conserved
    assert np.isclose(df_final["vol"].sum(), df_bol["vol"].sum(), rtol=0.01)

    # check no difference with all original BOL columns
    difference = compare_dataframes_single(
        df_final,
        df_bol,
        "vol",
        columns_to_keep_from_bol,
        Compare.signed_symmetric_relative_error,
    )
    assert all(np.isclose(0, difference["comparison"], atol=0.01))


def clean_string_columns(df, column_list):
    """
    Clean the string columns by replacing the missing values and adjusting formats
    :param df: dataframe
    :param column_list: list of column names, list(str)
    :return: cleaned dataframe
    """

    # clean the string columns
    for column in column_list:
        df[column] = df[column].apply(clean_string)

    missing_value_list = ["NAN", "NONE", "NA", "", "NOT DECLARED"]

    # replace null-like values with UNKNOWN
    for column in df.columns:
        df.loc[df[column].isin(missing_value_list), column] = "UNKNOWN"

    return df


def get_state_name(df_state, df):
    df_state = df_state[df_state["region_type"] == "STATE"]
    df_state = df_state[df_state["trase_id"].str.startswith("BR")]
    df_state = df_state.rename(
        columns={
            "name": "state_of_production",
            "trase_id": "state.trase_id",
        },
        errors="raise",
    )
    df_state = df_state[["state_of_production", "state.trase_id"]]
    df_state = df_state.drop_duplicates()

    df = full_merge(
        df,
        df_state,
        how="left",
        on="state.trase_id",
        validate="many_to_one",
    )
    return df

    # print("Running for 2019\n" + ("=" * 80))
    # run(year=2019, has_hs8=True, has_municipality=False)

    # print("Running for 2020\n" + ("=" * 80))
    # run(year=2020, has_hs8=False, has_municipality=True)


def process(
    df_bol,
    df_mdic,
    df_mdic_port,
    df_state,
    year,
    has_municipality,
    has_hs8,
    df_secomex=None,
):
    df_bol = clean_bol(df_secomex, df_bol, year, has_municipality, has_hs8)
    df_mdic = clean_mdic(df_mdic, year, has_hs8)

    vol_bol = df_bol["vol"].sum()
    if year == 2018:
        vol_mdic = pd.merge(df_mdic, df_bol["hs4"].drop_duplicates(), on="hs4")[
            "vol"
        ].sum()
    else:
        vol_mdic = pd.merge(df_mdic, df_bol["hs6"].drop_duplicates(), on="hs6")[
            "vol"
        ].sum()

    def report(prefix, df):
        print(
            f"{prefix}: solved {100 * df['vol_bol'].sum() / vol_bol:.1f}% of BoL volume; "
            f"{100 * df['vol_mdic'].sum() / vol_mdic:.1f}% of MDIC volume"
        )

    dfs_solved = []

    # matching step 1
    # ----------------------------------------------------------------------------------
    df, df_bol_remaining, df_mdic_remaining = step1(df_bol, df_mdic, has_hs8)
    dfs_solved.append(
        df.assign(
            matching_stage=f"1 - country, exporter municipality, {'hs8, ' if has_hs8 else ''}port, month"
        )
    )
    report("Step 1", df)

    # matching step 2
    # ----------------------------------------------------------------------------------
    df, df_bol_remaining, df_mdic_remaining = step2(df_bol_remaining, df_mdic_remaining)
    dfs_solved.append(
        df.assign(
            matching_stage="2 - country, exporter municipality, hs6, port, within three months"
        )
    )
    report("Step 2", df)

    # matching step 3
    # ----------------------------------------------------------------------------------
    df, df_bol_remaining, df_mdic_remaining = step3(df_bol_remaining, df_mdic_remaining)
    dfs_solved.append(
        df.assign(matching_stage="3 - country, exporter municipality, hs4, port")
    )
    report("Step 3", df)

    # bring together all matching steps
    # ----------------------------------------------------------------------------------
    print("Combining steps")
    df_solved = concat(*dfs_solved)
    df_solved = add_bol_and_mdic_columns(df_solved, df_bol, df_mdic, has_hs8)

    df_final = pad_with_remaining_bol(df_solved, df_bol_remaining, has_hs8)
    df_bol = replace_nan_with_unknown_value(df_bol)
    assert_equality_with_bol(df_final, df_bol, has_hs8)

    df_bol_remaining = replace_nan_with_unknown_value(df_bol_remaining)
    df_mdic_remaining = replace_nan_with_unknown_value(df_mdic_remaining)
    df_final = pad_to_mdic_volumes(df_final, df_bol_remaining, df_mdic_remaining)
    df_final = discard_tiny_flows(df_final)
    df_final = add_interpolated_fob_values(df_mdic_port, df_final, year)

    # print increase in rows
    columns_to_keep_from_bol = [*COLUMNS_TO_KEEP_FROM_BOL]
    if not has_hs8:
        columns_to_keep_from_bol.remove("hs8")
    a = df_final.groupby([*columns_to_keep_from_bol, *COLUMNS_TO_KEEP_FROM_MDIC])
    b = df_bol.groupby(columns_to_keep_from_bol)
    print(f"BOL has {(len(a) / len(b)):.1f}x as many rows as before")

    # clean strings and state names
    # ----------------------------------------------------------------------------------
    df_final = df_final.astype(str)
    string_columns = [
        c for c in list(df_final.columns) if c not in ["vol", "vol_mdic", "fob"]
    ]
    df_final = clean_string_columns(df_final, string_columns)
    df_final["year"] = year
    df_final["exporter_geocode"] = df_final["exporter.municipality.trase_id"].str[-7:]
    df_final = get_state_name(df_state, df_final)

    # drop redundant column
    return df_final.drop("success", axis=1)


def run(year, has_hs8, has_municipality):
    S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.clear()

    df_state = read_s3_parquet("postgres_views/postgres_regions.parquet")

    # SECOMEX is only available until the first months of 2021
    if year <= 2020:
        df_secomex = get_pandas_df_once(
            f"brazil/auxiliary/secex/cleaned/EMPRESAS_CADASTRO_{year}.csv",
            sep=";",
            encoding="utf-8",
            keep_default_na=False,
            dtype=str,
        )
    else:
        df_secomex = None

    df_bol = get_pandas_df_once(
        f"brazil/trade/bol/{year}/BRAZIL_BOL_{year}.csv",
        sep=";",
        encoding="utf-8",
        keep_default_na=False,
        dtype=str,
    )
    if year == 2018:
        df_mdic = get_pandas_df_once(
            f"brazil/trade/mdic/disaggregated/brazil_mdic_disaggregated_{year}_beef_02.csv",
            sep=";",
            encoding="utf-8",
            keep_default_na=False,
            dtype="str",
        )
    else:
        df_mdic = get_pandas_df_once(
            f"brazil/trade/mdic/disaggregated/brazil_mdic_disaggregated_{year}_beef.csv",
            sep=";",
            encoding="utf-8",
            keep_default_na=False,
            dtype="str",
        )

    df_mdic_port = get_pandas_df_once(
        f"brazil/trade/mdic/port/brazil_mdic_port_{year}.csv",
        sep=";",
        encoding="utf-8",
        keep_default_na=False,
        dtype="str",
    )

    df_final = process(
        df_bol,
        df_mdic,
        df_mdic_port,
        df_state,
        year,
        has_municipality,
        has_hs8,
        df_secomex,
    )

    # write to disk
    write_csv_for_upload(
        df_final,
        f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}_02.csv",
    )


def main():
    print("Running for 2018\n" + ("=" * 80))
    run(year=2018, has_hs8=True, has_municipality=False)

    # print("Running for 2019\n" + ("=" * 80))
    # run(year=2019, has_hs8=True, has_municipality=False)

    # print("Running for 2020\n" + ("=" * 80))
    # run(year=2020, has_hs8=False, has_municipality=True)


if __name__ == "__main__":
    main()

Model file: cd_disaggregated_beef_2023.py

from trase.data.brazil.beef.trade.cd.disaggregated.CD_DISAGGREGATED_BEEF_201X import (
    process,
)

YEAR = 2023


def model(dbt, cursor):
    dbt.config(materialized="external")

    df_bol = dbt.ref("brazil_bol_2023_gold").df()
    df_mdic = dbt.ref("brazil_mdic_disaggregated_2023_beef").df()
    df_mdic_port = dbt.ref("brazil_mdic_port_2023").df()
    df_state = dbt.ref("postgres_regions_without_geometry").df()

    return process(
        df_bol,
        df_mdic,
        df_mdic_port,
        df_state,
        year=YEAR,
        has_municipality=True,
        has_hs8=True,
    )