Brazil Beef Exporters Enriched
s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/brazil_beef_exporters_enriched.parquet
Dbt path: trase_production.main_brazil.brazil_beef_exporters_enriched
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/_schema_sei_pcs_v2_2_1.yml
Model file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/brazil_beef_exporters_enriched.py
Calls script: trase/data/brazil/beef/sei_pcs/v2_2_1/brazil_beef_exporters_enriched.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: beef, brazil, sei_pcs
brazil_beef_exporters_enriched
Description
Standardised and enriched exporter information for Brazil beef SEI-PCS data, linked to exporter nodes in the Trase database. The dataset serves two goals: (1) provide clean, consistent exporter identifiers and names for public release, and (2) attach zero-deforestation commitments (ZDCs) at the exporter-group level.

The intended join with the Brazil beef SEI-PCS results is on the columns EXPORTER_CNPJ, YEAR, and EXPORTER. The join should be full: every SEI-PCS row, across all years, must have exactly one match in this dataset; if it does not, this dataset should be regenerated. Matching to the Trase database happens in one of two ways: (1) via a Trase ID derived from the (cleaned) CNPJ, or (2) failing that, via the original (uncleaned) exporter name used as a "trader label".
A data test is included to reflect the corporate renaming of Mataboi to Prima Foods in 2020.
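As a minimal sketch of the join requirement above (not part of the pipeline), the check below assumes the SEI-PCS results and this table are already loaded as polars DataFrames; the `sei_pcs`, `exporters`, and `check_full_join` names are illustrative:

```python
import polars as pl


def check_full_join(sei_pcs: pl.DataFrame, exporters: pl.DataFrame) -> pl.DataFrame:
    # Join every SEI-PCS row to the enriched exporter table on the intended keys.
    joined = sei_pcs.join(
        exporters,
        on=["EXPORTER_CNPJ", "YEAR", "EXPORTER"],
        how="left",
    )
    # Full join requirement: every SEI-PCS row must find a match...
    assert joined["EXPORTER_NODE_ID"].null_count() == 0, "unmatched SEI-PCS rows"
    # ...and the match must be unique, i.e. the join must not add rows.
    assert joined.height == sei_pcs.height, "duplicate exporter matches"
    return joined
```

If either assertion fails, the enriched dataset should be regenerated before the SEI-PCS results are published.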
Details
| Column | Type | Description |
|---|---|---|
| YEAR | INTEGER | Year of export |
| EXPORTER_CNPJ | VARCHAR | Brazilian tax identifier (CNPJ or CPF) as it appears in the SEI-PCS data |
| EXPORTER | VARCHAR | Exporter name as it appears in the SEI-PCS data (i.e. the uncleaned label) |
| VOLUME_RAW | FLOAT | Total raw carcass weight equivalent volume of beef exported by the given exporter in the given year |
| CNPJ_FORMATTED | VARCHAR | Brazilian tax identifier (CNPJ or CPF) formatted where possible (e.g. 12.345.678/0001-90); see the sketch below the table |
| CNPJ_VALIDATION | VARCHAR | One of "valid_cnpj", "valid_cpf", or "invalid" |
| EXPORTER_TRASE_ID | VARCHAR | Trase ID for the trader node, e.g. BR-TRADER-02916265 |
| EXPORTER_NODE_ID | BIGINT | Node ID for the exporter in the Trase database |
| EXPORTER_LOOKUP_METHOD | VARCHAR | Either "from trase id" or "from label" |
| EXPORTER_CLEAN_NAME | VARCHAR | Standard exporter name as it appears in the Trase database |
| EXPORTER_GROUP_ID | BIGINT | Node ID of the exporter group in the Trase database |
| EXPORTER_GROUP_NAME | VARCHAR | Standard exporter group name as it appears in the Trase database |
Models / Seeds
model.trase_duckdb.brazil_beef_exporters
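# Calls script: trase/data/brazil/beef/sei_pcs/v2_2_1/brazil_beef_exporters_enriched.py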
import pandas as pd
import polars as pl
import stdnum.br.cnpj
import stdnum.br.cpf
from psycopg2 import sql
from trase.tools import get_country_id, find_node_by_trase_id
from trase.tools.aws.metadata import write_parquet_for_upload
from trase.tools.pandasdb.find import (
    find_default_name_by_node_id,
    find_nodes_by_trase_id,
    find_trader_groups_by_trader_id,
    find_traders_by_label,
)
from trase.tools.pandasdb.query import query_with_dataframe


def validate_and_format_cnpjs(df):
    # Column expressions for padded values
    cnpj = pl.col("EXPORTER_CNPJ").cast(pl.Utf8).str.pad_start(14, "0")
    cpf = pl.col("EXPORTER_CNPJ").cast(pl.Utf8).str.pad_start(11, "0")

    # Boolean expressions for validity
    is_valid_cnpj = cnpj.map_elements(
        stdnum.br.cnpj.is_valid, return_dtype=pl.Boolean()
    )
    is_valid_cpf = cpf.map_elements(stdnum.br.cpf.is_valid, return_dtype=pl.Boolean())

    # Formatted CNPJ/CPF
    formatted_cnpj = (
        pl.when(is_valid_cnpj)
        .then(cnpj.map_elements(stdnum.br.cnpj.format, return_dtype=pl.Utf8()))
        .when(is_valid_cpf)
        .then(cpf.map_elements(stdnum.br.cpf.format, return_dtype=pl.Utf8()))
        .otherwise(pl.lit(""))
    )

    # Validation status
    validation = (
        pl.when(is_valid_cnpj)
        .then(pl.lit("valid_cnpj"))
        .when(is_valid_cpf)
        .then(pl.lit("valid_cpf"))
        .otherwise(pl.lit("invalid"))
    )

    # trase_id
    cnpj8 = (pl.when(is_valid_cpf).then(cpf).otherwise(cnpj)).str.slice(0, 8)
    trase_id = "BR-TRADER-" + cnpj8

    # A couple of known invalid CNPJs
    is_invalid_cnpj = pl.col("EXPORTER_CNPJ").is_in(["00000000000000", "INVALID"])
    trase_id = (
        pl.when(is_invalid_cnpj).then(pl.lit("BR-TRADER-XXXXXXXX")).otherwise(trase_id)
    )

    # Add columns to the dataframe
    return df.with_columns(
        formatted_cnpj.alias("CNPJ_FORMATTED"),
        validation.alias("CNPJ_VALIDATION"),
        trase_id.alias("EXPORTER_TRASE_ID"),
    )


def add_trader_ids(df: pl.DataFrame) -> pl.DataFrame:
    # Lookup by Trase ID.
    # Special case: if the Trase ID is the placeholder "BR-TRADER-XXXXXXXX", we skip the
    # Trase lookup and leave it as null. This ensures that in the next step we can fall
    # back to the label-based lookup; some rows have an unknown placeholder Trase ID
    # but still have a valid label we can use
    df_ = df.to_pandas()
    is_unknown_trase_id: pd.Series = df_["EXPORTER_TRASE_ID"] == "BR-TRADER-XXXXXXXX"
    df_.loc[is_unknown_trase_id, "EXPORTER_TRASE_ID"] = None
    (trase_id_values,) = zip(
        *find_nodes_by_trase_id(
            df_,
            returning=["node_id"],
            trase_id=sql.Identifier("EXPORTER_TRASE_ID"),
            year=sql.Identifier("YEAR"),
            on_extra_columns="ignore",
        )
    )
    trase_id_series = pl.Series(trase_id_values, dtype=pl.Int64())

    # Lookup by label.
    # We have one annoying case of "MATABOI ALIMENTOS LTDA" appearing in the data
    # without a Trase ID in 2020. This will match the trader "MATABOI ALIMENTOS" even
    # though it should be PRIMA FOODS. Unfortunately I don't think it's possible to have
    # time-dependent label > trader relationships. So instead I'll just hard-code it
    # here as an exception
    df_ = df.to_pandas()
    df_.loc[
        (df_["EXPORTER"] == "MATABOI ALIMENTOS LTDA") & (df_["YEAR"] >= 2020),
        "EXPORTER",
    ] = "PRIMA FOODS SA"
    brazil_id = get_country_id("BRAZIL")
    (label_id_values,) = zip(
        *find_traders_by_label(
            df_,
            returning=["trader_id"],
            trader_label=sql.Identifier("EXPORTER"),
            country_id=sql.Literal(brazil_id),
            on_extra_columns="ignore",
        )
    )
    label_id_series = pl.Series(label_id_values, dtype=pl.Int64())

    # Unknown trase id
    unknown_trader_id = find_node_by_trase_id("BR-TRADER-XXXXXXXX")
    unknown_id_values = is_unknown_trase_id.map({True: unknown_trader_id, False: None})
    unknown_id_series = pl.Series(unknown_id_values, dtype=pl.Int64())

    # Check that each row has at least one ID
    assert (
        trase_id_series.is_not_null()
        | label_id_series.is_not_null()
        | unknown_id_series.is_not_null()
    ).all(), "Cannot find any node IDs for some exporters"

    # Prefer Trase ID, otherwise use label
    node_id_series = pl.coalesce(
        [trase_id_series, label_id_series, unknown_id_series]
    ).alias("EXPORTER_NODE_ID")
    lookup_method_expression = (
        pl.when(trase_id_series.is_not_null())
        .then(pl.lit("from trase id"))
        .when(label_id_series.is_not_null())
        .then(pl.lit("from label"))
        .when(unknown_id_series.is_not_null())
        .then(pl.lit("from trase id"))
        .alias("EXPORTER_LOOKUP_METHOD")
    )

    # Check that we have level one nodes (i.e. TRADER, not TRADER LABEL)
    df = df.with_columns(node_id_series, lookup_method_expression)
    levels = query_with_dataframe(
        df.select("EXPORTER_NODE_ID").unique().to_pandas(),
        "select distinct level from df join nodes on df.EXPORTER_NODE_ID = nodes.id",
    )
    assert all(levels == 1)
    return df


def add_clean_trader_names(df):
    (names,) = zip(
        *find_default_name_by_node_id(
            df.to_pandas(),
            returning=["name"],
            node_id=sql.Identifier("EXPORTER_NODE_ID"),
            on_extra_columns="ignore",
        )
    )
    return df.with_columns(pl.Series("EXPORTER_CLEAN_NAME", names, dtype=pl.Utf8))


def add_trader_groups(df):
    group_ids, group_names = zip(
        *find_trader_groups_by_trader_id(
            df.to_pandas(),
            returning=[
                "group_id",
                "group_name",
            ],
            trader_id=sql.Identifier("EXPORTER_NODE_ID"),
            year=sql.Identifier("YEAR"),
            on_extra_columns="ignore",
        )
    )
    return df.with_columns(
        pl.Series("EXPORTER_GROUP_ID", group_ids),
        pl.Series("EXPORTER_GROUP_NAME", group_names),
    )


def process(df):
    df = validate_and_format_cnpjs(df)
    df = add_trader_ids(df)
    df = add_clean_trader_names(df)
    df = add_trader_groups(df)
    # Every column must be fully populated after enrichment
    assert 0 == df.null_count().pipe(sum).item()
    return df


if __name__ == "__main__":
    df = pl.read_parquet(
        "s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/brazil_beef_exporters.parquet"
    )
    df = process(df)
    write_parquet_for_upload(
        df,
        "brazil/beef/sei_pcs/v2.2.1/brazil_beef_exporters_enriched.parquet",
        is_polars=True,
    )
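
# dbt Python model: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/brazil_beef_exporters_enriched.py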
from trase.data.brazil.beef.sei_pcs.v2_2_1.brazil_beef_exporters_enriched import process


def model(dbt, cursor):
    dbt.config(materialized="external")
    df = dbt.ref("brazil_beef_exporters").pl()
    return process(df)