DBT: Brazil Bol 2023 Mdic Padded

File location: s3://trase-storage/brazil/trade/bol/2023/brazil_bol_2023_mdic_padded.parquet

DBT model name: brazil_bol_2023_mdic_padded

Explore on Metabase: Full table; summary statistics

Explore dependencies/lineage: link

Relies on script: trase/data/brazil/trade/padding/brazil_bol_mdic_padding_20XX.py


Description

Padded version of the Brazil BOL 2023 data, based on MDIC. Includes non-maritime volumes and FOB taken from MDIC, as well as volumes and FOB for hs6-destination country combinations that exist in MDIC but not in the BOL. Other values (exporter, port, etc.) receive default UNKNOWN values. In the 2023 version, some records already have 'EUROPEAN UNION' as the destination country name; for any hs6 code where this happens, padded rows of type 'hs6 country padding' destined for EU member countries are filtered out, to avoid over-representing EU volumes. At the time of implementation, this primarily affected hs6 codes 230400 and 120190 (BOL code 120100 is converted to 120190 before padding). Adds fields: padded (bool) and padding_type (string): ['non-maritime padding', 'hs6 country padding'].
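The two padding types described above can be illustrated with a minimal sketch. It uses plain Python and made-up rows rather than the Polars LazyFrames of the real script, and the function name `build_padding_rows` is hypothetical. It only mirrors the selection logic: pairs missing from the BOL are padded for every mode of transportation, while pairs already in the BOL only receive positive non-maritime volumes.

```python
# Toy illustration of the two padding types (hypothetical data, plain Python).
# The production script does the same with Polars joins and also sums MDIC
# volumes per group; aggregation is skipped here to keep the sketch short.

# (hs6, destination trase id) pairs already present in the BOL
bol_keys = {("120190", "CN"), ("120190", "NL")}

# MDIC rows: (hs6, destination trase id, mode of transportation, kg, fob)
mdic_rows = [
    ("120190", "CN", "MARITIMA", 1000.0, 500.0),
    ("120190", "CN", "RODOVIARIA", 10.0, 5.0),
    ("120190", "PY", "RODOVIARIA", 40.0, 20.0),
    ("120190", "NL", "AEREA", 0.0, 0.0),
]


def build_padding_rows(bol_keys, mdic_rows):
    """Return the padded rows that would be appended to the BOL."""
    padding_rows = []
    for hs6, country, mode, kg, fob in mdic_rows:
        if (hs6, country) not in bol_keys:
            # Pair absent from the BOL: take the MDIC volume for every mode
            padding_type = "hs6 country padding"
        elif mode != "MARITIMA" and kg > 0:
            # Pair present in the BOL: add only positive non-maritime volumes
            padding_type = "non-maritime padding"
        else:
            continue
        padding_rows.append(
            {
                "hs6": hs6,
                "country_of_destination_trase_id": country,
                "mode_of_transportation": mode,
                "net_weight_kg": kg,
                "net_weight_tonnes": kg / 1000,
                "fob": fob,
                "padded": True,
                "padding_type": padding_type,
            }
        )
    return padding_rows


padding_rows = build_padding_rows(bol_keys, mdic_rows)
for row in padding_rows:
    print(row["country_of_destination_trase_id"], row["padding_type"])
```

In this toy input, the maritime row for CN and the zero-volume row for NL produce no padding, because both pairs already exist in the BOL.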


Details

Column Type Description
commodity VARCHAR Uppercase name of the commodity (BEEF, SOY, etc.) based on the hs4 code, and the list of commodities in the Trase reference table postgres_commodities
country_of_destination_economic_bloc VARCHAR If the country of destination is part of the EU for the year of the BOL, it sets the value "EUROPEAN UNION" here. Note that currently no other economic blocs are being identified. If this is done in the future, probably the field will turn into an array.
country_of_destination_label VARCHAR Based on the Datamar source field PLACE_AND_PORTS_DEST_COUNTRY: "Country where the carrier delivered the cargo"
country_of_destination_name VARCHAR Official Trase country name where the export was delivered. It is cleaned based on the country_of_destination_label field. Note that European Union countries are not unified under a single 'EUROPEAN UNION' country, although the source BOL does have 768 records with the label "EUROPEAN" instead of a specific country.
country_of_destination_trase_id VARCHAR Official Trase country id, using the iso two letter country code based on the field country_of_destination_name
date DATE Based on the source field DATES_LONG_HAUL_YYYYMMDD: Berthing (arrival) date of the long haul vessel: Year, month and day
year INTEGER
month INTEGER
exporter_cnpj VARCHAR Based on the source field COMPANY_SHIPPER_REGISTRATION_NUMBER (the shipper is the exporter), which already contains only digits. The cleaning of this field uses the stdnum.br Python library to identify whether it is a valid cnpj, a valid cpf, or unknown (and sets this in the exporter_type field). A valid cnpj is left-padded with 0 until it reaches 14 characters, and a valid cpf is left-padded to 11 characters.
exporter_country_label VARCHAR Based on the source field PLACE_AND_PORTS_POL_COUNTRY: country name of the exporter
exporter_group VARCHAR Official Trase trader group name, taken from the trader reference database. First it checks the reference data based on the tax identification of the record, and then based on the exporter_label. As a trader can change its group affiliation in time, the group valid for the current BOL year is taken.
exporter_label VARCHAR Based on the source field COMPANY_SHIPPER_SHIPPER_NAME_DETAILED: Detailed Shipper (exporter) name.
exporter_municipality_label VARCHAR Based on the source field COMPANY_SHIPPER_CITY: Shipper (exporter) city
exporter_municipality_name VARCHAR Official Trase name of the municipality, based on the exporter_municipality_label, and cleaning it based on the postgres_regions reference table - where it checks the municipality exists and takes the official name
exporter_municipality_trase_id VARCHAR Official Trase id of the municipality, usually created based on an official geocode of it. It is based on the exporter_municipality_label, and cleaning it based on the postgres_regions reference table - where it checks the municipality exists and takes the id
exporter_name VARCHAR Official Trase trader name, taken from the trader reference database. First it checks the reference data based on the tax identification of the record, and then based on the exporter_label field
exporter_node_id BIGINT
exporter_state_label VARCHAR Based on the source field COMPANY_SHIPPER_STATE_NAME: Shipper (exporter) state name
exporter_state_name VARCHAR Official Trase name of the state, based on the exporter_state_label, and cleaning it based on the postgres_regions reference table - where it checks the state exists and takes the official name
exporter_state_trase_id VARCHAR Official Trase id of the state, based on the exporter_state_label, and cleaning it based on the postgres_regions reference table - where it checks the state exists and takes the official id
exporter_trase_id VARCHAR Official Trase trader id, taken from the trader reference database. First it checks the reference data based on the tax identification of the record, and then based on the exporter_label field. The trase id is built based on the tax number.
exporter_type VARCHAR Based on the field exporter_cnpj, it identifies whether the value is a valid cnpj, a valid cpf, or of unknown type.
fob DOUBLE Based on the field fob_original, including imputing empty values based on COMTRADE or BOL average values for the hs6 code. Where imputed, the field fob_adjustment_type shows which kind of imputing was done.
fob_adjustment_type VARCHAR
fob_original DOUBLE Based on the source field MEASURES_SECEX_FOB. Though this field doesn't exist in the vendor glossary, there is a similar field in the glossary called MEASURES_FOB_VALUE_USD: Monthly average FOB value per commodity in US dollars.
hs4 VARCHAR
hs5 VARCHAR
hs6 VARCHAR
hs6_description VARCHAR
hs8 VARCHAR
importer_group VARCHAR
importer_label VARCHAR
importer_name VARCHAR
importer_node_id BIGINT
net_weight_kg DOUBLE Based on the source field MEASURES_WTKG: Weight in kilos
net_weight_tonnes DOUBLE Based on the source field MEASURES_WTMT: Weight in metric tonnes
port_of_export_label VARCHAR Based on the source field PLACE_AND_PORTS_POL_NAME: POL ‐ PORT OF LOADING ‐ Port where the LONG HAUL vessel loaded the cargo
port_of_export_name VARCHAR Official Trase port name where the cargo was loaded. It is cleaned based on port_of_export_label field and the postgres_ports reference table, checking also the port exists in the port_of_export_country
port_of_import_country_label VARCHAR Based on the source field PLACE_AND_PORTS_POD_COUNTRY: POD ‐ PORT OF DISCHARGE ‐ Country where the LONG HAUL vessel discharged the cargo
port_of_import_country_name VARCHAR
port_of_import_label VARCHAR Based on the source field PLACE_AND_PORTS_POD_NAME: POD ‐ PORT OF DISCHARGE ‐ Port where the LONG HAUL vessel discharged the cargo
port_of_import_name VARCHAR Official Trase port name where the cargo was discharged. If there is no current existing port with that name in the port_of_import_country, it returns a NULL. It is cleaned based on port_of_import_label field and the postgres_ports reference table.
mode_of_transportation VARCHAR Mode of transportation, such as MARITIMA, FLUVIAL, RODOVIARIA, AEREA.
padded BOOLEAN Boolean. Whether the trade record was filled in using general MDIC port data
padding_type VARCHAR For the moment, one of: 'non-maritime padding' (volume and FOB were padded using MDIC non-maritime data) or 'hs6 country padding' (the whole hs6-destination country volume and FOB was padded)
maritime_non_maritime VARCHAR
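
The left-padding rule described for exporter_cnpj can be sketched as follows. This is an illustrative helper, not the actual cleaning code: the name `left_pad_tax_id` is hypothetical, and the validity check done with stdnum.br is not reproduced, so the exporter_type is assumed to be known already.

```python
# Hypothetical helper: left-pad a digits-only tax id according to its type.
# Checksum validation (done with the stdnum.br library in the real cleaning
# step) is not reproduced here; exporter_type is assumed to be known already.
TAX_ID_LENGTHS = {"cnpj": 14, "cpf": 11}


def left_pad_tax_id(digits: str, exporter_type: str) -> str:
    """Pad with leading zeros to 14 (cnpj) or 11 (cpf) characters."""
    width = TAX_ID_LENGTHS.get(exporter_type)
    if width is None:
        # Unknown type: leave the value untouched
        return digits
    return digits.zfill(width)


print(left_pad_tax_id("191", "cnpj"))
print(left_pad_tax_id("191", "cpf"))
```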

Review full report, including sample error records if they exist (link)

Test name Test column Last test run Last status
accepted_values_brazil_bol_2023_mdic_padded_country_of_destination_economic_bloc__EUROPEAN_UNION country_of_destination_economic_bloc 2026-04-25 13:23 pass
accepted_values_brazil_bol_2023_mdic_padded_exporter_country_label__BRAZIL exporter_country_label 2026-04-25 13:23 pass
accepted_values_brazil_bol_2023_mdic_padded_exporter_type__cnpj__cpf__unknown exporter_type 2026-04-25 13:23 pass
dbt_expectations_expect_table_aggregation_to_equal_other_table_brazil_bol_2023_mdic_padded_source_source_brazil_original_brazil_soy_beef_bol_2023___count___0_03 `` 2026-04-25 13:23 pass
dbt_utils_accepted_range_brazil_bol_2023_mdic_padded_date__CAST_2023_12_31_AS_DATE___CAST_2023_01_01_AS_DATE_ date 2026-04-25 13:23 pass
dbt_utils_accepted_range_brazil_bol_2023_mdic_padded_fob__700000000__0_01 fob 2026-04-25 13:23 pass
dbt_utils_accepted_range_brazil_bol_2023_mdic_padded_net_weight_kg__True__120000000__1 net_weight_kg 2026-04-25 13:23 pass
dbt_utils_accepted_range_brazil_bol_2023_mdic_padded_net_weight_tonnes__100000000__0_0001 net_weight_tonnes 2026-04-25 13:23 pass
dbt_utils_expression_is_true_brazil_bol_2023_mdic_padded_ABS_net_weight_kg_1_0_net_weight_tonnes_1000_5 `` 2026-04-25 13:23 pass
dbt_utils_not_null_proportion_brazil_bol_2023_mdic_padded_0_8__port_of_import_name port_of_import_name 2026-04-25 13:23 pass
dbt_utils_not_null_proportion_brazil_bol_2023_mdic_padded_0_99__exporter_state_label exporter_state_label 2026-04-25 13:23 pass
dbt_utils_not_null_proportion_brazil_bol_2023_mdic_padded_0_99__exporter_trase_id exporter_trase_id 2026-04-25 13:23 pass
dbt_utils_relationships_where_brazil_bol_2023_mdic_padded_exporter_municipality_name__name__ref_postgres_regions_without_geometry___level_6_AND_country_BRAZIL_ exporter_municipality_name 2026-04-25 13:23 pass
dbt_utils_relationships_where_brazil_bol_2023_mdic_padded_exporter_municipality_trase_id__trase_id__ref_postgres_regions_without_geometry___level_6_AND_country_BRAZIL_ exporter_municipality_trase_id 2026-04-25 13:23 pass
dbt_utils_relationships_where_brazil_bol_2023_mdic_padded_exporter_state_name__name__ref_postgres_regions_without_geometry___level_3_AND_country_BRAZIL_ exporter_state_name 2026-04-25 13:23 pass
dbt_utils_relationships_where_brazil_bol_2023_mdic_padded_exporter_state_trase_id__trase_id__ref_postgres_regions_without_geometry___level_3_AND_country_BRAZIL_ exporter_state_trase_id 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_commodity commodity 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_country_of_destination_name country_of_destination_name 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_country_of_destination_trase_id country_of_destination_trase_id 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_cnpj exporter_cnpj 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_country_label exporter_country_label 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_group exporter_group 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_label exporter_label 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_municipality_label exporter_municipality_label 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_municipality_name exporter_municipality_name 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_municipality_trase_id exporter_municipality_trase_id 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_name exporter_name 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_node_id exporter_node_id 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_state_name exporter_state_name 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_state_trase_id exporter_state_trase_id 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_exporter_type exporter_type 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_fob fob 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_hs4 hs4 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_net_weight_kg net_weight_kg 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_net_weight_tonnes net_weight_tonnes 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_port_of_export_label port_of_export_label 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_port_of_export_name port_of_export_name 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_port_of_import_country_label port_of_import_country_label 2026-04-25 13:23 pass
not_null_brazil_bol_2023_mdic_padded_port_of_import_label port_of_import_label 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_commodity__commodity__ref_postgres_commodities_ commodity 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_country_of_destination_name__country_name__ref_postgres_countries_ country_of_destination_name 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_country_of_destination_trase_id__country_trase_id__ref_postgres_countries_ country_of_destination_trase_id 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_exporter_name__name__ref_postgres_traders_ exporter_name 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_exporter_node_id__trader_node_id__ref_postgres_traders_ exporter_node_id 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_exporter_trase_id__trase_id__ref_postgres_traders_ exporter_trase_id 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_hs4__code__ref_hs2017_ hs4 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_importer_name__name__ref_postgres_traders_ importer_name 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_importer_node_id__trader_node_id__ref_postgres_traders_ importer_node_id 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_port_of_export_name__name__ref_postgres_ports_ port_of_export_name 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_port_of_import_country_name__country_name__ref_postgres_countries_ port_of_import_country_name 2026-04-25 13:23 pass
relationships_brazil_bol_2023_mdic_padded_port_of_import_name__name__ref_postgres_ports_ port_of_import_name 2026-04-25 13:23 pass
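
As an illustration of what the not_null_proportion tests in the table check, here is a minimal sketch in plain Python. It assumes the 0_8 and 0_99 fragments in the test names are the minimum share of non-null values (the `at_least` argument of `dbt_utils.not_null_proportion`); the sample values and the function name are made up.

```python
# Sketch of a not-null proportion check (hypothetical data and function name).
def not_null_proportion(values):
    """Share of entries that are not None; `values` must be non-empty."""
    return sum(value is not None for value in values) / len(values)


# Ten made-up port_of_import_name values, one of them missing
port_of_import_name = [
    "ROTTERDAM", "SHANGHAI", None, "QINGDAO", "UNKNOWN PORT",
    "HAMBURG", "ANTWERP", "ROTTERDAM", "SHANGHAI", "TIANJIN",
]

proportion = not_null_proportion(port_of_import_name)
passes = proportion >= 0.8  # threshold assumed from the test name
print(proportion, passes)
```

Note that a default value such as "UNKNOWN PORT" counts as non-null for this kind of check.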

Not referenced by any model or exposure.

"""
Padding of Brazil BOL using MDIC trade data. Intended to be called through dbt.

Currently pads non-maritime exports and hs6/destination-country combinations that
are present in MDIC but missing from the BOL. A previous version of this code also
included some hs6 remapping and maritime padding, but they are excluded for the
moment.
"""

import polars as pl


def clean_countries_and_economic_blocs(bol_lf, countries_lf, year):
    """
    Adjusts country_of_destination_name and country_of_destination_economic_bloc
    based on country_of_destination_trase_id using the countries reference table.
    """

    ### Get the EU economic bloc countries for the given year
    economic_blocs_lf = (
        countries_lf.select(["country_trase_id", "economic_bloc"])
        .unique()
        .explode("economic_bloc")
    )
    # Turn the json string into a struct with the appropriate types
    dtype = pl.Struct(
        [
            pl.Field("economic_bloc", pl.Utf8),
            pl.Field("time_start", pl.Datetime),
            pl.Field("time_end", pl.Datetime),
        ]
    )
    economic_blocs_lf = economic_blocs_lf.with_columns(
        economic_bloc_struct=pl.col("economic_bloc").str.json_decode(dtype)
    )

    # Flatten the struct into new fields
    economic_blocs_lf = economic_blocs_lf.with_columns(
        economic_bloc_name=pl.col("economic_bloc_struct").struct.field("economic_bloc"),
        time_start=pl.col("economic_bloc_struct").struct.field("time_start"),
        time_end=pl.col("economic_bloc_struct").struct.field("time_end"),
    )

    # Filter for EUROPEAN UNION and compatible start / end dates (especially for filtering UK)
    economic_blocs_lf = (
        economic_blocs_lf.filter(
            pl.col("economic_bloc_name") == pl.lit("EUROPEAN UNION")
        )
        .filter(pl.col("time_start").dt.year() <= pl.lit(year))
        .filter(
            (pl.col("time_end").is_null())
            | (pl.col("time_end").dt.year() >= pl.lit(year))
        )
    )
    economic_blocs_lf = economic_blocs_lf.select(
        "country_trase_id", "economic_bloc_name"
    )

    # Join back to countries_lf to get the economic_bloc values
    countries_lf = countries_lf.select(["country_trase_id", "country_name"]).unique()
    countries_lf = countries_lf.join(
        economic_blocs_lf,
        left_on="country_trase_id",
        right_on="country_trase_id",
        how="left",
        suffix="_eb",
        validate="1:1",
    ).select(["country_trase_id", "country_name", "economic_bloc_name"])

    # add country_of_destination_name and country_of_destination_economic_bloc to bol_lf
    # for missing country_of_destination_name values
    bol_lf = (
        bol_lf.join(
            countries_lf,
            left_on="country_of_destination_trase_id",
            right_on="country_trase_id",
            how="left",
            suffix="_country",
            validate="m:1",
        )
        .with_columns(
            pl.coalesce(
                [
                    pl.col("country_of_destination_label"),
                    pl.col("country_name"),
                ]
            ).alias("country_of_destination_label"),
            pl.coalesce(
                [
                    pl.col("country_of_destination_name"),
                    pl.col("country_name"),
                ]
            ).alias("country_of_destination_name"),
            pl.coalesce(
                [
                    pl.col("country_of_destination_economic_bloc"),
                    pl.col("economic_bloc_name"),
                ]
            ).alias("country_of_destination_economic_bloc"),
        )
        .drop(["country_name", "economic_bloc_name"])
    )

    return bol_lf


######
#  Supporting functions not used for the moment - though maybe useful in the future
######
# def hs6_codes_remapping(bol_lf, year):
#     """
#     Adjust hs6 codes based on known remappings for a given year.
#     If mapping exists for the year, creates a new hs6_original column with the original hs6 code,
#     and remaps hs6 to the new value based on the mapping.
#     """
#     if year == 2023:
#         bol_lf = bol_lf.with_columns(pl.col("hs6").alias("hs6_original")).with_columns(
#             pl.when(pl.col("hs6") == "150790")
#             .then(pl.lit("150710"))
#             .otherwise(pl.col("hs6"))
#             .alias("hs6")
#         )
#     return bol_lf
#
#


def pad_missing_hs6_country_vols(bol_lf, mdic_port_lf):
    """
    Pad the BOL data with volumes from MDIC hs6/destination countries missing in BOL.
    """
    # Per BOL hs6, find which countries of destination are missing in BOL but present in MDIC
    bol_hs6_country = bol_lf.select(
        ["commodity", "hs6", "country_of_destination_trase_id"]
    ).unique()
    mdic_hs6_country = (
        mdic_port_lf.select(
            [
                "hs6",
                "country_of_destination.trase_id",
                "via_description",
                pl.col("vol").alias("mass_kg"),
                "fob",
            ]
        )
        .group_by(["hs6", "country_of_destination.trase_id", "via_description"])
        .agg(
            pl.col("mass_kg").sum().alias("mass_kg"),
            pl.col("fob").sum().alias("fob"),
        )
    )
    missing_hs6_country_vols = (
        mdic_hs6_country.join(
            bol_hs6_country,
            left_on=["hs6", "country_of_destination.trase_id"],
            right_on=["hs6", "country_of_destination_trase_id"],
            how="anti",
        )
        .join(
            bol_hs6_country.select(["hs6", "commodity"]).unique(),
            on="hs6",
            how="left",
            validate="m:1",
        )
        .select(
            [
                "commodity",
                "hs6",
                "country_of_destination.trase_id",
                "via_description",
                "mass_kg",
                "fob",
            ]
        )
        .sort(["commodity", "mass_kg"], descending=[False, True])
    )
    missing_hs6_country_vols = missing_hs6_country_vols.rename(
        {
            "country_of_destination.trase_id": "country_of_destination_trase_id",
            "mass_kg": "net_weight_kg",
            "via_description": "mode_of_transportation",
        }
    )
    missing_hs6_country_vols = missing_hs6_country_vols.with_columns(
        (pl.col("net_weight_kg") / 1000).alias("net_weight_tonnes"),
        pl.lit(True).alias("padded"),
        pl.lit("hs6 country padding").alias("padding_type"),
    )

    # Concat, filling the missing fields with nulls (diagonal)
    bol_lf = pl.concat([bol_lf, missing_hs6_country_vols], how="diagonal")

    return bol_lf


def pad_hs6_nonmaritime(bol_lf, mdic_port_lf):
    """
    Pad the BOL data with full non-maritime MDIC volumes.
    """

    # Take BOL's hs6 / country of destination keys excluding already padded rows
    bol_hs6_country_keys = (
        bol_lf.select(
            [
                "hs6",
                "country_of_destination_trase_id",
                "padded",
            ]
        )
        .filter(pl.col("padded") != True)
        .select(["hs6", "country_of_destination_trase_id"])
        .unique()
    )

    # Get MDIC non-maritime hs6/country/mode totals for relevant countries in BOL
    mdic_hs6_nonmaritime_padding = (
        mdic_port_lf.join(
            bol_hs6_country_keys,
            left_on=["hs6", "country_of_destination.trase_id"],
            right_on=["hs6", "country_of_destination_trase_id"],
            how="inner",
            validate="m:1",
        )
        .filter((pl.col("via_description") != "MARITIMA") & (pl.col("vol") > 0))
        .select(
            [
                "hs6",
                pl.col("country_of_destination.trase_id").alias(
                    "country_of_destination_trase_id"
                ),
                pl.col("via_description").alias("mode_of_transportation"),
                pl.col("vol").alias("mass_kg"),
                "fob",
            ]
        )
        .group_by(["hs6", "country_of_destination_trase_id", "mode_of_transportation"])
        .agg(
            pl.col("mass_kg").sum().alias("mass_kg"),
            pl.col("fob").sum().alias("fob"),
        )
        .with_columns(
            pl.col("mass_kg").alias("net_weight_kg"),
            (pl.col("mass_kg") / 1000).alias("net_weight_tonnes"),
            pl.lit(True).alias("padded"),
            pl.lit("non-maritime padding").alias("padding_type"),
        )
        .select(
            [
                "hs6",
                "country_of_destination_trase_id",
                "mode_of_transportation",
                "net_weight_kg",
                "net_weight_tonnes",
                "padded",
                "padding_type",
                "fob",
            ]
        )
    )

    # Concat, filling the missing fields with nulls (diagonal)
    bol_lf = pl.concat([bol_lf, mdic_hs6_nonmaritime_padding], how="diagonal")

    return bol_lf


def fill_standard_column_values(bol_lf, countries_lf, year):
    """
    The concatenation only included minimal columns, so we expand the rest with their standard values.
    """
    # Add commodity value
    hs6_commodity_map = (
        bol_lf.filter(pl.col("commodity").is_not_null())
        .select(["hs6", "commodity"])
        .unique()
    )
    bol_lf = (
        bol_lf.join(
            hs6_commodity_map, on="hs6", how="left", suffix="_map", validate="m:1"
        )
        .with_columns(
            pl.coalesce([pl.col("commodity"), pl.col("commodity_map")]).alias(
                "commodity"
            )
        )
        .drop("commodity_map")
    )

    # Manually set default unknown values
    bol_lf = bol_lf.with_columns(
        pl.col("year").fill_null(year).alias("year"),
        pl.col("exporter_cnpj").fill_null("00000000000000").alias("exporter_cnpj"),
        pl.col("exporter_country_label")
        .fill_null("BRAZIL")
        .alias("exporter_country_label"),
        pl.col("exporter_group").fill_null("UNKNOWN").alias("exporter_group"),
        pl.col("exporter_label").fill_null("UNKNOWN CUSTOMER").alias("exporter_label"),
        pl.col("exporter_name").fill_null("UNKNOWN").alias("exporter_name"),
        pl.col("exporter_node_id")
        .fill_null(15548642)  # BRAZIL UNKNOWN EXPORTER
        .alias("exporter_node_id"),
        pl.col("exporter_municipality_label")
        .fill_null("UNKNOWN")
        .alias("exporter_municipality_label"),
        pl.col("exporter_municipality_name")
        .fill_null("UNKNOWN")
        .alias("exporter_municipality_name"),
        pl.col("exporter_municipality_trase_id")
        .fill_null("BR-XXXXXXX")
        .alias("exporter_municipality_trase_id"),
        pl.col("exporter_state_label")
        .fill_null("UNKNOWN STATE")
        .alias("exporter_state_label"),
        pl.col("exporter_state_name")
        .fill_null("UNKNOWN STATE")
        .alias("exporter_state_name"),
        pl.col("exporter_state_trase_id")
        .fill_null("BR-XX")
        .alias("exporter_state_trase_id"),
        pl.col("exporter_trase_id")
        .fill_null("BR-TRADER-XXXXXXXX")
        .alias("exporter_trase_id"),
        pl.col("exporter_type").fill_null("unknown").alias("exporter_type"),
        pl.col("importer_group").fill_null("UNKNOWN").alias("importer_group"),
        pl.col("importer_label").fill_null("UNKNOWN CUSTOMER").alias("importer_label"),
        pl.col("importer_name").fill_null("UNKNOWN").alias("importer_name"),
        pl.col("importer_node_id").fill_null(15548642).alias("importer_node_id"),
        # fill null hs4 and hs5 based on hs6
        pl.col("hs4").fill_null(pl.col("hs6").str.slice(0, 4)).alias("hs4"),
        pl.col("hs5").fill_null(pl.col("hs6").str.slice(0, 5)).alias("hs5"),
        pl.col("port_of_export_label")
        .fill_null("UNKNOWN PORT BRAZIL")
        .alias("port_of_export_label"),
        pl.col("port_of_export_name")
        .fill_null("UNKNOWN PORT BRAZIL")
        .alias("port_of_export_name"),
        pl.col("port_of_import_country_label")
        .fill_null("UNKNOWN COUNTRY")
        .alias("port_of_import_country_label"),
        pl.col("port_of_import_country_name")
        .fill_null("UNKNOWN COUNTRY")
        .alias("port_of_import_country_name"),
        pl.col("port_of_import_label")
        .fill_null("UNKNOWN PORT")
        .alias("port_of_import_label"),
        pl.col("port_of_import_name")
        .fill_null("UNKNOWN PORT")
        .alias("port_of_import_name"),
    )

    # If field 'hs6_original' exists (after remapping), fill null hs6_original with hs6 values
    if "hs6_original" in bol_lf.collect_schema().names():
        bol_lf = bol_lf.with_columns(
            pl.col("hs6_original").fill_null(pl.col("hs6")).alias("hs6_original")
        )

    # Add country_of_destination_name and country_of_destination_economic_bloc
    bol_lf = clean_countries_and_economic_blocs(bol_lf, countries_lf, year)

    # create a maritime field based on mode_of_transportation
    bol_lf = bol_lf.with_columns(
        pl.when(pl.col("mode_of_transportation") == "MARITIMA")
        .then(pl.lit("maritime"))
        .otherwise(pl.lit("non-maritime"))
        .alias("maritime_non_maritime")
    )

    return bol_lf


def pad_brazil_bol_with_mdic(bol_lf, mdic_port_lf, countries_lf, year):

    # Convert 120100/12010000 hs6/hs8 in BOL to match the correct MDIC 120190/12019000
    bol_lf = bol_lf.with_columns(
        pl.when(pl.col("hs6") == "120100")
        .then(pl.lit("120190"))
        .otherwise(pl.col("hs6"))
        .alias("hs6"),
        pl.when(pl.col("hs8") == "12010000")
        .then(pl.lit("12019000"))
        .otherwise(pl.col("hs8"))
        .alias("hs8"),
    )

    # Filter mdic to hs6 present in bol
    hs6_in_bol = bol_lf.filter(pl.col("year") == year).select(pl.col("hs6").unique())
    mdic_port_lf = mdic_port_lf.filter(pl.col("year") == year)
    mdic_port_lf = mdic_port_lf.join(hs6_in_bol, on="hs6", how="inner")
    mdic_port_lf = mdic_port_lf.with_columns(
        pl.col("vol").cast(pl.Float64).alias("vol"),
        pl.col("fob").cast(pl.Float64).alias("fob"),
    )

    # prepare missing hs6 assertion (evaluate at the end)
    hs6_bol = bol_lf.filter(pl.col("year") == year).select(pl.col("hs6").unique())
    hs6_mdic = mdic_port_lf.select(pl.col("hs6").unique())
    missing_hs6 = hs6_bol.join(hs6_mdic, on="hs6", how="anti")

    # Add mode_of_transportation field in bol, all with "MARITIMA" value
    bol_lf = bol_lf.with_columns(
        pl.col("net_weight_kg").cast(pl.Float64),
        pl.col("net_weight_tonnes").cast(pl.Float64),
        pl.col("fob").cast(pl.Float64),
        pl.lit("MARITIMA").alias("mode_of_transportation"),
        pl.lit(False).alias("padded"),
        pl.lit(None).cast(pl.Utf8).alias("padding_type"),
    )

    bol_lf = pad_missing_hs6_country_vols(bol_lf, mdic_port_lf)
    bol_lf = pad_hs6_nonmaritime(bol_lf, mdic_port_lf)
    bol_lf = fill_standard_column_values(bol_lf, countries_lf, year)

    # For 2023, filter out padded EU countries within groups of hs6's where there are
    # occurrences of `country_of_destination_name = 'EUROPEAN UNION'` in them,
    # as padding these countries might over-inflate EU volumes
    if year == 2023:
        # Check which hs6 have EUROPEAN UNION as country_of_destination_name in any of their rows
        hs6_with_eu_destination = (
            bol_lf.filter(pl.col("country_of_destination_name") == "EUROPEAN UNION")
            .select("hs6")
            .unique()
        )
        if isinstance(hs6_with_eu_destination, pl.LazyFrame):
            hs6_with_eu_destination = hs6_with_eu_destination.collect()

        # For those hs6, filter out rows with padded = true, padding_type = "hs6 country padding",
        # and country_of_destination_economic_bloc = "EUROPEAN UNION"
        bol_lf = bol_lf.filter(
            ~(
                (pl.col("hs6").is_in(hs6_with_eu_destination["hs6"]))
                & (pl.col("padded") == True)
                & (pl.col("padding_type") == "hs6 country padding")
                & (pl.col("country_of_destination_economic_bloc") == "EUROPEAN UNION")
            )
        )

    # Assert no hs6 in bol is missing in mdic
    if isinstance(missing_hs6, pl.LazyFrame):
        missing_hs6 = missing_hs6.collect()
    if missing_hs6.height > 0:
        missing_list = missing_hs6.select("hs6").to_series().to_list()
        raise ValueError(
            "Some hs6 present in BOL are missing in MDIC port data: " f"{missing_list}"
        )

    # Assert unique country_of_destination_trase_id, country_of_destination_name, country_of_destination_economic_bloc combinations
    country_combo = (
        bol_lf.select(
            [
                "country_of_destination_trase_id",
                "country_of_destination_name",
                "country_of_destination_economic_bloc",
            ]
        )
        .unique()
        .group_by("country_of_destination_trase_id")
        .agg(pl.len().alias("count"))  # pl.count() is deprecated in favour of pl.len()
        .filter(pl.col("count") > 1)
    )
    if isinstance(country_combo, pl.LazyFrame):
        country_combo = country_combo.collect()
    if country_combo.height > 0:
        problematic_countries = (
            country_combo.select("country_of_destination_trase_id")
            .to_series()
            .to_list()
        )
        raise ValueError(
            "Some country_of_destination_trase_id have multiple country_of_destination_name or country_of_destination_economic_bloc values: "
            f"{problematic_countries}"
        )

    return bol_lf
import polars as pl

from trase.data.brazil.trade.padding.brazil_bol_mdic_padding_20XX import (
    pad_brazil_bol_with_mdic,
)


def model(dbt, cursor):
    dbt.config(materialized="external")
    year = 2023

    bol_2023_lf = dbt.ref("brazil_bol_2023_gold").pl()
    mdic_port_2023_lf = dbt.ref("brazil_mdic_port_2023").pl()
    countries_lf = dbt.ref("postgres_countries").pl()

    padded_bol_2023_lf = pad_brazil_bol_with_mdic(
        bol_2023_lf, mdic_port_2023_lf, countries_lf, year
    )

    # Debug: save in '~/Trase/data/brazil_bol_2023_mdic_padded.parquet' for inspection
    # print(
    #     "Saving padded BOL data to '~/Trase/data/brazil_bol_2023_mdic_padded.parquet' for inspection..."
    # )
    # padded_bol_2023_lf.write_parquet("~/Trase/data/brazil_bol_2023_mdic_padded.parquet")

    return padded_bol_2023_lf