Brazil Bol 2023 Silver
s3://trase-storage/brazil/trade/bol/2023/silver/brazil_bol_2023_silver.parquet
Dbt path: trase_production.main_brazil.brazil_bol_2023_silver
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/trade/bol/_schema_brazil_bol.yml
Model file link: trase/data_pipeline/models/brazil/trade/bol/2023/brazil_bol_2023_silver.py
Dbt test runs & lineage: Test results ยท Lineage
Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)
Tags: brazil, bol, trade, 2023, silver
brazil_bol_2023_silver
Description
Cleans the Brazil BOL for 2023, and includes various tests to check the data is ok. Find where the fields come from and how they were cleaned in the fields documentation.
Details
| Column | Type | Description |
|---|---|---|
commodity |
VARCHAR |
Uppercase name of the commodity (BEEF, SOY, etc.) based on the hs4 code, and the list of commodities in the Trase reference table postgres_commodities |
country_of_destination_label |
VARCHAR |
Based on the Datamar source field PLACE_AND_PORTS_DEST_COUNTRY: "Country where the carrier delivered the cargo" |
country_of_destination_name |
VARCHAR |
Official Trase country name where the export was delivered. It is cleaned based on country_of_destination_label field Note that European Union countries are not unified as 'EUROPEAN UNION' country, although the source BOL does has 768 records with the label "EUROPEAN" instead of a specific country. |
country_of_destination_trase_id |
VARCHAR |
Official Trase country id, using the iso two letter country code based on the field country_of_destination_name |
date |
DATE |
Based on the source field DATES_LONG_HAUL_YYYYMMDD: Berthing (arrival) date of the long haul vessel: Year, month and day |
economic_bloc |
VARCHAR |
If the country of destination is part of the EU for the year of the BOL, it sets the value "EUROPEAN UNION" here. Note that currently no other economic blocs are being identified. If this is done in the future, probably the field will turn into an array. |
exporter_cnpj |
VARCHAR |
Based on the source field COMPANY_SHIPPER_REGISTRATION_NUMBER (the shipper is the exporter), which already only contains digits. The cleaning of this field uses stdnum.br python library to identify if its a valid cnpj, cpf, or unkown (and sets this in the exporter_type field). If its a valid cnpj it left pads with 0 until it reaches 14 characters, and if its a cpf left pads to 11 characters. |
exporter_country_label |
VARCHAR |
Based on the source field COMPANY_SHIPPER_COUNTRY_NAME: country name of the exporter |
exporter_group |
VARCHAR |
Official Trase trader group name, taken from the trader reference database. First it checks the reference data based on the tax identification of the record, and then based on the exporter_label. As a trader can change its group affiliation in time, the group valid for the current BOL year is taken. |
exporter_label |
VARCHAR |
Based on the source field COMPANY_SHIPPER_SHIPPER_NAME_DETAILED: Detailed Shipper (exporter) name. |
exporter_municipality_label |
VARCHAR |
Based on the source field COMPANY_SHIPPER_CITY: Shipper (exporter) city |
exporter_municipality_name |
VARCHAR |
Official Trase name of the municipality, based on the exporter_municipality_label, and cleaning it based on the postgres_regions reference table - where it checks the municipality exists and takes the official name |
exporter_municipality_trase_id |
VARCHAR |
Official Trase id of the municipality, usually created based on an official geocode of it. It is based on the exporter_municipality_label, and cleaning it based on the postgres_regions reference table - where it checks the municipality exists and takes the id |
exporter_name |
VARCHAR |
Official Trase trader name, taken from the trader reference database. First it checks the reference data based on the tax identification of the record, and then based on the exporter_label field |
exporter_state_label |
VARCHAR |
Based on the source field COMPANY_SHIPPER_STATE_NAME: Shipper (exporter) state name |
exporter_state_name |
VARCHAR |
Official Trase name of the state, based on the exporter_state_label, and cleaning it based on the postgres_regions reference table - where it checks the state exists and takes the official name |
exporter_state_trase_id |
VARCHAR |
Official Trase id of the state, based on the exporter_state_label, and cleaning it based on the postgres_regions reference table - where it checks the state exists and takes the official id |
exporter_trase_id |
VARCHAR |
Official Trase trader id, taken from the trader reference database. First it checks the reference data based on the tax identification of the record, and then based on the exporter_label field. The trase id is built based on the tax number. |
exporter_type |
VARCHAR |
Based on the field exporter_cnpj, it identifies if its a valid cnpj, cpf or if its of unknown type. |
fob |
DOUBLE |
Based on the field fob_original, including imputing empty values based on COMTRADE or BOL average values for the hs6 code. Where imputed, the field fob_adjustment_type shows which kind of imputing was done. |
fob_adjustment_type |
VARCHAR |
|
fob_original |
DOUBLE |
Based on the source field MEASURES_SECEX_FOB. Though this field doesn't exist in the vendor glossary, there is a similar field in the glossary called MEASURES_FOB_VALUE_USD: Monthly average FOB value per commodity in US dollars. |
hs4 |
VARCHAR |
|
hs5 |
VARCHAR |
|
hs6 |
VARCHAR |
|
hs6_description |
VARCHAR |
|
hs8 |
VARCHAR |
|
importer_group |
VARCHAR |
|
importer_label |
VARCHAR |
|
importer_name |
VARCHAR |
|
month |
INTEGER |
|
net_weight_kg |
DOUBLE |
Based on the source field MEASURES_WTKG: Weight in kilos |
net_weight_tonnes |
DOUBLE |
Based on the source field MEASURES_WTMT: Weight in metric tonnes |
port_of_export_label |
VARCHAR |
Based on the source field PLACE_AND_PORTS_POL_NAME: POL โ PORT OF LOADING โ Port where the LONG HAUL vessel loaded the cargo |
port_of_export_name |
VARCHAR |
Official Trase port name where the cargo was loaded. It is cleaned based on port_of_export_label field and the postgres_ports reference table, checking also the port exists in the port_of_export_country |
port_of_import_country_label |
VARCHAR |
Based on the source field PLACE_AND_PORTS_POD_COUNTRY: POD โ PORT OF DISCHARGE โ Country where the LONG HAUL vessel discharged the cargo |
port_of_import_country_name |
VARCHAR |
|
port_of_import_label |
VARCHAR |
Based on the source field PLACE_AND_PORTS_POD_NAME: POD โ PORT OF DISCHARGE โ Port where the LONG HAUL vessel discharged the cargo |
port_of_import_name |
VARCHAR |
Official Trase port name where the cargo was discharged. If there is no current existing port with that name in the port_of_import_country, it returns a NULL. It is cleaned based on port_of_import_label field and the postgres_ports reference table. |
usd_per_kg_record |
DOUBLE |
|
usd_per_kg_reference |
DOUBLE |
|
usd_per_kg_zscore |
DOUBLE |
|
CARGO_TRANSPORT_CARGO_TYPE |
VARCHAR |
|
CARGO_TRANSPORT_CONTAINER_TYPE |
VARCHAR |
Container Type (R = Reefer, D = Dry, T= Tank, 0 = bulk cargo) |
CARGO_TRANSPORT_MODE |
VARCHAR |
LCL (Less Container Load) or FCL (Full Container Load) |
CARGO_TRANSPORT_PACKAGE_TYPE |
VARCHAR |
Type of packaging used (this field is not always filled in) |
CARRIER_CARRIER_AGENT |
VARCHAR |
Carrier's maritime agent in port - there is also the vessel operator's agent |
CARRIER_CARRIER_DATAMAR |
VARCHAR |
Name of the carrier - shows actual carrier when Carrier_Name is a 'carrier of convenience' |
CARRIER_CARRIER_DATAMAR_GROUP_NAME |
VARCHAR |
Name of carrier in reference to SCAC (Standard Carrier Alpha Code) |
CARRIER_CARRIER_DATAMAR_GROUP_SCAC |
VARCHAR |
4 letter code identifying the carrier |
CARRIER_CARRIER_GROUP_NAME |
VARCHAR |
Name of carrier in reference to SCAC (Standard Carrier Alpha Code) |
CARRIER_CARRIER_GROUP_SCAC |
VARCHAR |
4 letter code identifying the carrier |
CARRIER_CARRIER_NAME |
VARCHAR |
Name of the carrier |
BL_DESCRIPTION_BL_DESCRIPTION |
VARCHAR |
|
DTM_COMMODITY_GROUP_EN |
VARCHAR |
|
DTM_COMMODITY_GROUP_PT |
VARCHAR |
|
COMMODITY_HS_HS2_CODE |
VARCHAR |
First 2 digits of the HS code |
COMMODITY_HS_HS2_ENGLISH |
VARCHAR |
General description of goods in HS2 in English |
COMMODITY_HS_HS2_PORTUGUES |
VARCHAR |
General description of goods in HS2 in Portuguese |
COMMODITY_HS_HS4_CODE |
VARCHAR |
First 4 digits of the HS code |
COMMODITY_HS_HS4_ENGLISH |
VARCHAR |
General description of goods in HS4 in English |
COMMODITY_HS_HS4_PORTUGUES |
VARCHAR |
General description of goods in HS4 in Portuguese |
COMMODITY_HS_HS6_CODE |
VARCHAR |
First 6 digits of the HS code |
COMMODITY_HS_HS6_ENGLISH |
VARCHAR |
General description of goods in HS6 in English |
COMMODITY_HS_HS6_PORTUGUES |
VARCHAR |
General description of goods in HS6 in Portuguese |
COMMODITY_HS_HS8_CODE |
VARCHAR |
Full 8 digits of Mercosul's Commodity code |
COMMODITY_HS_HS8_PORTUGUES |
VARCHAR |
General description of goods in HS8 in Portuguese |
COMMODITY_HS_DATAMAR_HS2_CODE |
VARCHAR |
First 2 digits of the HS code - with Datamar analysis |
COMMODITY_HS_DATAMAR_HS2_ENGLISH |
VARCHAR |
General description of goods in HS2 in English |
COMMODITY_HS_DATAMAR_HS2_PORTUGUES |
VARCHAR |
General description of goods in HS2 in Portuguese |
COMMODITY_HS_DATAMAR_HS4_CODE |
VARCHAR |
First 4 digits of the HS code - with Datamar analysis |
COMMODITY_HS_DATAMAR_HS4_ENGLISH |
VARCHAR |
General description of goods in HS4 in English |
COMMODITY_HS_DATAMAR_HS4_PORTUGUES |
VARCHAR |
General description of goods in HS4 in Portuguese |
COMMODITY_HS_DATAMAR_HS6_CODE |
VARCHAR |
First 6 digits of the HS code - with Datamar analysis |
COMMODITY_HS_DATAMAR_HS6_ENGLISH |
VARCHAR |
General description of goods in HS6 in English |
COMMODITY_HS_DATAMAR_HS6_PORTUGUES |
VARCHAR |
General description of goods in HS6 in Portuguese |
COMMODITY_HS_DATAMAR_HS8_CODE |
VARCHAR |
Full 8 digits of Mercosul's Commodity code - with Datamar analysis |
COMMODITY_HS_DATAMAR_HS8_PORTUGUES |
VARCHAR |
General description of goods in HS8 in Portuguese |
COMPANY_CONSIGNEE_CITY |
VARCHAR |
City name |
COMPANY_CONSIGNEE_CONSIGNEE_NAME |
VARCHAR |
Consignee name |
COMPANY_CONSIGNEE_CONSIGNEE_NAME_DETAILED |
VARCHAR |
Detailed Consignee name |
COMPANY_CONSIGNEE_CONSIGNEE_GROUP |
VARCHAR |
|
COMPANY_CONSIGNEE_REGISTRATION_NUMBER |
VARCHAR |
Registration number |
COMPANY_CONSIGNEE_COUNTRY |
VARCHAR |
Country name |
COMPANY_CONSIGNEE_NEIGHBORHOOD |
VARCHAR |
Neighbourhood Name |
COMPANY_CONSIGNEE_STATE |
VARCHAR |
State code |
COMPANY_CONSIGNEE_STATE_NAME |
VARCHAR |
State name |
COMPANY_CONSIGNEE_STREET |
VARCHAR |
Street name |
COMPANY_CONSIGNEE_TYPE |
VARCHAR |
Type of company (Real cargo owner or forwarder / NVOCC) |
COMPANY_CONSIGNEE_ZIP |
VARCHAR |
Zip code |
COMPANY_FORWARDER_CITY |
VARCHAR |
City name |
COMPANY_FORWARDER_COUNTRY |
VARCHAR |
Country name |
COMPANY_FORWARDER_FORWARDER_NAME |
VARCHAR |
Forwarder name |
COMPANY_FORWARDER_FORWARDER_NAME_DETAILED |
VARCHAR |
Detailed Forwarder name |
COMPANY_FORWARDER_FORWARDER_GROUP |
VARCHAR |
|
COMPANY_FORWARDER_REGISTRATION_NUMBER |
VARCHAR |
Registration number |
COMPANY_FORWARDER_NEIGHBORHOOD |
VARCHAR |
Neighbourhood Name |
COMPANY_FORWARDER_STATE |
VARCHAR |
State code |
COMPANY_FORWARDER_STATE_NAME |
VARCHAR |
State name |
COMPANY_FORWARDER_STREET |
VARCHAR |
Street name |
COMPANY_FORWARDER_TYPE |
VARCHAR |
Type of company (Real cargo owner or forwarder / NVOCC) |
COMPANY_FORWARDER_ZIP |
VARCHAR |
Zip code |
COMPANY_NOTIFY_CITY |
VARCHAR |
City name |
COMPANY_NOTIFY_COUNTRY |
VARCHAR |
Country name |
COMPANY_NOTIFY_NEIGHBORHOOD |
VARCHAR |
Neighbourhood name |
COMPANY_NOTIFY_NOTIFY_NAME |
VARCHAR |
Notify name |
COMPANY_NOTIFY_NOTIFY_NAME_DETAILED |
VARCHAR |
Detailed Notify name |
COMPANY_NOTIFY_NOTIFY_GROUP |
VARCHAR |
|
COMPANY_NOTIFY_REGISTRATION_NUMBER |
VARCHAR |
Registration number |
COMPANY_NOTIFY_STATE |
VARCHAR |
State code |
COMPANY_NOTIFY_STATE_NAME |
VARCHAR |
State name |
COMPANY_NOTIFY_STREET |
VARCHAR |
Street name |
COMPANY_NOTIFY_TYPE |
VARCHAR |
Type of company (Real cargo owner or forwarder / NVOCC) |
COMPANY_NOTIFY_ZIP |
VARCHAR |
Zip code |
COMPANY_SHIPPER_CITY |
VARCHAR |
City name |
COMPANY_SHIPPER_COUNTRY_NAME |
VARCHAR |
Country name |
COMPANY_SHIPPER_NEIGHBORHOOD |
VARCHAR |
Neighbourhood name |
COMPANY_SHIPPER_SHIPPER_NAME |
VARCHAR |
Shipper name |
COMPANY_SHIPPER_SHIPPER_NAME_DETAILED |
VARCHAR |
Detailed Shipper name |
COMPANY_SHIPPER_SHIPPER_GROUP |
VARCHAR |
Name of the group to which the Shipper belongs |
COMPANY_SHIPPER_REGISTRATION_NUMBER |
VARCHAR |
Registration number |
COMPANY_SHIPPER_STATE |
VARCHAR |
State code |
COMPANY_SHIPPER_STATE_NAME |
VARCHAR |
State name |
COMPANY_SHIPPER_STREET |
VARCHAR |
Street name |
COMPANY_SHIPPER_TYPE |
VARCHAR |
Type of company (Real cargo owner or forwarder / NVOCC) |
COMPANY_SHIPPER_ZIP |
VARCHAR |
Zip code |
IDENTIFICATION_ID_DATAMAR |
VARCHAR |
Internal code for Datamar to identify the data in that record |
OPERATOR_FEEDER_OPERATOR_FEEDER_AGENT |
VARCHAR |
Feeder vessel operator's agent |
OPERATOR_FEEDER_OPERATOR_FEEDER_NAME |
VARCHAR |
Feeder vessel operator name |
DATES_FEEDER_DATE |
VARCHAR |
|
DATES_FEEDER_DAYOFWEEK |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Day of the week (Monday to Friday) |
DATES_FEEDER_DD |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Day |
DATES_FEEDER_FIRSTDAYOFWEEK |
VARCHAR |
Berthing (arrival) date of the long haul vessel: First day of the week |
DATES_FEEDER_LASTDAYOFWEEK |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Last day of the week |
DATES_FEEDER_MM |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Month |
DATES_FEEDER_QTR |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Quarter |
DATES_FEEDER_SEM |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Semester |
DATES_FEEDER_WEEKYEAR |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Number of weeks |
DATES_FEEDER_YYYY |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Year |
DATES_FEEDER_YYYYMM |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Year and month |
DATES_FEEDER_YYYYMMDD |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Year, month and day |
DATES_FEEDER_YYYYQTR |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Year and quarter |
DATES_FEEDER_YYYYSEM |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Year and semester |
DATES_FEEDER_YYYYWW |
VARCHAR |
Berthing (arrival) date of the long haul vessel: Year and week |
DATES_LONG_HAUL_DATE |
VARCHAR |
|
DATES_LONG_HAUL_DAYOFWEEK |
VARCHAR |
Day of the week (Monday to Friday) |
DATES_LONG_HAUL_DD |
VARCHAR |
Day |
DATES_LONG_HAUL_FIRSTDAYOFWEEK |
VARCHAR |
First day of the week |
DATES_LONG_HAUL_LASTDAYOFWEEK |
VARCHAR |
Last day of the week |
DATES_LONG_HAUL_MM |
VARCHAR |
Month |
DATES_LONG_HAUL_QTR |
VARCHAR |
Quarter |
DATES_LONG_HAUL_SEM |
VARCHAR |
Semester |
DATES_LONG_HAUL_WEEKYEAR |
VARCHAR |
Number of weeks |
DATES_LONG_HAUL_YYYY |
VARCHAR |
Year |
DATES_LONG_HAUL_YYYYMM |
VARCHAR |
Year and month |
DATES_LONG_HAUL_YYYYMMDD |
VARCHAR |
Year, month and day |
DATES_LONG_HAUL_YYYYQTR |
VARCHAR |
Year and quarter |
DATES_LONG_HAUL_YYYYSEM |
VARCHAR |
Year and semester |
DATES_LONG_HAUL_YYYYWW |
VARCHAR |
Year and week |
PLACE_AND_PORTS_DEST_COUNTRY |
VARCHAR |
DEST โ DESTINATION โ Country where the carrier delivered the cargo |
PLACE_AND_PORTS_DEST_NAME |
VARCHAR |
DEST โ DESTINATION โ Location where the carrier delivered the cargo |
PLACE_AND_PORTS_DEST_TRADELANE |
VARCHAR |
DEST โ DESTINATION โ Tradelane where the carrier delivered the cargo |
PLACE_AND_PORTS_POD_COUNTRY |
VARCHAR |
POD โ PORT OF DISCHARGE โ Country where the LONG HAUL vessel discharged the cargo |
PLACE_AND_PORTS_POD_NAME |
VARCHAR |
POD โ PORT OF DISCHARGE โ Port where the LONG HAUL vessel discharged the cargo |
PLACE_AND_PORTS_POD_TERMINAL |
VARCHAR |
POD โ PORT OF DISCHARGE โ Terminal where the LONG HAUL vessel discharged the cargo (only in ECSA) |
PLACE_AND_PORTS_POD_TRADELANE |
VARCHAR |
POD โ PORT OF DISCHARGE โ Tradelane where the LONG HAUL vessel discharged the cargo |
PLACE_AND_PORTS_POL_COUNTRY |
VARCHAR |
POL โ PORT OF LOADING โ Country where the LONG HAUL vessel loaded the cargo |
PLACE_AND_PORTS_POL_NAME |
VARCHAR |
POL โ PORT OF LOADING โ Port where the LONG HAUL vessel loaded the cargo |
PLACE_AND_PORTS_POL_TERMINAL |
VARCHAR |
POL โ PORT OF LOADING โ Terminal where the LONG HAUL vessel loaded the cargo (only in ECSA) |
PLACE_AND_PORTS_POL_TRADELANE |
VARCHAR |
POL โ PORT OF LOADING โ Tradelane where the LONG HAUL vessel loaded the cargo |
PLACE_AND_PORTS_POMD_COUNTRY |
VARCHAR |
POMD โ PORT OF MARITIME DESTINATION โ Last maritime country of cargo |
PLACE_AND_PORTS_POMD_NAME |
VARCHAR |
POMD โ PORT OF MARITIME DESTINATION โ Last maritime port of cargo |
PLACE_AND_PORTS_POMD_TERMINAL |
VARCHAR |
POMD โ PORT OF MARITIME DESTINATION - Terminal where the vessel feeder discharged the cargo (only in ECSA) |
PLACE_AND_PORTS_POMD_TRADELANE |
VARCHAR |
POMD โ PORT OF MARITIME DESTINATION โ Last maritime tradelane of cargo |
PLACE_AND_PORTS_POMO_COUNTRY |
VARCHAR |
POMO โ PORT OF MARITIME ORIGIN โ First maritime country of cargo |
PLACE_AND_PORTS_POMO_NAME |
VARCHAR |
POMO โ PORT OF MARITIME ORIGIN โ First maritime port of cargo |
PLACE_AND_PORTS_POMO_TERMINAL |
VARCHAR |
POMO โ PORT OF MARITIME ORIGIN โ Terminal where the vessel feeder loaded the cargo (only in ECSA) |
PLACE_AND_PORTS_POMO_TRADELANE |
VARCHAR |
POMO โ PORT OF MARITIME ORIGIN โ First maritime tradeline of cargo |
PLACE_AND_PORTS_POR_COUNTRY |
VARCHAR |
POR โ PLACE OF RECEIPT โ Country where the carrier took charge of the cargo |
PLACE_AND_PORTS_POR_NAME |
VARCHAR |
POR โ PLACE OF RECEIPT โ Location where the carrier took charge of the cargo |
PLACE_AND_PORTS_POR_TRADELANE |
VARCHAR |
POR โ PLACE OF RECEIPT โ Tradelane where the carrier took charge of the cargo |
SERVICE_PORT_ROTATION |
VARCHAR |
Ports serviced in the rotation order |
SERVICE_SERVICE_NAME |
VARCHAR |
The commercial service of the vessel |
SERVICE_TRANSIT_TIME |
VARCHAR |
Round Voyage duration in days |
VESSEL_FEEDER_CONTAINER_CAPACITY_FEEDER |
VARCHAR |
Nominal capacity of containers on the vessel in TEUs |
VESSEL_FEEDER_DWT_FEEDER |
VARCHAR |
Deadweight in tons |
VESSEL_FEEDER_IMO_FEEDER |
VARCHAR |
IMO (International Maritime Organization) number of the vessel |
VESSEL_FEEDER_REEFER_CAPACITY_FEEDER |
VARCHAR |
Nominal capacity of containers reefer on the vessel |
VESSEL_FEEDER_VESSEL_NAME_FEEDER |
VARCHAR |
Name of Vessel |
VESSEL_FEEDER_VESSEL_TYPE_FEEDER |
VARCHAR |
Type of Vessel (Container, General Cargo, Tank etc) |
VESSEL_FEEDER_VOYAGE_FEEDER |
VARCHAR |
Voyage number |
VESSEL_LONG_HAUL_CONTAINER_CAPACITY |
VARCHAR |
|
VESSEL_LONG_HAUL_DWT |
VARCHAR |
|
VESSEL_LONG_HAUL_IMO |
VARCHAR |
|
VESSEL_LONG_HAUL_REEFER_CAPACITY |
VARCHAR |
|
VESSEL_LONG_HAUL_VESSEL_LONG_HAUL_NAME |
VARCHAR |
|
VESSEL_LONG_HAUL_VESSEL_LONG_HAUL_TYPE |
VARCHAR |
|
MEASURES_C20 |
VARCHAR |
Quantity of 20' containers |
MEASURES_C40 |
VARCHAR |
Quantity of 40' containers |
MEASURES_C20_PLUS_C40 |
VARCHAR |
|
MEASURES_SECEX_FOB |
VARCHAR |
|
MEASURES_PACKAGEQTY |
VARCHAR |
Quantity of packs/packages |
MEASURES_TEU |
VARCHAR |
Quantity of TEUs |
MEASURES_WTMT |
VARCHAR |
Weight in kilos |
MEASURES_WTKG |
VARCHAR |
Weight in metric tons |
Models / Seeds
source.trase_duckdb.source_brazil.original_brazil_soy_beef_bol_2023model.trase_duckdb.postgres_countriesmodel.trase_duckdb.postgres_regions_without_geometrymodel.trase_duckdb.postgres_portsmodel.trase_duckdb.hs2017model.trase_duckdb.postgres_commoditiesmodel.trase_duckdb.postgres_tradersmodel.trase_duckdb.comtrade_exports_year_exporter_hs6
Sources
['source_brazil', 'original_brazil_soy_beef_bol_2023']
No called script or script source not found.
"""
Brazil BOL 2023 Silver Data Pipeline
This script processes the Brazil Bill of Lading (BOL) data for the year 2023.
It relies in Polars and existing parquet files, instead of Pandas and Postgres access.
It loads the data as lazy frames (not in eager execution), so when the model runs
dbt-duckdb brings everything together and collects the data.
Tests are defined in the dbt schema, so skipping the assertions here.
It performs various cleaning tasks, including:
* Renaming and normalizing columns
* Filtering records based on country and HS codes
* Cleaning country names
* Adding economic bloc information
* Cleaning state and municipality names
* Cleaning port names
* Cleaning CNPJ/CPF numbers
* Cleaning importer and exporter names
* Cleaning fobs (imputing missing and adjusting a few outliers)
* Reordering columns for better readability
"""
import polars as pl
import stdnum.br.cnpj
import stdnum.br.cpf
YEAR = 2023
def rename_and_normalize_columns(bol_lf):
"""
Select the columns of interest, renames them and converts the types,
filtering for the beef hs codes
"""
columns = {
"DATES_LONG_HAUL_YYYYMMDD": "date",
# hs codes
"COMMODITY_HS_HS4_CODE": "hs4",
"COMMODITY_HS_HS6_CODE": "hs6",
"COMMODITY_HS_HS6_ENGLISH": "hs6_description",
"COMMODITY_HS_HS8_CODE": "hs8",
# exporters
"COMPANY_SHIPPER_CITY": "exporter_municipality_label",
"COMPANY_SHIPPER_REGISTRATION_NUMBER": "exporter_cnpj",
"COMPANY_SHIPPER_SHIPPER_NAME_DETAILED": "exporter_label",
"COMPANY_SHIPPER_STATE_NAME": "exporter_state_label",
"COMPANY_SHIPPER_COUNTRY_NAME": "exporter_country_label",
# ports, country
"PLACE_AND_PORTS_POL_NAME": "port_of_export_label",
"PLACE_AND_PORTS_POD_NAME": "port_of_import_label",
"PLACE_AND_PORTS_POD_COUNTRY": "port_of_import_country_label",
"PLACE_AND_PORTS_DEST_COUNTRY": "country_of_destination_label",
# importer
"COMPANY_CONSIGNEE_CONSIGNEE_NAME_DETAILED": "importer_label",
# net_weight_kg, fob
"MEASURES_WTKG": "net_weight_kg",
"MEASURES_WTMT": "net_weight_tonnes",
"MEASURES_SECEX_FOB": "fob",
}
# Make a copy of the columns of interest. Keeping the original
# in case we want to review transformations
bol_lf = bol_lf.with_columns(
[pl.col(orig_col).alias(new_col) for orig_col, new_col in columns.items()]
)
bol_lf = bol_lf.with_columns(
date=pl.col("date").str.to_date(format="%Y%m%d", strict=False),
# Add month as integer extracted from date
month=pl.col("date")
.str.to_date(format="%Y%m%d", strict=False)
.dt.month()
.cast(pl.Int32),
# Ensure proper decimal formatting and scientific notation handling
net_weight_kg=pl.col("net_weight_kg").str.replace(",", ".").cast(pl.Float64),
net_weight_tonnes=pl.col("net_weight_tonnes")
.str.replace(",", ".")
.cast(pl.Float64),
fob=pl.col("fob").str.replace(",", ".").cast(pl.Float64),
)
do_not_clean = [
"date",
"month",
"hs4",
"hs6",
"net_weight_kg",
"net_weight_tonnes",
"fob",
]
# Trim, normalize, uppercase, remove extra spaces
bol_lf = bol_lf.with_columns(
[
pl.col(col).str.strip_chars()
# Replace multiple spaces with a single space
.str.replace(r"\s+", " ")
# Convert accents and diacritics (see https://stackoverflow.com/a/77217563)
.str.normalize("NFKD")
.str.replace_all(r"\p{CombiningMark}", "")
.str.to_uppercase()
.alias(col)
for col in columns.values()
if col not in do_not_clean
]
)
# Add hs5 as the first 5 characters of hs6
bol_lf = bol_lf.with_columns(hs5=pl.col("hs6").str.slice(0, 5))
return bol_lf
def clean_countries(bol_lf, countries_lf):
"""
Adjusts 'country_of_destination_label' and 'port_of_import_country_label'
with the 'official' country names used in Trase. Assumes that all
'country_of_destination_label' exist in the official country list.
"""
# Clean country_of_destination_name
country_synonyms_lf = countries_lf.select(
"country_trase_id", "country_name", "synonyms"
)
country_synonyms_lf = country_synonyms_lf.rename(
{
"country_trase_id": "country_of_destination_trase_id",
"country_name": "country_of_destination_name",
}
)
country_synonyms_lf = country_synonyms_lf.explode("synonyms") # unpack synonyms[]
bol_lf = bol_lf.join(
country_synonyms_lf,
how="left",
left_on="country_of_destination_label",
right_on="synonyms",
validate="m:1",
)
# clean port_of_import_country_name
country_synonyms_lf = country_synonyms_lf.drop(
"country_of_destination_trase_id"
) # not using it here
country_synonyms_lf = country_synonyms_lf.rename(
{
"country_of_destination_name": "port_of_import_country_name",
}
)
bol_lf = bol_lf.join(
country_synonyms_lf,
how="left",
left_on="port_of_import_country_label",
right_on="synonyms",
validate="m:1",
)
return bol_lf
def filter_country_and_hscodes(bol_lf, existing_hs_codes_lf, commodities_lf):
"""
Filters for records from Brazil, and known hs codes. Also adds a field called 'commodity'
with the commodity type based on the hs4 code.
Summary of records removed:
exporter_country_label != 'BRAZIL':
- UNKNOWN: 332
- ARGENTINA: 36
- PARAGUAY: 25
- URUGUAY: 25
hs4 not in 'existing_hscodes' reference table:
- 20 records with the HS CODE 0033
"""
# Filters out
bol_lf = bol_lf.filter(pl.col("exporter_country_label") == pl.lit("BRAZIL"))
# Keep only records with known hs4 codes (using semi join)
existing_hs4_codes_lf = existing_hs_codes_lf.filter(pl.col("type") == pl.lit("hs4"))
bol_lf = bol_lf.join(
existing_hs4_codes_lf,
how="semi",
left_on="hs4",
right_on="code",
)
# Add a 'commodity' field based on the hs4 code, and the commodities_lf
commodities_lf = commodities_lf.select("hs_code", "commodity")
commodities_lf = commodities_lf.with_columns(hs4=pl.col("hs_code").str.slice(0, 4))
commodities_lf = commodities_lf.drop("hs_code")
commodities_lf = commodities_lf.unique(subset=["hs4"], keep="first")
bol_lf = bol_lf.join(
commodities_lf,
how="left",
on="hs4",
validate="m:1",
)
return bol_lf
def add_european_union_bloc(bol_lf, countries_lf):
"""
Adds a 'economic_bloc_name' column with "EUROPEAN UNION" for countries currently making part of it.
If in the future more economic blocs are considered, probably 'economic_bloc_name' would change to be
an array.
Takes the 'countries_lf' which contains, per country, an array of json strings with the information
of the economic blocs.
"""
economic_blocs_lf = countries_lf.select("country_trase_id", "economic_bloc")
economic_blocs_lf = economic_blocs_lf.explode("economic_bloc")
# Turn the json string into a struct with the appropriate types
dtype = pl.Struct(
[
pl.Field("economic_bloc", pl.Utf8),
pl.Field("time_start", pl.Datetime),
pl.Field("time_end", pl.Datetime),
]
)
economic_blocs_lf = economic_blocs_lf.with_columns(
economic_bloc_struct=pl.col("economic_bloc").str.json_decode(dtype)
)
# Flatten the struct into new fields
economic_blocs_lf = economic_blocs_lf.with_columns(
economic_bloc_name=pl.col("economic_bloc_struct").struct.field("economic_bloc"),
time_start=pl.col("economic_bloc_struct").struct.field("time_start"),
time_end=pl.col("economic_bloc_struct").struct.field("time_end"),
)
# Filter for EUROPEAN UNION and compatible start / end dates (specially for filtering UK)
economic_blocs_lf = (
economic_blocs_lf.filter(
pl.col("economic_bloc_name") == pl.lit("EUROPEAN UNION")
)
.filter(pl.col("time_start").dt.year() <= pl.lit(YEAR))
.filter(
(pl.col("time_end").is_null())
| (pl.col("time_end").dt.year() >= pl.lit(YEAR))
)
)
economic_blocs_lf = economic_blocs_lf.select(
"country_trase_id", "economic_bloc_name"
)
economic_blocs_lf = economic_blocs_lf.rename(
{"economic_bloc_name": "economic_bloc"}
)
bol_lf = bol_lf.join(
economic_blocs_lf,
how="left",
left_on="country_of_destination_trase_id",
right_on="country_trase_id",
validate="m:1",
)
return bol_lf
def clean_states_and_municipalities(bol_lf, regions_lf):
"""
Cleans the state label for the official one we use in Trase. Assumes
all synonyms exist for the labels. [pending: Also corrects if there are wrongly
assigned municipality - state values in the trade data]
"""
# ------------------------------------------------------------------------
# Clean states
# ------------------------------------------------------------------------
regions_lf = regions_lf.select(
"trase_id", "name", "synonyms", "country", "level", "parent_trase_id"
)
# Only valid states from brazil (level 3, with trase_id of the form 'BR-XX')
states_lf = regions_lf.filter(
(pl.col("level") == pl.lit(3))
& (pl.col("country") == pl.lit("BRAZIL"))
& (pl.col("trase_id").str.len_chars() == pl.lit(5))
)
states_lf = states_lf.rename(
{
"trase_id": "exporter_state_trase_id",
"name": "exporter_state_name",
"synonyms": "state_synonyms",
}
)
states_lf = states_lf.drop(["country", "level"])
states_lf = states_lf.explode("state_synonyms")
bol_lf = bol_lf.join(
states_lf,
how="left",
left_on="exporter_state_label",
right_on="state_synonyms",
validate="m:1",
)
# Manually correct 'BARUERI' municipality from PARANA state to SAO PAULO, with corresponding id's
bol_lf = bol_lf.with_columns(
# State correction
exporter_state_trase_id=pl.when(
(pl.col("exporter_municipality_label") == pl.lit("BARUERI"))
& (pl.col("exporter_state_trase_id") == pl.lit("BR-41"))
)
.then(pl.lit("BR-35"))
.otherwise(pl.col("exporter_state_trase_id")),
exporter_state_name=pl.when(
(pl.col("exporter_municipality_label") == pl.lit("BARUERI"))
& (pl.col("exporter_state_trase_id") == pl.lit("BR-41"))
)
.then(pl.lit("SAO PAULO"))
.otherwise(pl.col("exporter_state_name")),
)
# Assign to missing labels the default UNKNOWN identifiers
bol_lf = bol_lf.with_columns(
exporter_state_trase_id=pl.when(pl.col("exporter_state_label").is_null())
.then(pl.lit("BR-XX"))
.otherwise(pl.col("exporter_state_trase_id")),
exporter_state_name=pl.when(pl.col("exporter_state_label").is_null())
.then(pl.lit("UNKNOWN STATE"))
.otherwise(pl.col("exporter_state_name")),
)
bol_lf = bol_lf.drop("parent_trase_id")
# ------------------------------------------------------------------------
# Clean municipalities
# ------------------------------------------------------------------------
# load Brazil municipalities (level 6, with trase_id of the form 'BR-XXXXXXX')
municipalities_lf = regions_lf.filter(
(pl.col("level") == pl.lit(6))
& (pl.col("country") == pl.lit("BRAZIL"))
& (pl.col("trase_id").str.len_chars() == pl.lit(10))
)
municipalities_lf = municipalities_lf.rename(
{
"trase_id": "exporter_municipality_trase_id",
"name": "exporter_municipality_name",
"synonyms": "municipality_synonyms",
}
)
municipalities_lf = municipalities_lf.drop(["country", "level"])
municipalities_lf = municipalities_lf.explode("municipality_synonyms")
# In the join, label-synonyms also have to match the parent state
bol_lf = bol_lf.join(
municipalities_lf,
how="left",
left_on=["exporter_state_trase_id", "exporter_municipality_label"],
right_on=["parent_trase_id", "municipality_synonyms"],
)
# Manually assign 'VARZEA GRANDE' (BR-5108402) to 'CAPAO GRANDE' (its an industrial district within the municipality)
bol_lf = bol_lf.with_columns(
exporter_municipality_trase_id=pl.when(
pl.col("exporter_municipality_label") == pl.lit("CAPAO GRANDE")
)
.then(pl.lit("BR-5108402"))
.otherwise(pl.col("exporter_municipality_trase_id")),
exporter_municipality_name=pl.when(
pl.col("exporter_municipality_label") == pl.lit("CAPAO GRANDE")
)
.then(pl.lit("VARZEA GRANDE"))
.otherwise(pl.col("exporter_municipality_name")),
)
# Fill default UNKNOWN identifiers for incorrect municipalities (10 records)
incorrect_municipality_labels = [
"COVINGTON",
"GOVERNADOR DIX SEPT ROSADO",
"CINGAPURA",
]
bol_lf = bol_lf.with_columns(
exporter_municipality_trase_id=pl.when(
pl.col("exporter_municipality_label").is_in(incorrect_municipality_labels)
)
.then(pl.lit("BR-XXXXXXX"))
.otherwise(pl.col("exporter_municipality_trase_id")),
exporter_municipality_name=pl.when(
pl.col("exporter_municipality_label").is_in(incorrect_municipality_labels)
)
.then(pl.lit("UNKNOWN"))
.otherwise(pl.col("exporter_municipality_name")),
)
return bol_lf
def clean_ports(bol_lf, ports_lf):
"""
Creates the 'port_of_export_name' and 'port_of_import_name' columns.
Cleans the 'port_of_export_label' and 'port_of_import_label' columns against the synonyms
in the ports reference table (currently a view from postgres views.regions created through
dbt model 'postgres_ports')
"""
ports_lf = ports_lf.select("name", "synonyms", "country")
# ------------------------------------------------------------------------
# Clean ports of export
# ------------------------------------------------------------------------
brazil_ports_lf = ports_lf.filter(pl.col("country") == pl.lit("BRAZIL"))
brazil_ports_lf = brazil_ports_lf.rename(
{
"name": "port_of_export_name",
"synonyms": "brazil_port_synonyms",
}
)
brazil_ports_lf = brazil_ports_lf.drop(["country"])
brazil_ports_lf = brazil_ports_lf.explode("brazil_port_synonyms")
bol_lf = bol_lf.join(
brazil_ports_lf,
how="left",
left_on="port_of_export_label",
right_on="brazil_port_synonyms",
validate="m:1",
)
# ------------------------------------------------------------------------
# Clean ports of import
# ------------------------------------------------------------------------
world_ports_lf = ports_lf.rename(
{
"name": "port_of_import_name",
"synonyms": "world_port_synonyms",
"country": "port_country",
}
)
world_ports_lf = world_ports_lf.explode("world_port_synonyms")
world_ports_lf = world_ports_lf.unique()
bol_lf = bol_lf.join(
world_ports_lf,
how="left",
left_on=["port_of_import_country_name", "port_of_import_label"],
right_on=["port_country", "world_port_synonyms"],
)
return bol_lf
def clean_cnpjs(bol_lf):
"""
Clean cnpjs and create a column 'exporter_type' indicating cnpj, cpf, or null
Note that a number can't be at the same time a valid cnpj or cpf, and that
some records might have 11 digits (as a cpf) but fail validation as they have
incorrect check digits
"""
bol_lf = bol_lf.with_columns(
# Create 'exporter_type' based on validity
pl.when(
pl.col("exporter_cnpj")
.str.pad_start(14, "0")
.map_elements(stdnum.br.cnpj.is_valid)
)
.then(pl.lit("cnpj"))
.when(
pl.col("exporter_cnpj")
.str.pad_start(11, "0")
.map_elements(stdnum.br.cpf.is_valid)
)
.then(pl.lit("cpf"))
.otherwise(pl.lit("unknown"))
.alias("exporter_type"),
# Normalize based on validity
pl.when(
pl.col("exporter_cnpj")
.str.pad_start(14, "0")
.map_elements(stdnum.br.cnpj.is_valid)
)
.then(pl.col("exporter_cnpj").str.pad_start(14, "0"))
.when(
pl.col("exporter_cnpj")
.str.pad_start(11, "0")
.map_elements(stdnum.br.cpf.is_valid)
)
.then(pl.col("exporter_cnpj").str.pad_start(11, "0"))
.when(pl.col("exporter_cnpj") == pl.lit("0"))
.then(pl.lit("0" * 14))
.otherwise(pl.col("exporter_cnpj"))
.alias("exporter_cnpj"),
)
return bol_lf
def identify_active_group_name(df, year, group_column):
"""
Takes a dataframe with field containing the groups information as available
in the trader reference dataset (a list of json strings), and adds a new
'group_name' field with the valid group name for the given year.
"""
# Add a row index to be able to group by it later
df = df.with_row_index("row_index")
df_exploded = df.explode(group_column)
# Casting as polars might identify the field as binary
df_exploded = df_exploded.with_columns(
pl.col(group_column).cast(pl.Utf8).alias(group_column)
)
# Turn the json string into a struct with the appropriate types
dtype = pl.Struct(
[
pl.Field("group", pl.Utf8),
pl.Field("time_start", pl.Datetime),
pl.Field("time_end", pl.Datetime),
]
)
df_exploded = df_exploded.with_columns(
group_struct=pl.col(group_column).str.json_decode(dtype)
)
# Flatten the struct into new fields
df_exploded = df_exploded.with_columns(
group_name=pl.col("group_struct").struct.field("group"),
time_start=pl.col("group_struct").struct.field("time_start"),
time_end=pl.col("group_struct").struct.field("time_end"),
)
# Only keep groups that are valid for the year
df_exploded = df_exploded.filter(
(pl.col("time_end").is_null()) | (pl.col("time_end").dt.year() > pl.lit(year))
)
# Group again by the index, and only keep a group name
df_unexploded = df_exploded.group_by("row_index").agg(
pl.max("group_name").alias("group_name")
)
result = df.join(df_unexploded, on="row_index", how="left").drop("row_index")
return result
def add_trader_data_by_label(
input_trade_data_lf, db_traders_lf, trader_label_colummn, country_iso2=None
):
"""
Adds trader information to the input trade data based on trader labels.
This function takes a DataFrame containing trade data and adds trader information
from a reference DataFrame. It matches trader labels in the input data with labels
in the reference data and returns the best match including trader name, trader_trase_id,
and trader group. The function also prioritizes traders based on the provided
country ISO2 code and filters for active groups based on the given year.
Parameters:
- input_trade_data_lf (DataFrame): The input trade data containing trader labels.
- db_traders_lf (DataFrame): The reference DataFrame containing trader information.
- trader_label_colummn (str): The column name in the input trade data that contains trader labels.
- country_iso2 (str, optional): The ISO2 code of the country to prioritize traders from. Defaults to None.
Returns:
- DataFrame: The input trade data with added trader information.
"""
# Take the reference trader information, and unnest the labels
db_traders_lf = db_traders_lf.select("labels", "name", "trase_id", "group_name")
db_traders_lf = db_traders_lf.explode("labels")
# Temporarily rename the trader_label_column to 'trader_label', and only work with it
input_trade_data_lf = input_trade_data_lf.rename(
{trader_label_colummn: "trader_label"}
)
input_labels_lf = input_trade_data_lf.select("trader_label").unique()
joined_with_label_data = input_labels_lf.join(
db_traders_lf, left_on="trader_label", right_on="labels", how="left"
)
# Add a country_priority field to prioritize the country's traders
joined_with_label_data = joined_with_label_data.with_columns(
country_priority=pl.when(
(pl.lit(country_iso2).is_not_null())
& (pl.col("trase_id").str.slice(0, 2) == pl.lit(country_iso2))
)
.then(pl.lit(1))
.otherwise(pl.lit(0))
)
# Aggregate, giving priority to the country's traders, and only taking one value
grouped_label_data = (
joined_with_label_data.group_by("trader_label").agg(
[pl.all().sort_by("country_priority", descending=True).head(1)]
)
).drop("country_priority")
# Disaggregate as we're only taking one matching record, and improve names
grouped_label_data = grouped_label_data.explode(["name", "trase_id", "group_name"])
grouped_label_data = grouped_label_data.rename(
{
"name": "trader_name",
"trase_id": "trader_trase_id",
"group_name": "trader_group_name",
}
)
# Add the results to the original input_trade_data_lf
input_trade_data_lf = input_trade_data_lf.join(
grouped_label_data,
how="left",
on="trader_label",
)
# Return the original trader_label_column name
input_trade_data_lf = input_trade_data_lf.rename(
{"trader_label": trader_label_colummn}
)
return input_trade_data_lf
def clean_importers(bol_lf, traders_lf):
"""
Note that the importers that were not identified by label at the first run, where manually
added to the database, as registered in the script
`trase/runbook/brazil/trade/bol/2023/brazil_trade_bol_ingestions_2023.py`
"""
# take the distinct importer labels, excluding 'UNKNOWN CUSTOMER' as it has multiple matches
importer_labels_lf = bol_lf.select("importer_label").unique()
importer_labels_lf = importer_labels_lf.filter(
pl.col("importer_label").ne("UNKNOWN CUSTOMER")
)
# Get the trader data for the importer labels, and merge it to bol_lf
importer_labels_lf = add_trader_data_by_label(
importer_labels_lf, traders_lf, "importer_label"
)
importer_labels_lf = importer_labels_lf.rename(
{
"trader_name": "importer_name",
"trader_group_name": "importer_group",
}
)
bol_lf = bol_lf.join(
importer_labels_lf,
how="left",
on="importer_label",
)
# Add manually the name and group for the 'UNKNOWN CUSTOMER' importer
bol_lf = bol_lf.with_columns(
importer_name=pl.when(pl.col("importer_label") == pl.lit("UNKNOWN CUSTOMER"))
.then(pl.lit("UNKNOWN"))
.otherwise(pl.col("importer_name")),
importer_group=pl.when(pl.col("importer_label") == pl.lit("UNKNOWN CUSTOMER"))
.then(pl.lit("UNKNOWN"))
.otherwise(pl.col("importer_group")),
)
return bol_lf
def clean_exporters(bol_lf, traders_lf):
"""
Cleans exporter names and assigns Trase IDs based on CNPJ and labels.
This function performs the following steps:
1. Cleans exporter names based on CNPJ numbers. The great majority of cases,
as in a previous run ingested new values - which can be found in
trase/runbook/brazil/trade/bol/2023/brazil_trade_bol_ingestions_2023.py
2. Cleans exporter names based on labels if no match is found using CNPJ.
3. Merges the cleaned data and normalizes the group field.
Parameters:
- bol_lf (LazyFrame): The input trade data containing exporter information.
- traders_lf (LazyFrame): The reference DataFrame containing trader information.
"""
traders_lf = traders_lf.select("labels", "name", "trase_id", "group_name")
# ------------------------------------------------------------------------
# a) Clean based on cnpj (BR-TRADER-XXXXXXXX)
# ------------------------------------------------------------------------
# take the distinct exporter labels and cnpjs
exporters_lf = bol_lf.select("exporter_label", "exporter_cnpj").unique()
# Add a field called 'trase-id-test' concatenating "BR-TRADER-" with the first 8 characters of the cnpj
exporters_lf = (
exporters_lf.with_columns(
trase_id_test=pl.concat_str(
pl.lit("BR-TRADER-"),
pl.col("exporter_cnpj").str.slice(0, 8),
)
)
.drop("exporter_cnpj")
.unique()
)
# Assign null to the fields with BR-TRADER-00000000
exporters_lf = exporters_lf.with_columns(
trase_id_test=pl.when(pl.col("trase_id_test") == pl.lit("BR-TRADER-00000000"))
.then(pl.lit(None))
.otherwise(pl.col("trase_id_test"))
)
# Join the exporters based on 'trase_id_test' field (based on cnpj)
exporters_lf = exporters_lf.join(
traders_lf,
how="left",
left_on="trase_id_test",
right_on="trase_id",
validate="m:1",
).drop("labels")
exporters_lf = exporters_lf.rename(
{
"name": "exporter_name",
"group_name": "exporter_group",
}
)
# Create a field 'exporter_trase_id' filled for the fields that had a match
exporters_lf = exporters_lf.with_columns(
exporter_trase_id=pl.when(pl.col("exporter_name").is_not_null())
.then(pl.col("trase_id_test"))
.otherwise(pl.lit(None))
)
exporters_with_cnpj_matches = exporters_lf.filter(
pl.col("exporter_name").is_not_null()
).unique()
# ------------------------------------------------------------------------
# b) Clean based on label
# ------------------------------------------------------------------------
# Take the exporter_labels of the records where there was no match
exporters_by_label_lf = (
exporters_lf.filter(pl.col("exporter_name").is_null())
.select("exporter_label", "trase_id_test")
.unique()
)
# Match by label
exporters_by_label_lf = add_trader_data_by_label(
exporters_by_label_lf,
traders_lf,
"exporter_label",
)
exporters_by_label_lf = exporters_by_label_lf.rename(
{
"trader_name": "exporter_name",
"trader_group_name": "exporter_group",
"trader_trase_id": "exporter_trase_id",
}
)
# At this point all traders should have a name - group match with the database
# The ones that didn't had it when first running this script, were manually added
# based on a review of this dataframe, and the changes done were kept in the
# following script:
# trase/runbook/brazil/trade/bol/2023/brazil_trade_bol_ingestions_2023.py
# ------------------------------------------------------------------------
# c) Bring together the two dataframes and normalize the group field
# ------------------------------------------------------------------------
# Add the 'trase-id-test' field to the bol_lf dataframe so to join it with
# the exporters_with_cnpj_matches
bol_lf = bol_lf.with_columns(
trase_id_test=pl.concat_str(
pl.lit("BR-TRADER-"),
pl.col("exporter_cnpj").str.slice(0, 8),
)
)
bol_lf = bol_lf.join(
exporters_with_cnpj_matches,
how="left",
left_on=["exporter_label", "trase_id_test"],
right_on=["exporter_label", "trase_id_test"],
validate="m:1",
)
# join also with exporters_by_label_lf, to use when the previous didn't match
bol_lf = bol_lf.join(
exporters_by_label_lf,
how="left",
on="exporter_label",
)
bol_lf = bol_lf.with_columns(
exporter_name=pl.when(pl.col("exporter_name").is_null())
.then(pl.col("exporter_name_right"))
.otherwise(pl.col("exporter_name")),
exporter_group=pl.when(pl.col("exporter_group").is_null())
.then(pl.col("exporter_group_right"))
.otherwise(pl.col("exporter_group")),
exporter_trase_id=pl.when(pl.col("exporter_trase_id").is_null())
.then(pl.col("exporter_trase_id_right"))
.otherwise(pl.col("exporter_trase_id")),
)
bol_lf = bol_lf.drop(
[
"trase_id_test",
"trase_id_test_right",
"exporter_name_right",
"exporter_trase_id_right",
"exporter_group_right",
"trader_trase_id",
]
)
return bol_lf
def clean_fob_and_net_weight(bol_lf, comtrade_lf):
"""
Imputes missing FOB values and adjust FOBs that have extremely high outliers,
relying on a z-score of the usd_per_kg and manual revision in a Notebook.
Currently not adjusting net_weight as the values seemed ok.
"""
# Add usd_per_kg_record in bol_lf
bol_lf = bol_lf.with_columns(
usd_per_kg_record=pl.when(
(pl.col("net_weight_kg").is_null()) | (pl.col("fob").is_null())
)
.then(pl.lit(None))
.otherwise(pl.col("fob") / pl.col("net_weight_kg"))
)
# Add modified z-score to rank outliers
median_and_mad_lf = bol_lf.group_by("hs6").agg(
[
pl.col("usd_per_kg_record").median().alias("median_usd_per_kilo"),
(
(pl.col("usd_per_kg_record") - pl.col("usd_per_kg_record").median())
.abs()
.median()
).alias("mad_usd_per_kilo"),
]
)
# Join and calculate z-score
bol_lf = bol_lf.join(median_and_mad_lf, on="hs6", how="left").with_columns(
(
0.6745
* (pl.col("usd_per_kg_record") - pl.col("median_usd_per_kilo"))
/ pl.col("mad_usd_per_kilo")
).alias("usd_per_kg_zscore")
)
# Take comtrade reference values in usd_per_kg_reference
comtrade_lf = comtrade_lf.select(
[
"year",
"country_of_export_iso",
"commodity_code",
"net_weight_kg",
"fob",
]
)
# Make a list of hs4 codes we have in the BOL
hs6_codes_lf = bol_lf.select("hs6").unique()
# Filter COMTRADE relevant values
comtrade_lf = comtrade_lf.join(
hs6_codes_lf, how="semi", left_on="commodity_code", right_on="hs6"
).filter(
(pl.col("year") == pl.lit(YEAR))
& (pl.col("country_of_export_iso") == pl.lit("BRA"))
& (pl.col("net_weight_kg") >= pl.lit(0))
& (pl.col("fob") >= pl.lit(0))
)
comtrade_lf = comtrade_lf.group_by("commodity_code").agg(
(pl.col("fob") / pl.col("net_weight_kg")).mean().alias("usd_per_kg_reference")
)
bol_lf = bol_lf.rename({"fob": "fob_original"})
# Bring usd_per_kg_reference to bol_lf
bol_lf = bol_lf.join(
comtrade_lf,
how="left",
left_on="hs6",
right_on="commodity_code",
)
# Impute missing FOB values based on COMTRADE
bol_lf = bol_lf.with_columns(
fob_adjustment_type=pl.when(
((pl.col("fob_original") <= 0) | pl.col("fob_original").is_null())
& (pl.col("net_weight_kg") > 0)
)
.then(pl.lit("imputed_from_comtrade"))
.otherwise(pl.lit(None)),
fob=pl.when(
((pl.col("fob_original") <= 0) | pl.col("fob_original").is_null())
& (pl.col("net_weight_kg") > 0)
)
.then(pl.col("net_weight_kg") * pl.col("usd_per_kg_reference"))
.otherwise(pl.col("fob_original")),
)
# Impute missing hs6 120100 values based on mean (without outliers)
# as it doesn't exist in COMTRADE
hs_120100_usd_per_kg_reference = (
bol_lf.filter((pl.col("hs6") == "120100") & (pl.col("usd_per_kg_zscore") < 100))
.select(pl.col("usd_per_kg_record"))
.collect()
.to_series()
.mean()
)
bol_lf = bol_lf.with_columns(
fob_adjustment_type=pl.when(
(pl.col("hs6") == "120100") & (pl.col("fob_original").is_null())
)
.then(pl.lit("imputed_from_other_values"))
.otherwise(pl.col("fob_adjustment_type")),
fob=pl.when((pl.col("hs6") == "120100") & (pl.col("fob_original").is_null()))
.then(pl.col("net_weight_kg") * pl.lit(hs_120100_usd_per_kg_reference))
.otherwise(pl.col("fob")),
)
# Adjust cases where fob is completely off the charts, based on z-score and manual revision
# Notebook with outlier graphs in:
# trase/data_pipeline/models/brazil/trade/bol/2023/outlier_qa_brazil_bol_2023.ipynb
# hs 020230 outliers (2 cases)
bol_lf = bol_lf.with_columns(
fob_adjustment_type=pl.when(
(pl.col("hs6") == "020230")
& (pl.col("usd_per_kg_zscore") > 100)
& (pl.col("fob") > 2_000_000)
)
.then(pl.lit("fob_outlier_adjustment"))
.otherwise(pl.col("fob_adjustment_type")),
fob=pl.when(
(pl.col("hs6") == "020230")
& (pl.col("usd_per_kg_zscore") > 100)
& (pl.col("fob") > 2_000_000)
)
.then(pl.col("net_weight_kg") * pl.col("usd_per_kg_reference"))
.otherwise(pl.col("fob")),
)
# hs 120100 outliers with records with fob > 700M USD (15 cases)
bol_lf = bol_lf.with_columns(
fob_adjustment_type=pl.when(
(pl.col("hs6") == "120100")
& (pl.col("fob") > 700_000_000)
& (pl.col("usd_per_kg_zscore") > 100)
)
.then(pl.lit("fob_outlier_adjustment"))
.otherwise(pl.col("fob_adjustment_type")),
fob=pl.when(
(pl.col("hs6") == "120100")
& (pl.col("fob") > 700_000_000)
& (pl.col("usd_per_kg_zscore") > 100)
)
.then(pl.col("net_weight_kg") * pl.lit(hs_120100_usd_per_kg_reference))
.otherwise(pl.col("fob")),
)
# hs 150710 outliers (3 cases)
bol_lf = bol_lf.with_columns(
fob_adjustment_type=pl.when(
(pl.col("hs6") == "150710") & (pl.col("usd_per_kg_zscore") > 100)
)
.then(pl.lit("fob_outlier_adjustment"))
.otherwise(pl.col("fob_adjustment_type")),
fob=pl.when((pl.col("hs6") == "150710") & (pl.col("usd_per_kg_zscore") > 100))
.then(pl.col("net_weight_kg") * pl.col("usd_per_kg_reference"))
.otherwise(pl.col("fob")),
)
# hs 230400 outliers (1 case)
bol_lf = bol_lf.with_columns(
fob_adjustment_type=pl.when(
(pl.col("hs6") == "230400") & (pl.col("usd_per_kg_zscore") > 100)
)
.then(pl.lit("fob_outlier_adjustment"))
.otherwise(pl.col("fob_adjustment_type")),
fob=pl.when((pl.col("hs6") == "230400") & (pl.col("usd_per_kg_zscore") > 100))
.then(pl.col("net_weight_kg") * pl.col("usd_per_kg_reference"))
.otherwise(pl.col("fob")),
)
bol_lf = bol_lf.drop(["mad_usd_per_kilo", "median_usd_per_kilo"])
return bol_lf
def reorder_columns(bol_lf):
"""
Reordering to make it easier to see the columns we have done special
handling or have special significance. Using the fact that unhandled
columns are in uppercase, which might not be the case in other years
"""
columns = bol_lf.columns
lowercase_cols = sorted([col for col in columns if col[0].islower()])
other_cols = [col for col in columns if col not in lowercase_cols]
return bol_lf.select(lowercase_cols + other_cols)
def run_model(dbt, cursor):
# get sources and models
bol_lf = (
dbt.source("source_brazil", "original_brazil_soy_beef_bol_2023").pl().lazy()
)
countries_lf = dbt.ref("postgres_countries").pl().lazy()
regions_lf = dbt.ref("postgres_regions_without_geometry").pl().lazy()
ports_lf = dbt.ref("postgres_ports").pl().lazy()
existing_hs_codes_lf = dbt.ref("hs2017").pl().lazy()
commodities_lf = dbt.ref("postgres_commodities").pl().lazy()
traders_lf = dbt.ref("postgres_traders").pl().lazy()
comtrade_lf = dbt.ref("comtrade_exports_year_exporter_hs6").pl().lazy()
# Add 'group_name' field to traders_lf with the active group
traders_lf = identify_active_group_name(traders_lf, YEAR, "groups").drop("groups")
# run the cleaning
bol_lf = rename_and_normalize_columns(bol_lf)
bol_lf = filter_country_and_hscodes(bol_lf, existing_hs_codes_lf, commodities_lf)
bol_lf = clean_countries(bol_lf, countries_lf)
bol_lf = add_european_union_bloc(bol_lf, countries_lf)
bol_lf = clean_states_and_municipalities(bol_lf, regions_lf)
bol_lf = clean_ports(bol_lf, ports_lf)
bol_lf = clean_cnpjs(bol_lf)
bol_lf = clean_importers(bol_lf, traders_lf)
bol_lf = clean_exporters(bol_lf, traders_lf)
bol_lf = clean_fob_and_net_weight(bol_lf, comtrade_lf)
bol_lf = reorder_columns(bol_lf)
return bol_lf
def model(dbt, cursor):
import traceback
dbt.config(materialized="external")
# Encapsuling in try/except to print the full traceback in case of error
# When debugging, don't use Polars lazy data loading, as it will not show the traceback
try:
bol_lf = run_model(dbt, cursor)
except Exception as e:
# Print a full traceback
traceback.print_exc()
raise
return bol_lf