Skip to content

Brazil Bol 2023 Silver

s3://trase-storage/brazil/trade/bol/2023/silver/brazil_bol_2023_silver.parquet

Dbt path: trase_production.main_brazil.brazil_bol_2023_silver

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/trade/bol/_schema_brazil_bol.yml

Model file link: trase/data_pipeline/models/brazil/trade/bol/2023/brazil_bol_2023_silver.py

Dbt test runs & lineage: Test results ยท Lineage

Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)

Tags: brazil, bol, trade, 2023, silver


brazil_bol_2023_silver

Description

Explore in Metabase here

Cleans the Brazil BOL for 2023, and includes various tests to check the data is ok. Find where the fields come from and how they were cleaned in the fields documentation.


Details

Column Type Description
commodity VARCHAR Uppercase name of the commodity (BEEF, SOY, etc.) based on the hs4 code, and the list of commodities in the Trase reference table postgres_commodities
country_of_destination_label VARCHAR Based on the Datamar source field PLACE_AND_PORTS_DEST_COUNTRY: "Country where the carrier delivered the cargo"
country_of_destination_name VARCHAR Official Trase country name where the export was delivered. It is cleaned based on country_of_destination_label field Note that European Union countries are not unified as 'EUROPEAN UNION' country, although the source BOL does has 768 records with the label "EUROPEAN" instead of a specific country.
country_of_destination_trase_id VARCHAR Official Trase country id, using the iso two letter country code based on the field country_of_destination_name
date DATE Based on the source field DATES_LONG_HAUL_YYYYMMDD: Berthing (arrival) date of the long haul vessel: Year, month and day
economic_bloc VARCHAR If the country of destination is part of the EU for the year of the BOL, it sets the value "EUROPEAN UNION" here. Note that currently no other economic blocs are being identified. If this is done in the future, probably the field will turn into an array.
exporter_cnpj VARCHAR Based on the source field COMPANY_SHIPPER_REGISTRATION_NUMBER (the shipper is the exporter), which already only contains digits. The cleaning of this field uses stdnum.br python library to identify if its a valid cnpj, cpf, or unkown (and sets this in the exporter_type field). If its a valid cnpj it left pads with 0 until it reaches 14 characters, and if its a cpf left pads to 11 characters.
exporter_country_label VARCHAR Based on the source field COMPANY_SHIPPER_COUNTRY_NAME: country name of the exporter
exporter_group VARCHAR Official Trase trader group name, taken from the trader reference database. First it checks the reference data based on the tax identification of the record, and then based on the exporter_label. As a trader can change its group affiliation in time, the group valid for the current BOL year is taken.
exporter_label VARCHAR Based on the source field COMPANY_SHIPPER_SHIPPER_NAME_DETAILED: Detailed Shipper (exporter) name.
exporter_municipality_label VARCHAR Based on the source field COMPANY_SHIPPER_CITY: Shipper (exporter) city
exporter_municipality_name VARCHAR Official Trase name of the municipality, based on the exporter_municipality_label, and cleaning it based on the postgres_regions reference table - where it checks the municipality exists and takes the official name
exporter_municipality_trase_id VARCHAR Official Trase id of the municipality, usually created based on an official geocode of it. It is based on the exporter_municipality_label, and cleaning it based on the postgres_regions reference table - where it checks the municipality exists and takes the id
exporter_name VARCHAR Official Trase trader name, taken from the trader reference database. First it checks the reference data based on the tax identification of the record, and then based on the exporter_label field
exporter_state_label VARCHAR Based on the source field COMPANY_SHIPPER_STATE_NAME: Shipper (exporter) state name
exporter_state_name VARCHAR Official Trase name of the state, based on the exporter_state_label, and cleaning it based on the postgres_regions reference table - where it checks the state exists and takes the official name
exporter_state_trase_id VARCHAR Official Trase id of the state, based on the exporter_state_label, and cleaning it based on the postgres_regions reference table - where it checks the state exists and takes the official id
exporter_trase_id VARCHAR Official Trase trader id, taken from the trader reference database. First it checks the reference data based on the tax identification of the record, and then based on the exporter_label field. The trase id is built based on the tax number.
exporter_type VARCHAR Based on the field exporter_cnpj, it identifies if its a valid cnpj, cpf or if its of unknown type.
fob DOUBLE Based on the field fob_original, including imputing empty values based on COMTRADE or BOL average values for the hs6 code. Where imputed, the field fob_adjustment_type shows which kind of imputing was done.
fob_adjustment_type VARCHAR
fob_original DOUBLE Based on the source field MEASURES_SECEX_FOB. Though this field doesn't exist in the vendor glossary, there is a similar field in the glossary called MEASURES_FOB_VALUE_USD: Monthly average FOB value per commodity in US dollars.
hs4 VARCHAR
hs5 VARCHAR
hs6 VARCHAR
hs6_description VARCHAR
hs8 VARCHAR
importer_group VARCHAR
importer_label VARCHAR
importer_name VARCHAR
month INTEGER
net_weight_kg DOUBLE Based on the source field MEASURES_WTKG: Weight in kilos
net_weight_tonnes DOUBLE Based on the source field MEASURES_WTMT: Weight in metric tonnes
port_of_export_label VARCHAR Based on the source field PLACE_AND_PORTS_POL_NAME: POL โ€ PORT OF LOADING โ€ Port where the LONG HAUL vessel loaded the cargo
port_of_export_name VARCHAR Official Trase port name where the cargo was loaded. It is cleaned based on port_of_export_label field and the postgres_ports reference table, checking also the port exists in the port_of_export_country
port_of_import_country_label VARCHAR Based on the source field PLACE_AND_PORTS_POD_COUNTRY: POD โ€ PORT OF DISCHARGE โ€ Country where the LONG HAUL vessel discharged the cargo
port_of_import_country_name VARCHAR
port_of_import_label VARCHAR Based on the source field PLACE_AND_PORTS_POD_NAME: POD โ€ PORT OF DISCHARGE โ€ Port where the LONG HAUL vessel discharged the cargo
port_of_import_name VARCHAR Official Trase port name where the cargo was discharged. If there is no current existing port with that name in the port_of_import_country, it returns a NULL. It is cleaned based on port_of_import_label field and the postgres_ports reference table.
usd_per_kg_record DOUBLE
usd_per_kg_reference DOUBLE
usd_per_kg_zscore DOUBLE
CARGO_TRANSPORT_CARGO_TYPE VARCHAR
CARGO_TRANSPORT_CONTAINER_TYPE VARCHAR Container Type (R = Reefer, D = Dry, T= Tank, 0 = bulk cargo)
CARGO_TRANSPORT_MODE VARCHAR LCL (Less Container Load) or FCL (Full Container Load)
CARGO_TRANSPORT_PACKAGE_TYPE VARCHAR Type of packaging used (this field is not always filled in)
CARRIER_CARRIER_AGENT VARCHAR Carrier's maritime agent in port - there is also the vessel operator's agent
CARRIER_CARRIER_DATAMAR VARCHAR Name of the carrier - shows actual carrier when Carrier_Name is a 'carrier of convenience'
CARRIER_CARRIER_DATAMAR_GROUP_NAME VARCHAR Name of carrier in reference to SCAC (Standard Carrier Alpha Code)
CARRIER_CARRIER_DATAMAR_GROUP_SCAC VARCHAR 4 letter code identifying the carrier
CARRIER_CARRIER_GROUP_NAME VARCHAR Name of carrier in reference to SCAC (Standard Carrier Alpha Code)
CARRIER_CARRIER_GROUP_SCAC VARCHAR 4 letter code identifying the carrier
CARRIER_CARRIER_NAME VARCHAR Name of the carrier
BL_DESCRIPTION_BL_DESCRIPTION VARCHAR
DTM_COMMODITY_GROUP_EN VARCHAR
DTM_COMMODITY_GROUP_PT VARCHAR
COMMODITY_HS_HS2_CODE VARCHAR First 2 digits of the HS code
COMMODITY_HS_HS2_ENGLISH VARCHAR General description of goods in HS2 in English
COMMODITY_HS_HS2_PORTUGUES VARCHAR General description of goods in HS2 in Portuguese
COMMODITY_HS_HS4_CODE VARCHAR First 4 digits of the HS code
COMMODITY_HS_HS4_ENGLISH VARCHAR General description of goods in HS4 in English
COMMODITY_HS_HS4_PORTUGUES VARCHAR General description of goods in HS4 in Portuguese
COMMODITY_HS_HS6_CODE VARCHAR First 6 digits of the HS code
COMMODITY_HS_HS6_ENGLISH VARCHAR General description of goods in HS6 in English
COMMODITY_HS_HS6_PORTUGUES VARCHAR General description of goods in HS6 in Portuguese
COMMODITY_HS_HS8_CODE VARCHAR Full 8 digits of Mercosul's Commodity code
COMMODITY_HS_HS8_PORTUGUES VARCHAR General description of goods in HS8 in Portuguese
COMMODITY_HS_DATAMAR_HS2_CODE VARCHAR First 2 digits of the HS code - with Datamar analysis
COMMODITY_HS_DATAMAR_HS2_ENGLISH VARCHAR General description of goods in HS2 in English
COMMODITY_HS_DATAMAR_HS2_PORTUGUES VARCHAR General description of goods in HS2 in Portuguese
COMMODITY_HS_DATAMAR_HS4_CODE VARCHAR First 4 digits of the HS code - with Datamar analysis
COMMODITY_HS_DATAMAR_HS4_ENGLISH VARCHAR General description of goods in HS4 in English
COMMODITY_HS_DATAMAR_HS4_PORTUGUES VARCHAR General description of goods in HS4 in Portuguese
COMMODITY_HS_DATAMAR_HS6_CODE VARCHAR First 6 digits of the HS code - with Datamar analysis
COMMODITY_HS_DATAMAR_HS6_ENGLISH VARCHAR General description of goods in HS6 in English
COMMODITY_HS_DATAMAR_HS6_PORTUGUES VARCHAR General description of goods in HS6 in Portuguese
COMMODITY_HS_DATAMAR_HS8_CODE VARCHAR Full 8 digits of Mercosul's Commodity code - with Datamar analysis
COMMODITY_HS_DATAMAR_HS8_PORTUGUES VARCHAR General description of goods in HS8 in Portuguese
COMPANY_CONSIGNEE_CITY VARCHAR City name
COMPANY_CONSIGNEE_CONSIGNEE_NAME VARCHAR Consignee name
COMPANY_CONSIGNEE_CONSIGNEE_NAME_DETAILED VARCHAR Detailed Consignee name
COMPANY_CONSIGNEE_CONSIGNEE_GROUP VARCHAR
COMPANY_CONSIGNEE_REGISTRATION_NUMBER VARCHAR Registration number
COMPANY_CONSIGNEE_COUNTRY VARCHAR Country name
COMPANY_CONSIGNEE_NEIGHBORHOOD VARCHAR Neighbourhood Name
COMPANY_CONSIGNEE_STATE VARCHAR State code
COMPANY_CONSIGNEE_STATE_NAME VARCHAR State name
COMPANY_CONSIGNEE_STREET VARCHAR Street name
COMPANY_CONSIGNEE_TYPE VARCHAR Type of company (Real cargo owner or forwarder / NVOCC)
COMPANY_CONSIGNEE_ZIP VARCHAR Zip code
COMPANY_FORWARDER_CITY VARCHAR City name
COMPANY_FORWARDER_COUNTRY VARCHAR Country name
COMPANY_FORWARDER_FORWARDER_NAME VARCHAR Forwarder name
COMPANY_FORWARDER_FORWARDER_NAME_DETAILED VARCHAR Detailed Forwarder name
COMPANY_FORWARDER_FORWARDER_GROUP VARCHAR
COMPANY_FORWARDER_REGISTRATION_NUMBER VARCHAR Registration number
COMPANY_FORWARDER_NEIGHBORHOOD VARCHAR Neighbourhood Name
COMPANY_FORWARDER_STATE VARCHAR State code
COMPANY_FORWARDER_STATE_NAME VARCHAR State name
COMPANY_FORWARDER_STREET VARCHAR Street name
COMPANY_FORWARDER_TYPE VARCHAR Type of company (Real cargo owner or forwarder / NVOCC)
COMPANY_FORWARDER_ZIP VARCHAR Zip code
COMPANY_NOTIFY_CITY VARCHAR City name
COMPANY_NOTIFY_COUNTRY VARCHAR Country name
COMPANY_NOTIFY_NEIGHBORHOOD VARCHAR Neighbourhood name
COMPANY_NOTIFY_NOTIFY_NAME VARCHAR Notify name
COMPANY_NOTIFY_NOTIFY_NAME_DETAILED VARCHAR Detailed Notify name
COMPANY_NOTIFY_NOTIFY_GROUP VARCHAR
COMPANY_NOTIFY_REGISTRATION_NUMBER VARCHAR Registration number
COMPANY_NOTIFY_STATE VARCHAR State code
COMPANY_NOTIFY_STATE_NAME VARCHAR State name
COMPANY_NOTIFY_STREET VARCHAR Street name
COMPANY_NOTIFY_TYPE VARCHAR Type of company (Real cargo owner or forwarder / NVOCC)
COMPANY_NOTIFY_ZIP VARCHAR Zip code
COMPANY_SHIPPER_CITY VARCHAR City name
COMPANY_SHIPPER_COUNTRY_NAME VARCHAR Country name
COMPANY_SHIPPER_NEIGHBORHOOD VARCHAR Neighbourhood name
COMPANY_SHIPPER_SHIPPER_NAME VARCHAR Shipper name
COMPANY_SHIPPER_SHIPPER_NAME_DETAILED VARCHAR Detailed Shipper name
COMPANY_SHIPPER_SHIPPER_GROUP VARCHAR Name of the group to which the Shipper belongs
COMPANY_SHIPPER_REGISTRATION_NUMBER VARCHAR Registration number
COMPANY_SHIPPER_STATE VARCHAR State code
COMPANY_SHIPPER_STATE_NAME VARCHAR State name
COMPANY_SHIPPER_STREET VARCHAR Street name
COMPANY_SHIPPER_TYPE VARCHAR Type of company (Real cargo owner or forwarder / NVOCC)
COMPANY_SHIPPER_ZIP VARCHAR Zip code
IDENTIFICATION_ID_DATAMAR VARCHAR Internal code for Datamar to identify the data in that record
OPERATOR_FEEDER_OPERATOR_FEEDER_AGENT VARCHAR Feeder vessel operator's agent
OPERATOR_FEEDER_OPERATOR_FEEDER_NAME VARCHAR Feeder vessel operator name
DATES_FEEDER_DATE VARCHAR
DATES_FEEDER_DAYOFWEEK VARCHAR Berthing (arrival) date of the long haul vessel: Day of the week (Monday to Friday)
DATES_FEEDER_DD VARCHAR Berthing (arrival) date of the long haul vessel: Day
DATES_FEEDER_FIRSTDAYOFWEEK VARCHAR Berthing (arrival) date of the long haul vessel: First day of the week
DATES_FEEDER_LASTDAYOFWEEK VARCHAR Berthing (arrival) date of the long haul vessel: Last day of the week
DATES_FEEDER_MM VARCHAR Berthing (arrival) date of the long haul vessel: Month
DATES_FEEDER_QTR VARCHAR Berthing (arrival) date of the long haul vessel: Quarter
DATES_FEEDER_SEM VARCHAR Berthing (arrival) date of the long haul vessel: Semester
DATES_FEEDER_WEEKYEAR VARCHAR Berthing (arrival) date of the long haul vessel: Number of weeks
DATES_FEEDER_YYYY VARCHAR Berthing (arrival) date of the long haul vessel: Year
DATES_FEEDER_YYYYMM VARCHAR Berthing (arrival) date of the long haul vessel: Year and month
DATES_FEEDER_YYYYMMDD VARCHAR Berthing (arrival) date of the long haul vessel: Year, month and day
DATES_FEEDER_YYYYQTR VARCHAR Berthing (arrival) date of the long haul vessel: Year and quarter
DATES_FEEDER_YYYYSEM VARCHAR Berthing (arrival) date of the long haul vessel: Year and semester
DATES_FEEDER_YYYYWW VARCHAR Berthing (arrival) date of the long haul vessel: Year and week
DATES_LONG_HAUL_DATE VARCHAR
DATES_LONG_HAUL_DAYOFWEEK VARCHAR Day of the week (Monday to Friday)
DATES_LONG_HAUL_DD VARCHAR Day
DATES_LONG_HAUL_FIRSTDAYOFWEEK VARCHAR First day of the week
DATES_LONG_HAUL_LASTDAYOFWEEK VARCHAR Last day of the week
DATES_LONG_HAUL_MM VARCHAR Month
DATES_LONG_HAUL_QTR VARCHAR Quarter
DATES_LONG_HAUL_SEM VARCHAR Semester
DATES_LONG_HAUL_WEEKYEAR VARCHAR Number of weeks
DATES_LONG_HAUL_YYYY VARCHAR Year
DATES_LONG_HAUL_YYYYMM VARCHAR Year and month
DATES_LONG_HAUL_YYYYMMDD VARCHAR Year, month and day
DATES_LONG_HAUL_YYYYQTR VARCHAR Year and quarter
DATES_LONG_HAUL_YYYYSEM VARCHAR Year and semester
DATES_LONG_HAUL_YYYYWW VARCHAR Year and week
PLACE_AND_PORTS_DEST_COUNTRY VARCHAR DEST โ€ DESTINATION โ€ Country where the carrier delivered the cargo
PLACE_AND_PORTS_DEST_NAME VARCHAR DEST โ€ DESTINATION โ€ Location where the carrier delivered the cargo
PLACE_AND_PORTS_DEST_TRADELANE VARCHAR DEST โ€ DESTINATION โ€ Tradelane where the carrier delivered the cargo
PLACE_AND_PORTS_POD_COUNTRY VARCHAR POD โ€ PORT OF DISCHARGE โ€ Country where the LONG HAUL vessel discharged the cargo
PLACE_AND_PORTS_POD_NAME VARCHAR POD โ€ PORT OF DISCHARGE โ€ Port where the LONG HAUL vessel discharged the cargo
PLACE_AND_PORTS_POD_TERMINAL VARCHAR POD โ€ PORT OF DISCHARGE โ€ Terminal where the LONG HAUL vessel discharged the cargo (only in ECSA)
PLACE_AND_PORTS_POD_TRADELANE VARCHAR POD โ€ PORT OF DISCHARGE โ€ Tradelane where the LONG HAUL vessel discharged the cargo
PLACE_AND_PORTS_POL_COUNTRY VARCHAR POL โ€ PORT OF LOADING โ€ Country where the LONG HAUL vessel loaded the cargo
PLACE_AND_PORTS_POL_NAME VARCHAR POL โ€ PORT OF LOADING โ€ Port where the LONG HAUL vessel loaded the cargo
PLACE_AND_PORTS_POL_TERMINAL VARCHAR POL โ€ PORT OF LOADING โ€ Terminal where the LONG HAUL vessel loaded the cargo (only in ECSA)
PLACE_AND_PORTS_POL_TRADELANE VARCHAR POL โ€ PORT OF LOADING โ€ Tradelane where the LONG HAUL vessel loaded the cargo
PLACE_AND_PORTS_POMD_COUNTRY VARCHAR POMD โ€ PORT OF MARITIME DESTINATION โ€ Last maritime country of cargo
PLACE_AND_PORTS_POMD_NAME VARCHAR POMD โ€ PORT OF MARITIME DESTINATION โ€ Last maritime port of cargo
PLACE_AND_PORTS_POMD_TERMINAL VARCHAR POMD โ€ PORT OF MARITIME DESTINATION - Terminal where the vessel feeder discharged the cargo (only in ECSA)
PLACE_AND_PORTS_POMD_TRADELANE VARCHAR POMD โ€ PORT OF MARITIME DESTINATION โ€ Last maritime tradelane of cargo
PLACE_AND_PORTS_POMO_COUNTRY VARCHAR POMO โ€ PORT OF MARITIME ORIGIN โ€ First maritime country of cargo
PLACE_AND_PORTS_POMO_NAME VARCHAR POMO โ€ PORT OF MARITIME ORIGIN โ€ First maritime port of cargo
PLACE_AND_PORTS_POMO_TERMINAL VARCHAR POMO โ€ PORT OF MARITIME ORIGIN โ€ Terminal where the vessel feeder loaded the cargo (only in ECSA)
PLACE_AND_PORTS_POMO_TRADELANE VARCHAR POMO โ€ PORT OF MARITIME ORIGIN โ€ First maritime tradeline of cargo
PLACE_AND_PORTS_POR_COUNTRY VARCHAR POR โ€ PLACE OF RECEIPT โ€ Country where the carrier took charge of the cargo
PLACE_AND_PORTS_POR_NAME VARCHAR POR โ€ PLACE OF RECEIPT โ€ Location where the carrier took charge of the cargo
PLACE_AND_PORTS_POR_TRADELANE VARCHAR POR โ€ PLACE OF RECEIPT โ€ Tradelane where the carrier took charge of the cargo
SERVICE_PORT_ROTATION VARCHAR Ports serviced in the rotation order
SERVICE_SERVICE_NAME VARCHAR The commercial service of the vessel
SERVICE_TRANSIT_TIME VARCHAR Round Voyage duration in days
VESSEL_FEEDER_CONTAINER_CAPACITY_FEEDER VARCHAR Nominal capacity of containers on the vessel in TEUs
VESSEL_FEEDER_DWT_FEEDER VARCHAR Deadweight in tons
VESSEL_FEEDER_IMO_FEEDER VARCHAR IMO (International Maritime Organization) number of the vessel
VESSEL_FEEDER_REEFER_CAPACITY_FEEDER VARCHAR Nominal capacity of containers reefer on the vessel
VESSEL_FEEDER_VESSEL_NAME_FEEDER VARCHAR Name of Vessel
VESSEL_FEEDER_VESSEL_TYPE_FEEDER VARCHAR Type of Vessel (Container, General Cargo, Tank etc)
VESSEL_FEEDER_VOYAGE_FEEDER VARCHAR Voyage number
VESSEL_LONG_HAUL_CONTAINER_CAPACITY VARCHAR
VESSEL_LONG_HAUL_DWT VARCHAR
VESSEL_LONG_HAUL_IMO VARCHAR
VESSEL_LONG_HAUL_REEFER_CAPACITY VARCHAR
VESSEL_LONG_HAUL_VESSEL_LONG_HAUL_NAME VARCHAR
VESSEL_LONG_HAUL_VESSEL_LONG_HAUL_TYPE VARCHAR
MEASURES_C20 VARCHAR Quantity of 20' containers
MEASURES_C40 VARCHAR Quantity of 40' containers
MEASURES_C20_PLUS_C40 VARCHAR
MEASURES_SECEX_FOB VARCHAR
MEASURES_PACKAGEQTY VARCHAR Quantity of packs/packages
MEASURES_TEU VARCHAR Quantity of TEUs
MEASURES_WTMT VARCHAR Weight in kilos
MEASURES_WTKG VARCHAR Weight in metric tons

Models / Seeds

  • source.trase_duckdb.source_brazil.original_brazil_soy_beef_bol_2023
  • model.trase_duckdb.postgres_countries
  • model.trase_duckdb.postgres_regions_without_geometry
  • model.trase_duckdb.postgres_ports
  • model.trase_duckdb.hs2017
  • model.trase_duckdb.postgres_commodities
  • model.trase_duckdb.postgres_traders
  • model.trase_duckdb.comtrade_exports_year_exporter_hs6

Sources

  • ['source_brazil', 'original_brazil_soy_beef_bol_2023']

No called script or script source not found.

"""
Brazil BOL 2023 Silver Data Pipeline

This script processes the Brazil Bill of Lading (BOL) data for the year 2023.
It relies in Polars and existing parquet files, instead of Pandas and Postgres access.
It loads the data as lazy frames (not in eager execution), so when the model runs
dbt-duckdb brings everything together and collects the data.

Tests are defined in the dbt schema, so skipping the assertions here.

It performs various cleaning tasks, including:
* Renaming and normalizing columns
* Filtering records based on country and HS codes
* Cleaning country names
* Adding economic bloc information
* Cleaning state and municipality names
* Cleaning port names
* Cleaning CNPJ/CPF numbers
* Cleaning importer and exporter names
* Cleaning fobs (imputing missing and adjusting a few outliers)
* Reordering columns for better readability
"""

import polars as pl
import stdnum.br.cnpj
import stdnum.br.cpf

YEAR = 2023


def rename_and_normalize_columns(bol_lf):
    """
    Select the columns of interest, renames them and converts the types,
    filtering for the beef hs codes
    """

    columns = {
        "DATES_LONG_HAUL_YYYYMMDD": "date",
        # hs codes
        "COMMODITY_HS_HS4_CODE": "hs4",
        "COMMODITY_HS_HS6_CODE": "hs6",
        "COMMODITY_HS_HS6_ENGLISH": "hs6_description",
        "COMMODITY_HS_HS8_CODE": "hs8",
        # exporters
        "COMPANY_SHIPPER_CITY": "exporter_municipality_label",
        "COMPANY_SHIPPER_REGISTRATION_NUMBER": "exporter_cnpj",
        "COMPANY_SHIPPER_SHIPPER_NAME_DETAILED": "exporter_label",
        "COMPANY_SHIPPER_STATE_NAME": "exporter_state_label",
        "COMPANY_SHIPPER_COUNTRY_NAME": "exporter_country_label",
        # ports, country
        "PLACE_AND_PORTS_POL_NAME": "port_of_export_label",
        "PLACE_AND_PORTS_POD_NAME": "port_of_import_label",
        "PLACE_AND_PORTS_POD_COUNTRY": "port_of_import_country_label",
        "PLACE_AND_PORTS_DEST_COUNTRY": "country_of_destination_label",
        # importer
        "COMPANY_CONSIGNEE_CONSIGNEE_NAME_DETAILED": "importer_label",
        # net_weight_kg, fob
        "MEASURES_WTKG": "net_weight_kg",
        "MEASURES_WTMT": "net_weight_tonnes",
        "MEASURES_SECEX_FOB": "fob",
    }

    # Make a copy of the columns of interest. Keeping the original
    # in case we want to review transformations
    bol_lf = bol_lf.with_columns(
        [pl.col(orig_col).alias(new_col) for orig_col, new_col in columns.items()]
    )

    bol_lf = bol_lf.with_columns(
        date=pl.col("date").str.to_date(format="%Y%m%d", strict=False),
        # Add month as integer extracted from date
        month=pl.col("date")
        .str.to_date(format="%Y%m%d", strict=False)
        .dt.month()
        .cast(pl.Int32),
        # Ensure proper decimal formatting and scientific notation handling
        net_weight_kg=pl.col("net_weight_kg").str.replace(",", ".").cast(pl.Float64),
        net_weight_tonnes=pl.col("net_weight_tonnes")
        .str.replace(",", ".")
        .cast(pl.Float64),
        fob=pl.col("fob").str.replace(",", ".").cast(pl.Float64),
    )

    do_not_clean = [
        "date",
        "month",
        "hs4",
        "hs6",
        "net_weight_kg",
        "net_weight_tonnes",
        "fob",
    ]

    # Trim, normalize, uppercase, remove extra spaces
    bol_lf = bol_lf.with_columns(
        [
            pl.col(col).str.strip_chars()
            # Replace multiple spaces with a single space
            .str.replace(r"\s+", " ")
            # Convert accents and diacritics (see https://stackoverflow.com/a/77217563)
            .str.normalize("NFKD")
            .str.replace_all(r"\p{CombiningMark}", "")
            .str.to_uppercase()
            .alias(col)
            for col in columns.values()
            if col not in do_not_clean
        ]
    )

    # Add hs5 as the first 5 characters of hs6
    bol_lf = bol_lf.with_columns(hs5=pl.col("hs6").str.slice(0, 5))

    return bol_lf


def clean_countries(bol_lf, countries_lf):
    """
    Adjusts 'country_of_destination_label' and 'port_of_import_country_label'
    with the 'official' country names used in Trase. Assumes that all
    'country_of_destination_label' exist in the official country list.
    """

    # Clean country_of_destination_name
    country_synonyms_lf = countries_lf.select(
        "country_trase_id", "country_name", "synonyms"
    )
    country_synonyms_lf = country_synonyms_lf.rename(
        {
            "country_trase_id": "country_of_destination_trase_id",
            "country_name": "country_of_destination_name",
        }
    )

    country_synonyms_lf = country_synonyms_lf.explode("synonyms")  # unpack synonyms[]
    bol_lf = bol_lf.join(
        country_synonyms_lf,
        how="left",
        left_on="country_of_destination_label",
        right_on="synonyms",
        validate="m:1",
    )

    # clean port_of_import_country_name
    country_synonyms_lf = country_synonyms_lf.drop(
        "country_of_destination_trase_id"
    )  # not using it here
    country_synonyms_lf = country_synonyms_lf.rename(
        {
            "country_of_destination_name": "port_of_import_country_name",
        }
    )
    bol_lf = bol_lf.join(
        country_synonyms_lf,
        how="left",
        left_on="port_of_import_country_label",
        right_on="synonyms",
        validate="m:1",
    )

    return bol_lf


def filter_country_and_hscodes(bol_lf, existing_hs_codes_lf, commodities_lf):
    """
    Filters for records from Brazil, and known hs codes. Also adds a field called 'commodity'
    with the commodity type based on the hs4 code.

    Summary of records removed:
    exporter_country_label != 'BRAZIL':
        - UNKNOWN: 332
        - ARGENTINA: 36
        - PARAGUAY: 25
        - URUGUAY: 25

    hs4 not in 'existing_hscodes' reference table:
        - 20 records with the HS CODE 0033
    """
    # Filters out
    bol_lf = bol_lf.filter(pl.col("exporter_country_label") == pl.lit("BRAZIL"))

    # Keep only records with known hs4 codes (using semi join)
    existing_hs4_codes_lf = existing_hs_codes_lf.filter(pl.col("type") == pl.lit("hs4"))
    bol_lf = bol_lf.join(
        existing_hs4_codes_lf,
        how="semi",
        left_on="hs4",
        right_on="code",
    )

    # Add a 'commodity' field based on the hs4 code, and the commodities_lf
    commodities_lf = commodities_lf.select("hs_code", "commodity")
    commodities_lf = commodities_lf.with_columns(hs4=pl.col("hs_code").str.slice(0, 4))
    commodities_lf = commodities_lf.drop("hs_code")
    commodities_lf = commodities_lf.unique(subset=["hs4"], keep="first")
    bol_lf = bol_lf.join(
        commodities_lf,
        how="left",
        on="hs4",
        validate="m:1",
    )

    return bol_lf


def add_european_union_bloc(bol_lf, countries_lf):
    """
    Adds a 'economic_bloc_name' column with "EUROPEAN UNION" for countries currently making part of it.
    If in the future more economic blocs are considered, probably 'economic_bloc_name' would change to be
    an array.

    Takes the 'countries_lf' which contains, per country, an array of json strings with the information
    of the economic blocs.
    """

    economic_blocs_lf = countries_lf.select("country_trase_id", "economic_bloc")
    economic_blocs_lf = economic_blocs_lf.explode("economic_bloc")

    # Turn the json string into a struct with the appropriate types
    dtype = pl.Struct(
        [
            pl.Field("economic_bloc", pl.Utf8),
            pl.Field("time_start", pl.Datetime),
            pl.Field("time_end", pl.Datetime),
        ]
    )
    economic_blocs_lf = economic_blocs_lf.with_columns(
        economic_bloc_struct=pl.col("economic_bloc").str.json_decode(dtype)
    )

    # Flatten the struct into new fields
    economic_blocs_lf = economic_blocs_lf.with_columns(
        economic_bloc_name=pl.col("economic_bloc_struct").struct.field("economic_bloc"),
        time_start=pl.col("economic_bloc_struct").struct.field("time_start"),
        time_end=pl.col("economic_bloc_struct").struct.field("time_end"),
    )

    # Filter for EUROPEAN UNION and compatible start / end dates (specially for filtering UK)
    economic_blocs_lf = (
        economic_blocs_lf.filter(
            pl.col("economic_bloc_name") == pl.lit("EUROPEAN UNION")
        )
        .filter(pl.col("time_start").dt.year() <= pl.lit(YEAR))
        .filter(
            (pl.col("time_end").is_null())
            | (pl.col("time_end").dt.year() >= pl.lit(YEAR))
        )
    )
    economic_blocs_lf = economic_blocs_lf.select(
        "country_trase_id", "economic_bloc_name"
    )
    economic_blocs_lf = economic_blocs_lf.rename(
        {"economic_bloc_name": "economic_bloc"}
    )

    bol_lf = bol_lf.join(
        economic_blocs_lf,
        how="left",
        left_on="country_of_destination_trase_id",
        right_on="country_trase_id",
        validate="m:1",
    )

    return bol_lf


def clean_states_and_municipalities(bol_lf, regions_lf):
    """
    Cleans the state label for the official one we use in Trase. Assumes
    all synonyms exist for the labels. [pending: Also corrects if there are wrongly
    assigned municipality - state values in the trade data]
    """
    # ------------------------------------------------------------------------
    # Clean states
    # ------------------------------------------------------------------------

    regions_lf = regions_lf.select(
        "trase_id", "name", "synonyms", "country", "level", "parent_trase_id"
    )

    # Only valid states from brazil (level 3, with trase_id of the form 'BR-XX')
    states_lf = regions_lf.filter(
        (pl.col("level") == pl.lit(3))
        & (pl.col("country") == pl.lit("BRAZIL"))
        & (pl.col("trase_id").str.len_chars() == pl.lit(5))
    )
    states_lf = states_lf.rename(
        {
            "trase_id": "exporter_state_trase_id",
            "name": "exporter_state_name",
            "synonyms": "state_synonyms",
        }
    )
    states_lf = states_lf.drop(["country", "level"])
    states_lf = states_lf.explode("state_synonyms")

    bol_lf = bol_lf.join(
        states_lf,
        how="left",
        left_on="exporter_state_label",
        right_on="state_synonyms",
        validate="m:1",
    )

    # Manually correct 'BARUERI' municipality from PARANA state to SAO PAULO, with corresponding id's
    bol_lf = bol_lf.with_columns(
        # State correction
        exporter_state_trase_id=pl.when(
            (pl.col("exporter_municipality_label") == pl.lit("BARUERI"))
            & (pl.col("exporter_state_trase_id") == pl.lit("BR-41"))
        )
        .then(pl.lit("BR-35"))
        .otherwise(pl.col("exporter_state_trase_id")),
        exporter_state_name=pl.when(
            (pl.col("exporter_municipality_label") == pl.lit("BARUERI"))
            & (pl.col("exporter_state_trase_id") == pl.lit("BR-41"))
        )
        .then(pl.lit("SAO PAULO"))
        .otherwise(pl.col("exporter_state_name")),
    )

    # Assign to missing labels the default UNKNOWN identifiers
    bol_lf = bol_lf.with_columns(
        exporter_state_trase_id=pl.when(pl.col("exporter_state_label").is_null())
        .then(pl.lit("BR-XX"))
        .otherwise(pl.col("exporter_state_trase_id")),
        exporter_state_name=pl.when(pl.col("exporter_state_label").is_null())
        .then(pl.lit("UNKNOWN STATE"))
        .otherwise(pl.col("exporter_state_name")),
    )

    bol_lf = bol_lf.drop("parent_trase_id")

    # ------------------------------------------------------------------------
    # Clean municipalities
    # ------------------------------------------------------------------------

    # load Brazil municipalities (level 6, with trase_id of the form 'BR-XXXXXXX')
    municipalities_lf = regions_lf.filter(
        (pl.col("level") == pl.lit(6))
        & (pl.col("country") == pl.lit("BRAZIL"))
        & (pl.col("trase_id").str.len_chars() == pl.lit(10))
    )
    municipalities_lf = municipalities_lf.rename(
        {
            "trase_id": "exporter_municipality_trase_id",
            "name": "exporter_municipality_name",
            "synonyms": "municipality_synonyms",
        }
    )

    municipalities_lf = municipalities_lf.drop(["country", "level"])
    municipalities_lf = municipalities_lf.explode("municipality_synonyms")

    # In the join, label-synonyms also have to match the parent state
    bol_lf = bol_lf.join(
        municipalities_lf,
        how="left",
        left_on=["exporter_state_trase_id", "exporter_municipality_label"],
        right_on=["parent_trase_id", "municipality_synonyms"],
    )

    # Manually assign 'VARZEA GRANDE' (BR-5108402) to 'CAPAO GRANDE' (its an industrial district within the municipality)
    bol_lf = bol_lf.with_columns(
        exporter_municipality_trase_id=pl.when(
            pl.col("exporter_municipality_label") == pl.lit("CAPAO GRANDE")
        )
        .then(pl.lit("BR-5108402"))
        .otherwise(pl.col("exporter_municipality_trase_id")),
        exporter_municipality_name=pl.when(
            pl.col("exporter_municipality_label") == pl.lit("CAPAO GRANDE")
        )
        .then(pl.lit("VARZEA GRANDE"))
        .otherwise(pl.col("exporter_municipality_name")),
    )

    # Fill default UNKNOWN identifiers for incorrect municipalities (10 records)
    incorrect_municipality_labels = [
        "COVINGTON",
        "GOVERNADOR DIX SEPT ROSADO",
        "CINGAPURA",
    ]
    bol_lf = bol_lf.with_columns(
        exporter_municipality_trase_id=pl.when(
            pl.col("exporter_municipality_label").is_in(incorrect_municipality_labels)
        )
        .then(pl.lit("BR-XXXXXXX"))
        .otherwise(pl.col("exporter_municipality_trase_id")),
        exporter_municipality_name=pl.when(
            pl.col("exporter_municipality_label").is_in(incorrect_municipality_labels)
        )
        .then(pl.lit("UNKNOWN"))
        .otherwise(pl.col("exporter_municipality_name")),
    )

    return bol_lf


def clean_ports(bol_lf, ports_lf):
    """
    Creates the 'port_of_export_name' and 'port_of_import_name' columns.
    Cleans the 'port_of_export_label' and 'port_of_import_label' columns against the synonyms
    in the ports reference table (currently a view from postgres views.regions created through
    dbt model 'postgres_ports')
    """

    ports_lf = ports_lf.select("name", "synonyms", "country")

    # ------------------------------------------------------------------------
    # Clean ports of export
    # ------------------------------------------------------------------------
    brazil_ports_lf = ports_lf.filter(pl.col("country") == pl.lit("BRAZIL"))
    brazil_ports_lf = brazil_ports_lf.rename(
        {
            "name": "port_of_export_name",
            "synonyms": "brazil_port_synonyms",
        }
    )
    brazil_ports_lf = brazil_ports_lf.drop(["country"])
    brazil_ports_lf = brazil_ports_lf.explode("brazil_port_synonyms")

    bol_lf = bol_lf.join(
        brazil_ports_lf,
        how="left",
        left_on="port_of_export_label",
        right_on="brazil_port_synonyms",
        validate="m:1",
    )

    # ------------------------------------------------------------------------
    # Clean ports of import
    # ------------------------------------------------------------------------
    world_ports_lf = ports_lf.rename(
        {
            "name": "port_of_import_name",
            "synonyms": "world_port_synonyms",
            "country": "port_country",
        }
    )
    world_ports_lf = world_ports_lf.explode("world_port_synonyms")
    world_ports_lf = world_ports_lf.unique()

    bol_lf = bol_lf.join(
        world_ports_lf,
        how="left",
        left_on=["port_of_import_country_name", "port_of_import_label"],
        right_on=["port_country", "world_port_synonyms"],
    )

    return bol_lf


def clean_cnpjs(bol_lf):
    """
    Clean cnpjs and create a column 'exporter_type' indicating cnpj, cpf, or null
    Note that a number can't be at the same time a valid cnpj or cpf, and that
    some records might have 11 digits (as a cpf) but fail validation as they have
    incorrect check digits
    """
    bol_lf = bol_lf.with_columns(
        # Create 'exporter_type' based on validity
        pl.when(
            pl.col("exporter_cnpj")
            .str.pad_start(14, "0")
            .map_elements(stdnum.br.cnpj.is_valid)
        )
        .then(pl.lit("cnpj"))
        .when(
            pl.col("exporter_cnpj")
            .str.pad_start(11, "0")
            .map_elements(stdnum.br.cpf.is_valid)
        )
        .then(pl.lit("cpf"))
        .otherwise(pl.lit("unknown"))
        .alias("exporter_type"),
        # Normalize based on validity
        pl.when(
            pl.col("exporter_cnpj")
            .str.pad_start(14, "0")
            .map_elements(stdnum.br.cnpj.is_valid)
        )
        .then(pl.col("exporter_cnpj").str.pad_start(14, "0"))
        .when(
            pl.col("exporter_cnpj")
            .str.pad_start(11, "0")
            .map_elements(stdnum.br.cpf.is_valid)
        )
        .then(pl.col("exporter_cnpj").str.pad_start(11, "0"))
        .when(pl.col("exporter_cnpj") == pl.lit("0"))
        .then(pl.lit("0" * 14))
        .otherwise(pl.col("exporter_cnpj"))
        .alias("exporter_cnpj"),
    )

    return bol_lf


def identify_active_group_name(df, year, group_column):
    """
    Takes a dataframe with field containing the groups information as available
    in the trader reference dataset (a list of json strings), and adds a new
    'group_name' field with the valid group name for the given year.
    """

    # Add a row index to be able to group by it later
    df = df.with_row_index("row_index")

    df_exploded = df.explode(group_column)
    # Casting as polars might identify the field as binary
    df_exploded = df_exploded.with_columns(
        pl.col(group_column).cast(pl.Utf8).alias(group_column)
    )

    # Turn the json string into a struct with the appropriate types
    dtype = pl.Struct(
        [
            pl.Field("group", pl.Utf8),
            pl.Field("time_start", pl.Datetime),
            pl.Field("time_end", pl.Datetime),
        ]
    )
    df_exploded = df_exploded.with_columns(
        group_struct=pl.col(group_column).str.json_decode(dtype)
    )

    # Flatten the struct into new fields
    df_exploded = df_exploded.with_columns(
        group_name=pl.col("group_struct").struct.field("group"),
        time_start=pl.col("group_struct").struct.field("time_start"),
        time_end=pl.col("group_struct").struct.field("time_end"),
    )

    # Only keep groups that are valid for the year
    df_exploded = df_exploded.filter(
        (pl.col("time_end").is_null()) | (pl.col("time_end").dt.year() > pl.lit(year))
    )

    # Group again by the index, and only keep a group name
    df_unexploded = df_exploded.group_by("row_index").agg(
        pl.max("group_name").alias("group_name")
    )

    result = df.join(df_unexploded, on="row_index", how="left").drop("row_index")

    return result


def add_trader_data_by_label(
    input_trade_data_lf, db_traders_lf, trader_label_colummn, country_iso2=None
):
    """
    Adds trader information to the input trade data based on trader labels.

    This function takes a DataFrame containing trade data and adds trader information
    from a reference DataFrame. It matches trader labels in the input data with labels
    in the reference data and returns the best match including trader name, trader_trase_id,
    and trader group. The function also prioritizes traders based on the provided
    country ISO2 code and filters for active groups based on the given year.

    Parameters:
    - input_trade_data_lf (DataFrame): The input trade data containing trader labels.
    - db_traders_lf (DataFrame): The reference DataFrame containing trader information.
    - trader_label_colummn (str): The column name in the input trade data that contains trader labels.
    - country_iso2 (str, optional): The ISO2 code of the country to prioritize traders from. Defaults to None.

    Returns:
    - DataFrame: The input trade data with added trader information.
    """

    # Take the reference trader information, and unnest the labels
    db_traders_lf = db_traders_lf.select("labels", "name", "trase_id", "group_name")
    db_traders_lf = db_traders_lf.explode("labels")

    # Temporarily rename the trader_label_column to 'trader_label', and only work with it
    input_trade_data_lf = input_trade_data_lf.rename(
        {trader_label_colummn: "trader_label"}
    )
    input_labels_lf = input_trade_data_lf.select("trader_label").unique()

    joined_with_label_data = input_labels_lf.join(
        db_traders_lf, left_on="trader_label", right_on="labels", how="left"
    )

    # Add a country_priority field to prioritize the country's traders
    joined_with_label_data = joined_with_label_data.with_columns(
        country_priority=pl.when(
            (pl.lit(country_iso2).is_not_null())
            & (pl.col("trase_id").str.slice(0, 2) == pl.lit(country_iso2))
        )
        .then(pl.lit(1))
        .otherwise(pl.lit(0))
    )

    # Aggregate, giving priority to the country's traders, and only taking one value
    grouped_label_data = (
        joined_with_label_data.group_by("trader_label").agg(
            [pl.all().sort_by("country_priority", descending=True).head(1)]
        )
    ).drop("country_priority")

    # Disaggregate as we're only taking one matching record, and improve names
    grouped_label_data = grouped_label_data.explode(["name", "trase_id", "group_name"])
    grouped_label_data = grouped_label_data.rename(
        {
            "name": "trader_name",
            "trase_id": "trader_trase_id",
            "group_name": "trader_group_name",
        }
    )

    # Add the results to the original input_trade_data_lf
    input_trade_data_lf = input_trade_data_lf.join(
        grouped_label_data,
        how="left",
        on="trader_label",
    )

    # Return the original trader_label_column name
    input_trade_data_lf = input_trade_data_lf.rename(
        {"trader_label": trader_label_colummn}
    )

    return input_trade_data_lf


def clean_importers(bol_lf, traders_lf):
    """
    Note that the importers that were not identified by label at the first run, where manually
    added to the database, as registered in the script
    `trase/runbook/brazil/trade/bol/2023/brazil_trade_bol_ingestions_2023.py`
    """

    # take the distinct importer labels, excluding 'UNKNOWN CUSTOMER' as it has multiple matches
    importer_labels_lf = bol_lf.select("importer_label").unique()
    importer_labels_lf = importer_labels_lf.filter(
        pl.col("importer_label").ne("UNKNOWN CUSTOMER")
    )

    # Get the trader data for the importer labels, and merge it to bol_lf
    importer_labels_lf = add_trader_data_by_label(
        importer_labels_lf, traders_lf, "importer_label"
    )
    importer_labels_lf = importer_labels_lf.rename(
        {
            "trader_name": "importer_name",
            "trader_group_name": "importer_group",
        }
    )

    bol_lf = bol_lf.join(
        importer_labels_lf,
        how="left",
        on="importer_label",
    )

    # Add manually the name and group for the 'UNKNOWN CUSTOMER' importer
    bol_lf = bol_lf.with_columns(
        importer_name=pl.when(pl.col("importer_label") == pl.lit("UNKNOWN CUSTOMER"))
        .then(pl.lit("UNKNOWN"))
        .otherwise(pl.col("importer_name")),
        importer_group=pl.when(pl.col("importer_label") == pl.lit("UNKNOWN CUSTOMER"))
        .then(pl.lit("UNKNOWN"))
        .otherwise(pl.col("importer_group")),
    )

    return bol_lf


def clean_exporters(bol_lf, traders_lf):
    """
    Cleans exporter names and assigns Trase IDs based on CNPJ and labels.

    This function performs the following steps:
    1. Cleans exporter names based on CNPJ numbers. The great majority of cases,
        as in a previous run ingested new values - which can be found in
        trase/runbook/brazil/trade/bol/2023/brazil_trade_bol_ingestions_2023.py
    2. Cleans exporter names based on labels if no match is found using CNPJ.
    3. Merges the cleaned data and normalizes the group field.

    Parameters:
    - bol_lf (LazyFrame): The input trade data containing exporter information.
    - traders_lf (LazyFrame): The reference DataFrame containing trader information.
    """
    traders_lf = traders_lf.select("labels", "name", "trase_id", "group_name")

    # ------------------------------------------------------------------------
    # a) Clean based on cnpj (BR-TRADER-XXXXXXXX)
    # ------------------------------------------------------------------------

    # take the distinct exporter labels and cnpjs
    exporters_lf = bol_lf.select("exporter_label", "exporter_cnpj").unique()

    # Add a field called 'trase-id-test' concatenating "BR-TRADER-" with the first 8 characters of the cnpj
    exporters_lf = (
        exporters_lf.with_columns(
            trase_id_test=pl.concat_str(
                pl.lit("BR-TRADER-"),
                pl.col("exporter_cnpj").str.slice(0, 8),
            )
        )
        .drop("exporter_cnpj")
        .unique()
    )
    # Assign null to the fields with BR-TRADER-00000000
    exporters_lf = exporters_lf.with_columns(
        trase_id_test=pl.when(pl.col("trase_id_test") == pl.lit("BR-TRADER-00000000"))
        .then(pl.lit(None))
        .otherwise(pl.col("trase_id_test"))
    )

    # Join the exporters based on 'trase_id_test' field (based on cnpj)
    exporters_lf = exporters_lf.join(
        traders_lf,
        how="left",
        left_on="trase_id_test",
        right_on="trase_id",
        validate="m:1",
    ).drop("labels")

    exporters_lf = exporters_lf.rename(
        {
            "name": "exporter_name",
            "group_name": "exporter_group",
        }
    )

    # Create a field 'exporter_trase_id' filled for the fields that had a match
    exporters_lf = exporters_lf.with_columns(
        exporter_trase_id=pl.when(pl.col("exporter_name").is_not_null())
        .then(pl.col("trase_id_test"))
        .otherwise(pl.lit(None))
    )

    exporters_with_cnpj_matches = exporters_lf.filter(
        pl.col("exporter_name").is_not_null()
    ).unique()

    # ------------------------------------------------------------------------
    # b) Clean based on label
    # ------------------------------------------------------------------------

    # Take the exporter_labels of the records where there was no match
    exporters_by_label_lf = (
        exporters_lf.filter(pl.col("exporter_name").is_null())
        .select("exporter_label", "trase_id_test")
        .unique()
    )

    # Match by label
    exporters_by_label_lf = add_trader_data_by_label(
        exporters_by_label_lf,
        traders_lf,
        "exporter_label",
    )
    exporters_by_label_lf = exporters_by_label_lf.rename(
        {
            "trader_name": "exporter_name",
            "trader_group_name": "exporter_group",
            "trader_trase_id": "exporter_trase_id",
        }
    )

    # At this point all traders should have a name - group match with the database

    # The ones that didn't had it when first running this script, were manually added
    # based on a review of this dataframe, and the changes done were kept in the
    # following script:
    # trase/runbook/brazil/trade/bol/2023/brazil_trade_bol_ingestions_2023.py

    # ------------------------------------------------------------------------
    # c) Bring together the two dataframes and normalize the group field
    # ------------------------------------------------------------------------

    # Add the 'trase-id-test' field to the bol_lf dataframe so to join it with
    # the exporters_with_cnpj_matches
    bol_lf = bol_lf.with_columns(
        trase_id_test=pl.concat_str(
            pl.lit("BR-TRADER-"),
            pl.col("exporter_cnpj").str.slice(0, 8),
        )
    )

    bol_lf = bol_lf.join(
        exporters_with_cnpj_matches,
        how="left",
        left_on=["exporter_label", "trase_id_test"],
        right_on=["exporter_label", "trase_id_test"],
        validate="m:1",
    )

    # join also with exporters_by_label_lf, to use when the previous didn't match
    bol_lf = bol_lf.join(
        exporters_by_label_lf,
        how="left",
        on="exporter_label",
    )

    bol_lf = bol_lf.with_columns(
        exporter_name=pl.when(pl.col("exporter_name").is_null())
        .then(pl.col("exporter_name_right"))
        .otherwise(pl.col("exporter_name")),
        exporter_group=pl.when(pl.col("exporter_group").is_null())
        .then(pl.col("exporter_group_right"))
        .otherwise(pl.col("exporter_group")),
        exporter_trase_id=pl.when(pl.col("exporter_trase_id").is_null())
        .then(pl.col("exporter_trase_id_right"))
        .otherwise(pl.col("exporter_trase_id")),
    )

    bol_lf = bol_lf.drop(
        [
            "trase_id_test",
            "trase_id_test_right",
            "exporter_name_right",
            "exporter_trase_id_right",
            "exporter_group_right",
            "trader_trase_id",
        ]
    )

    return bol_lf


def clean_fob_and_net_weight(bol_lf, comtrade_lf):
    """
    Imputes missing FOB values and adjust FOBs that have extremely high outliers,
    relying on a z-score of the usd_per_kg and manual revision in a Notebook.
    Currently not adjusting net_weight as the values seemed ok.
    """

    # Add usd_per_kg_record in bol_lf
    bol_lf = bol_lf.with_columns(
        usd_per_kg_record=pl.when(
            (pl.col("net_weight_kg").is_null()) | (pl.col("fob").is_null())
        )
        .then(pl.lit(None))
        .otherwise(pl.col("fob") / pl.col("net_weight_kg"))
    )

    # Add modified z-score to rank outliers
    median_and_mad_lf = bol_lf.group_by("hs6").agg(
        [
            pl.col("usd_per_kg_record").median().alias("median_usd_per_kilo"),
            (
                (pl.col("usd_per_kg_record") - pl.col("usd_per_kg_record").median())
                .abs()
                .median()
            ).alias("mad_usd_per_kilo"),
        ]
    )
    # Join and calculate z-score
    bol_lf = bol_lf.join(median_and_mad_lf, on="hs6", how="left").with_columns(
        (
            0.6745
            * (pl.col("usd_per_kg_record") - pl.col("median_usd_per_kilo"))
            / pl.col("mad_usd_per_kilo")
        ).alias("usd_per_kg_zscore")
    )

    # Take comtrade reference values in usd_per_kg_reference
    comtrade_lf = comtrade_lf.select(
        [
            "year",
            "country_of_export_iso",
            "commodity_code",
            "net_weight_kg",
            "fob",
        ]
    )

    # Make a list of hs4 codes we have in the BOL
    hs6_codes_lf = bol_lf.select("hs6").unique()

    # Filter COMTRADE relevant values
    comtrade_lf = comtrade_lf.join(
        hs6_codes_lf, how="semi", left_on="commodity_code", right_on="hs6"
    ).filter(
        (pl.col("year") == pl.lit(YEAR))
        & (pl.col("country_of_export_iso") == pl.lit("BRA"))
        & (pl.col("net_weight_kg") >= pl.lit(0))
        & (pl.col("fob") >= pl.lit(0))
    )

    comtrade_lf = comtrade_lf.group_by("commodity_code").agg(
        (pl.col("fob") / pl.col("net_weight_kg")).mean().alias("usd_per_kg_reference")
    )

    bol_lf = bol_lf.rename({"fob": "fob_original"})

    # Bring usd_per_kg_reference to bol_lf
    bol_lf = bol_lf.join(
        comtrade_lf,
        how="left",
        left_on="hs6",
        right_on="commodity_code",
    )

    # Impute missing FOB values based on COMTRADE
    bol_lf = bol_lf.with_columns(
        fob_adjustment_type=pl.when(
            ((pl.col("fob_original") <= 0) | pl.col("fob_original").is_null())
            & (pl.col("net_weight_kg") > 0)
        )
        .then(pl.lit("imputed_from_comtrade"))
        .otherwise(pl.lit(None)),
        fob=pl.when(
            ((pl.col("fob_original") <= 0) | pl.col("fob_original").is_null())
            & (pl.col("net_weight_kg") > 0)
        )
        .then(pl.col("net_weight_kg") * pl.col("usd_per_kg_reference"))
        .otherwise(pl.col("fob_original")),
    )

    # Impute missing hs6 120100 values based on mean (without outliers)
    # as it doesn't exist in COMTRADE
    hs_120100_usd_per_kg_reference = (
        bol_lf.filter((pl.col("hs6") == "120100") & (pl.col("usd_per_kg_zscore") < 100))
        .select(pl.col("usd_per_kg_record"))
        .collect()
        .to_series()
        .mean()
    )

    bol_lf = bol_lf.with_columns(
        fob_adjustment_type=pl.when(
            (pl.col("hs6") == "120100") & (pl.col("fob_original").is_null())
        )
        .then(pl.lit("imputed_from_other_values"))
        .otherwise(pl.col("fob_adjustment_type")),
        fob=pl.when((pl.col("hs6") == "120100") & (pl.col("fob_original").is_null()))
        .then(pl.col("net_weight_kg") * pl.lit(hs_120100_usd_per_kg_reference))
        .otherwise(pl.col("fob")),
    )

    # Adjust cases where fob is completely off the charts, based on z-score and manual revision
    # Notebook with outlier graphs in:
    # trase/data_pipeline/models/brazil/trade/bol/2023/outlier_qa_brazil_bol_2023.ipynb

    # hs 020230 outliers (2 cases)
    bol_lf = bol_lf.with_columns(
        fob_adjustment_type=pl.when(
            (pl.col("hs6") == "020230")
            & (pl.col("usd_per_kg_zscore") > 100)
            & (pl.col("fob") > 2_000_000)
        )
        .then(pl.lit("fob_outlier_adjustment"))
        .otherwise(pl.col("fob_adjustment_type")),
        fob=pl.when(
            (pl.col("hs6") == "020230")
            & (pl.col("usd_per_kg_zscore") > 100)
            & (pl.col("fob") > 2_000_000)
        )
        .then(pl.col("net_weight_kg") * pl.col("usd_per_kg_reference"))
        .otherwise(pl.col("fob")),
    )

    # hs 120100 outliers with records with fob > 700M USD (15 cases)
    bol_lf = bol_lf.with_columns(
        fob_adjustment_type=pl.when(
            (pl.col("hs6") == "120100")
            & (pl.col("fob") > 700_000_000)
            & (pl.col("usd_per_kg_zscore") > 100)
        )
        .then(pl.lit("fob_outlier_adjustment"))
        .otherwise(pl.col("fob_adjustment_type")),
        fob=pl.when(
            (pl.col("hs6") == "120100")
            & (pl.col("fob") > 700_000_000)
            & (pl.col("usd_per_kg_zscore") > 100)
        )
        .then(pl.col("net_weight_kg") * pl.lit(hs_120100_usd_per_kg_reference))
        .otherwise(pl.col("fob")),
    )

    # hs 150710 outliers (3 cases)
    bol_lf = bol_lf.with_columns(
        fob_adjustment_type=pl.when(
            (pl.col("hs6") == "150710") & (pl.col("usd_per_kg_zscore") > 100)
        )
        .then(pl.lit("fob_outlier_adjustment"))
        .otherwise(pl.col("fob_adjustment_type")),
        fob=pl.when((pl.col("hs6") == "150710") & (pl.col("usd_per_kg_zscore") > 100))
        .then(pl.col("net_weight_kg") * pl.col("usd_per_kg_reference"))
        .otherwise(pl.col("fob")),
    )

    # hs 230400 outliers (1 case)
    bol_lf = bol_lf.with_columns(
        fob_adjustment_type=pl.when(
            (pl.col("hs6") == "230400") & (pl.col("usd_per_kg_zscore") > 100)
        )
        .then(pl.lit("fob_outlier_adjustment"))
        .otherwise(pl.col("fob_adjustment_type")),
        fob=pl.when((pl.col("hs6") == "230400") & (pl.col("usd_per_kg_zscore") > 100))
        .then(pl.col("net_weight_kg") * pl.col("usd_per_kg_reference"))
        .otherwise(pl.col("fob")),
    )

    bol_lf = bol_lf.drop(["mad_usd_per_kilo", "median_usd_per_kilo"])

    return bol_lf


def reorder_columns(bol_lf):
    """
    Reordering to make it easier to see the columns we have done special
    handling or have special significance. Using the fact that unhandled
    columns are in uppercase, which might not be the case in other years
    """
    columns = bol_lf.columns
    lowercase_cols = sorted([col for col in columns if col[0].islower()])
    other_cols = [col for col in columns if col not in lowercase_cols]
    return bol_lf.select(lowercase_cols + other_cols)


def run_model(dbt, cursor):
    # get sources and models
    bol_lf = (
        dbt.source("source_brazil", "original_brazil_soy_beef_bol_2023").pl().lazy()
    )
    countries_lf = dbt.ref("postgres_countries").pl().lazy()
    regions_lf = dbt.ref("postgres_regions_without_geometry").pl().lazy()
    ports_lf = dbt.ref("postgres_ports").pl().lazy()
    existing_hs_codes_lf = dbt.ref("hs2017").pl().lazy()
    commodities_lf = dbt.ref("postgres_commodities").pl().lazy()
    traders_lf = dbt.ref("postgres_traders").pl().lazy()
    comtrade_lf = dbt.ref("comtrade_exports_year_exporter_hs6").pl().lazy()

    # Add 'group_name' field to traders_lf with the active group
    traders_lf = identify_active_group_name(traders_lf, YEAR, "groups").drop("groups")

    # run the cleaning
    bol_lf = rename_and_normalize_columns(bol_lf)
    bol_lf = filter_country_and_hscodes(bol_lf, existing_hs_codes_lf, commodities_lf)
    bol_lf = clean_countries(bol_lf, countries_lf)
    bol_lf = add_european_union_bloc(bol_lf, countries_lf)
    bol_lf = clean_states_and_municipalities(bol_lf, regions_lf)
    bol_lf = clean_ports(bol_lf, ports_lf)
    bol_lf = clean_cnpjs(bol_lf)
    bol_lf = clean_importers(bol_lf, traders_lf)
    bol_lf = clean_exporters(bol_lf, traders_lf)
    bol_lf = clean_fob_and_net_weight(bol_lf, comtrade_lf)

    bol_lf = reorder_columns(bol_lf)

    return bol_lf


def model(dbt, cursor):
    import traceback

    dbt.config(materialized="external")

    # Encapsuling in try/except to print the full traceback in case of error
    # When debugging, don't use Polars lazy data loading, as it will not show the traceback
    try:
        bol_lf = run_model(dbt, cursor)
    except Exception as e:
        # Print a full traceback
        traceback.print_exc()
        raise
    return bol_lf