Brazil Beef Exporters Enriched

s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/brazil_beef_exporters_enriched.parquet

Dbt path: trase_production.main_brazil.brazil_beef_exporters_enriched

Explore on Metabase: Full table; summary statistics

Containing YAML file: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/_schema_sei_pcs_v2_2_1.yml

Model file: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/brazil_beef_exporters_enriched.py

Calls script: trase/data/brazil/beef/sei_pcs/v2_2_1/brazil_beef_exporters_enriched.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: beef, brazil, sei_pcs


brazil_beef_exporters_enriched

Description

Standardised and enriched exporter information for Brazil beef SEI-PCS data, linked to exporter nodes in the Trase database. The dataset serves two goals: (1) provide clean, consistent exporter identifiers and names for public release, and (2) attach zero-deforestation commitments (ZDCs) at the exporter-group level. The intended join with the Brazil beef SEI-PCS results is on the columns EXPORTER_CNPJ, YEAR, and EXPORTER. The join should be full: there must be exactly one match in this dataset for every SEI-PCS row across all years; if not, this dataset should be regenerated. Matching to Trase happens in two ways: (1) via a Trase ID derived from the (cleaned) CNPJ, or (2) failing that, via the original (uncleaned) exporter name used as a "trader label".
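
A minimal sketch of that join and its completeness check, assuming the SEI-PCS results are already loaded as a polars DataFrame named sei_pcs (a hypothetical name) with the same three key columns:

import polars as pl

exporters = pl.read_parquet(
    "s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/brazil_beef_exporters_enriched.parquet"
)

# `sei_pcs` is a hypothetical DataFrame holding the Brazil beef SEI-PCS results
joined = sei_pcs.join(exporters, on=["EXPORTER_CNPJ", "YEAR", "EXPORTER"], how="left")

# Every SEI-PCS row should find exactly one match; otherwise regenerate this dataset
assert joined.height == sei_pcs.height, "duplicate matches in the exporter table"
assert joined["EXPORTER_NODE_ID"].null_count() == 0, "unmatched SEI-PCS rows"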

A data test is included to reflect the corporate renaming of Mataboi to Prima Foods in 2020.
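
The shape of that check, as a hedged polars sketch rather than the actual dbt test (the exact standard name stored for the Prima Foods node is an assumption, so the sketch only asserts that the old name no longer appears):

import polars as pl

df = pl.read_parquet(
    "s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/brazil_beef_exporters_enriched.parquet"
)

# Rows labelled "MATABOI ALIMENTOS LTDA" from 2020 onwards should resolve to the
# Prima Foods trader, not the historical MATABOI ALIMENTOS node
mataboi_2020 = df.filter(
    (pl.col("EXPORTER") == "MATABOI ALIMENTOS LTDA") & (pl.col("YEAR") >= 2020)
)
assert not mataboi_2020["EXPORTER_CLEAN_NAME"].str.contains("MATABOI").any()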


Details

Column Type Description
YEAR INTEGER Year of export, as reported in the SEI-PCS data
EXPORTER_CNPJ VARCHAR Brazilian tax identifier (CNPJ or CPF) as it appears in the SEI-PCS data
EXPORTER VARCHAR Exporter name as it appears in the SEI-PCS data (i.e. unclean label)
VOLUME_RAW FLOAT Total raw carcass weight equivalent volume of beef exported by the given exporter in the given year
CNPJ_FORMATTED VARCHAR Brazilian tax identifier (CNPJ or CPF) formatted where possible (e.g., 12.345.678/0001-90)
CNPJ_VALIDATION VARCHAR One of "valid_cnpj", "valid_cpf", or "invalid"; see the validation sketch after this table
EXPORTER_TRASE_ID VARCHAR Trase ID for the trader node, e.g. BR-TRADER-02916265
EXPORTER_NODE_ID BIGINT Node ID for the exporter in the Trase database
EXPORTER_LOOKUP_METHOD VARCHAR Either "from trase id" or "from label"
EXPORTER_CLEAN_NAME VARCHAR Standard exporter name as it appears in the Trase database
EXPORTER_GROUP_ID BIGINT Node ID of the exporter group in the Trase database
EXPORTER_GROUP_NAME VARCHAR Standard exporter group name as it appears in the Trase database
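
The CNPJ_FORMATTED and CNPJ_VALIDATION columns are produced with the python-stdnum validators used in the script further down; a minimal standalone illustration (the identifier value here is hypothetical):

import stdnum.br.cnpj
import stdnum.br.cpf

value = "12345678000195"  # hypothetical zero-padded identifier, as padded in the script

if stdnum.br.cnpj.is_valid(value):
    print("valid_cnpj", stdnum.br.cnpj.format(value))  # e.g. 12.345.678/0001-95
elif stdnum.br.cpf.is_valid(value):
    print("valid_cpf", stdnum.br.cpf.format(value))
else:
    print("invalid")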

Models / Seeds

  • model.trase_duckdb.brazil_beef_exporters

Script source (trase/data/brazil/beef/sei_pcs/v2_2_1/brazil_beef_exporters_enriched.py):

import pandas as pd
import polars as pl
import stdnum.br.cnpj
import stdnum.br.cpf
from psycopg2 import sql

from trase.tools import get_country_id, find_node_by_trase_id
from trase.tools.aws.metadata import write_parquet_for_upload
from trase.tools.pandasdb.find import (
    find_default_name_by_node_id,
    find_nodes_by_trase_id,
    find_trader_groups_by_trader_id,
    find_traders_by_label,
)
from trase.tools.pandasdb.query import query_with_dataframe


def validate_and_format_cnpjs(df: pl.DataFrame) -> pl.DataFrame:
    """Validate each exporter's CNPJ/CPF, format it, and derive a Trase ID from it."""
    # Column expressions for padded values
    cnpj = pl.col("EXPORTER_CNPJ").cast(pl.Utf8).str.pad_start(14, "0")
    cpf = pl.col("EXPORTER_CNPJ").cast(pl.Utf8).str.pad_start(11, "0")

    # Boolean expressions for validity
    is_valid_cnpj = cnpj.map_elements(
        stdnum.br.cnpj.is_valid, return_dtype=pl.Boolean()
    )
    is_valid_cpf = cpf.map_elements(stdnum.br.cpf.is_valid, return_dtype=pl.Boolean())

    # formatted CNPJ/CPF
    formatted_cnpj = (
        pl.when(is_valid_cnpj)
        .then(cnpj.map_elements(stdnum.br.cnpj.format, return_dtype=pl.Utf8()))
        .when(is_valid_cpf)
        .then(cpf.map_elements(stdnum.br.cpf.format, return_dtype=pl.Utf8()))
        .otherwise(pl.lit(""))
    )

    # validation status
    validation = (
        pl.when(is_valid_cnpj)
        .then(pl.lit("valid_cnpj"))
        .when(is_valid_cpf)
        .then(pl.lit("valid_cpf"))
        .otherwise(pl.lit("invalid"))
    )

    # Trase ID: "BR-TRADER-" followed by the first 8 characters of the padded identifier
    cnpj8 = (pl.when(is_valid_cpf).then(cpf).otherwise(cnpj)).str.slice(0, 8)
    trase_id = "BR-TRADER-" + cnpj8

    # a couple of known invalid CNPJs
    is_invalid_cnpj = pl.col("EXPORTER_CNPJ").is_in(["00000000000000", "INVALID"])
    trase_id = (
        pl.when(is_invalid_cnpj).then(pl.lit("BR-TRADER-XXXXXXXX")).otherwise(trase_id)
    )

    # add columns to dataframe
    return df.with_columns(
        formatted_cnpj.alias("CNPJ_FORMATTED"),
        validation.alias("CNPJ_VALIDATION"),
        trase_id.alias("EXPORTER_TRASE_ID"),
    )


def add_trader_ids(df: pl.DataFrame) -> pl.DataFrame:
    # Lookup by Trase ID.
    # Special case: if the Trase ID is the placeholder "BR-TRADER-XXXXXXXX", we skip the
    # Trase lookup and leave it as null. This ensures that in the next step we can fall
    # back to the label-based lookup — some rows have an unknown placeholder Trase ID
    # but still have a valid label we can use
    df_ = df.to_pandas()
    is_unknown_trase_id: pd.Series = df_["EXPORTER_TRASE_ID"] == "BR-TRADER-XXXXXXXX"
    df_.loc[is_unknown_trase_id, "EXPORTER_TRASE_ID"] = None
    (trase_id_values,) = zip(
        *find_nodes_by_trase_id(
            df_,
            returning=["node_id"],
            trase_id=sql.Identifier("EXPORTER_TRASE_ID"),
            year=sql.Identifier("YEAR"),
            on_extra_columns="ignore",
        )
    )
    trase_id_series = pl.Series(trase_id_values, dtype=pl.Int64())

    # lookup by label
    # we have one annoying case of "MATABOI ALIMENTOS LTDA" appearing in the data
    # without a Trase ID in 2020. This will match the trader "MATABOI ALIMENTOS" even
    # though it should be PRIMA FOODS. Unfortunately I don't think it's possible to have
    # time-dependent label > trader relationships. So instead I'll just hard-code it
    # here as an exception
    df_ = df.to_pandas()
    df_.loc[
        (df_["EXPORTER"] == "MATABOI ALIMENTOS LTDA") & (df_["YEAR"] >= 2020),
        "EXPORTER",
    ] = "PRIMA FOODS SA"
    brazil_id = get_country_id("BRAZIL")
    (label_id_values,) = zip(
        *find_traders_by_label(
            df_,
            returning=["trader_id"],
            trader_label=sql.Identifier("EXPORTER"),
            country_id=sql.Literal(brazil_id),
            on_extra_columns="ignore",
        )
    )
    label_id_series = pl.Series(label_id_values, dtype=pl.Int64())

    # unknown trase id
    unknown_trader_id = find_node_by_trase_id("BR-TRADER-XXXXXXXX")
    unknown_id_values = is_unknown_trase_id.map({True: unknown_trader_id, False: None})
    unknown_id_series = pl.Series(unknown_id_values, dtype=pl.Int64())

    # Check that each row has at least one ID
    assert (
        trase_id_series.is_not_null()
        | label_id_series.is_not_null()
        | unknown_id_series.is_not_null()
    ).all(), "Cannot find any node IDs for some exporters"

    # Prefer Trase ID, otherwise use label
    node_id_series = pl.coalesce(
        [trase_id_series, label_id_series, unknown_id_series]
    ).alias("EXPORTER_NODE_ID")
    lookup_method_expression = (
        pl.when(trase_id_series.is_not_null())
        .then(pl.lit("from trase id"))
        .when(label_id_series.is_not_null())
        .then(pl.lit("from label"))
        .when(unknown_id_series.is_not_null())
        .then(pl.lit("from trase id"))
        .alias("EXPORTER_LOOKUP_METHOD")
    )

    # check that we have level one nodes (i.e. TRADER, not TRADER LABEL)
    df = df.with_columns(node_id_series, lookup_method_expression)
    levels = query_with_dataframe(
        df.select("EXPORTER_NODE_ID").unique().to_pandas(),
        "select distinct level from df join nodes on df.EXPORTER_NODE_ID = nodes.id",
    )
    assert all(levels == 1)

    return df


def add_clean_trader_names(df: pl.DataFrame) -> pl.DataFrame:
    # Look up each exporter node's standard (default) name in the Trase database
    (names,) = zip(
        *find_default_name_by_node_id(
            df.to_pandas(),
            returning=["name"],
            node_id=sql.Identifier("EXPORTER_NODE_ID"),
            on_extra_columns="ignore",
        )
    )
    return df.with_columns(pl.Series("EXPORTER_CLEAN_NAME", names, dtype=pl.Utf8))


def add_trader_groups(df: pl.DataFrame) -> pl.DataFrame:
    # Look up the exporter group (node ID and standard name) for each trader node and year
    group_ids, group_names = zip(
        *find_trader_groups_by_trader_id(
            df.to_pandas(),
            returning=[
                "group_id",
                "group_name",
            ],
            trader_id=sql.Identifier("EXPORTER_NODE_ID"),
            year=sql.Identifier("YEAR"),
            on_extra_columns="ignore",
        )
    )
    return df.with_columns(
        pl.Series("EXPORTER_GROUP_ID", group_ids),
        pl.Series("EXPORTER_GROUP_NAME", group_names),
    )


def process(df: pl.DataFrame) -> pl.DataFrame:
    # Full enrichment: CNPJ validation/formatting, node lookup, clean names, group info
    df = validate_and_format_cnpjs(df)
    df = add_trader_ids(df)
    df = add_clean_trader_names(df)
    df = add_trader_groups(df)

    # The enriched table must be fully populated: no nulls in any column
    assert 0 == df.null_count().pipe(sum).item()

    return df


if __name__ == "__main__":
    df = pl.read_parquet(
        "s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/brazil_beef_exporters.parquet"
    )
    df = process(df)
    write_parquet_for_upload(
        df,
        "brazil/beef/sei_pcs/v2.2.1/brazil_beef_exporters_enriched.parquet",
        is_polars=True,
    )
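
With the __main__ entry point above, the script can be run directly as a module to regenerate the published parquet, e.g. python -m trase.data.brazil.beef.sei_pcs.v2_2_1.brazil_beef_exporters_enriched (assuming the repository is importable and AWS credentials for trase-storage are configured).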

Model file (trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/brazil_beef_exporters_enriched.py):

from trase.data.brazil.beef.sei_pcs.v2_2_1.brazil_beef_exporters_enriched import process


def model(dbt, cursor):
    dbt.config(materialized="external")

    # Reference the upstream exporters model as a polars DataFrame and enrich it
    df = dbt.ref("brazil_beef_exporters").pl()
    return process(df)