Tac Signatories Combined

s3://trase-storage/brazil/indicators/actors/zd_commitments/out/TAC_SIGNATORIES_COMBINED.csv

Dbt path: trase_production.main_brazil.tac_signatories_combined

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/indicators/actors/zd_commitments/out/_schema.yml

Model file link: trase/data_pipeline/models/brazil/indicators/actors/zd_commitments/out/tac_signatories_combined.py

Calls script: trase/data/brazil/indicators/actors/zd_commitments/out/tac_signatories_combined.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: actors, brazil, indicators, out, zd_commitments


tac_signatories_combined

Description

Terms of Conduct Adjustment (TAC)

The Terms of Conduct Adjustment (Termos de Ajustamento de Conduta - TAC) are socio-environmental commitments that companies sign with the Federal Public Ministry (Ministério Público Federal).

TAC description

The Termo de Ajustamento de Conduta (TAC) in the Brazilian beef sector is a legal instrument, first developed in Pará in 2009 and then spreading to other states, designed to ensure companies—particularly slaughterhouses—comply with environmental and labor regulations, especially around deforestation and illegal land use in cattle supply chains.

In the beef sector, TACs emerged as a response to widespread illegal deforestation linked to cattle ranching in the Amazon. The goal was to pressure slaughterhouses to avoid buying cattle from farms involved in illegal activities (e.g., forced labor, deforestation) or not compliant with other criteria (e.g., land tenure and environmental regularization). Companies usually had a time limit to present a compliance plan, which was approved by the Brazilian Federal Prosecutor's Office. Nowadays, the Boi na Linha protocol establishes a common ground for legal and socio-environmental standards.

Are TACs signed voluntarily?

Yes, TACs are formally voluntary, but in practice there is strong institutional pressure to sign.

  • Companies can refuse to sign, but may face legal action, reputational damage, or restrictions on access to public financing and major buyers.
  • Many large buyers (e.g., supermarkets, meat exporters) require suppliers to have signed, and to comply with, a TAC.

So while not mandatory per se, signing is often strategically necessary.

Who signs the TAC da Carne?

A TAC is signed by a company, and the agreement affects the entire company. However, major slaughter companies, such as JBS or Marfrig, which operate in multiple states, can sign agreements for establishments in a specific state or group of states within the Legal Amazon. The TAC documents for companies with multiple establishments normally specify a tax number of a central office and not a specific slaughterhouse (or do not specify the tax number at all). Still, for companies with one (or a few) operational establishments, the tax number can be included, and it is typically associated with the slaughterhouse (CNPJ with 14 digits).

This implies that even if a company opens new slaughterhouses or modifies existing CNPJs, all of its active establishments within the given state are still covered by the agreement. The examples below illustrate this.

| Company | CNPJ on TAC? | TAC Scope | Geographic Limitation | Example & Notes |
|---|---|---|---|---|
| JBS | Central CNPJ (often without 14 digits) | All establishments in Legal Amazon except Pará | State-based (AC, MT, then extended) | JBS TAC: see Preâmbulo 13 & Cláusula 6.7 |
| Frigol | Central office CNPJ (outside Legal Amazon) | Only establishments in Pará | Yes | Frigol TAC: Cláusula 1.1 |
| Marfrig | Central office CNPJ (outside Legal Amazon) | Only establishments in Mato Grosso | Yes | Similar to Frigol |
| Frinorte (Tomé Açu) | Local slaughterhouse CNPJ (14 digits) | Single establishment | No | Local-only agreement, CNPJ identifies facility |

  • Company-Level Responsibility:

    • The TAC typically states that it is signed by the company, not just a facility. However, it limits its applicability to establishments in a specific state or region, meaning only those suppliers are affected.
  • CNPJ Usage:

    • Large companies usually do not specify all slaughterhouse CNPJs.
    • Smaller companies often do specify a single 14-digit CNPJ tied to one slaughterhouse.
    • In large companies, the headquarters CNPJ may be listed, but that doesn't mean all national operations are included.
  • Expansion of Scope Over Time:

    • Companies like JBS have had multiple TACs or amendments to expand to other states, indicating that each state may require a separate agreement or addendum.
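
The scoping rules above can be made concrete with a small sketch. A 14-digit CNPJ consists of an 8-digit company root, a 4-digit establishment number and 2 check digits, so all establishments of one company share the first 8 digits (the `cnpj8` used in the script further down this page). The `Tac` record, the `split_cnpj` and `is_covered` helpers, and every CNPJ value below are hypothetical illustrations, not Trase code or real registrations, and the check digits are not validated here.

```python
from dataclasses import dataclass
from typing import FrozenSet, Tuple


@dataclass(frozen=True)
class Tac:
    """Hypothetical, simplified view of one signed TAC."""

    signatory_cnpj8: str    # 8-digit company root of the signatory
    states: FrozenSet[str]  # state (UF) codes the agreement applies to


def split_cnpj(cnpj: str) -> Tuple[str, str, str]:
    """Split a CNPJ into (company root, establishment number, check digits)."""
    digits = cnpj.zfill(14)  # restore leading zeros lost in numeric storage
    return digits[:8], digits[8:12], digits[12:]


def is_covered(establishment_cnpj: str, establishment_state: str, tac: Tac) -> bool:
    """True if the establishment belongs to the signatory and is in a covered state."""
    root, _, _ = split_cnpj(establishment_cnpj)
    return root == tac.signatory_cnpj8 and establishment_state in tac.states


# Made-up company whose TAC applies only to establishments in Pará.
tac = Tac(signatory_cnpj8="12345678", states=frozenset({"PA"}))
print(is_covered("12345678000295", "PA", tac))  # same company, covered state
print(is_covered("12345678000376", "MT", tac))  # same company, state not covered
print(is_covered("87654321000110", "PA", tac))  # different company
```

Matching on the company root rather than the full 14 digits reflects the point made above: a slaughterhouse opened after signing still shares the signatory's root, so it falls under the agreement if it sits in a covered state.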

Do TACs have limited 2-year validity?

Not necessarily. Most TACs in the cattle sector do not have a fixed expiration date. Many TACs explicitly state that they are indefinite: "O presente TERMO tem prazo indeterminado". This means the agreement remains valid as long as the company continues operating and has not been released from its obligations.

  • The 2-year idea may come from:
    • Misinterpretation of monitoring or reporting intervals (e.g., "apresentar relatório a cada 2 anos").
    • Older or generic TAC models in other sectors (e.g., environmental remediation).

So in the cattle sector, TACs are typically valid for an indefinite period — especially for large slaughterhouses under ongoing monitoring. They remain binding unless formally rescinded or replaced.
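
Read as a rule, this means a TAC applies from its signing year onwards, with no automatic expiry. The sketch below encodes that reading at yearly resolution. The function name `tac_in_force` and the optional `rescinded_year` parameter are assumptions made for illustration; the dataset described on this page only records a signing year.

```python
from typing import Optional


def tac_in_force(
    signing_year: int, year: int, rescinded_year: Optional[int] = None
) -> bool:
    """Return True if a TAC signed in `signing_year` is binding in `year`.

    There is deliberately no expiry term: the agreement stays in force
    until it is formally rescinded or replaced (hypothetical `rescinded_year`).
    """
    if year < signing_year:
        return False
    if rescinded_year is not None and year >= rescinded_year:
        return False
    return True


print(tac_in_force(2009, 2020))                       # still binding 11 years on
print(tac_in_force(2013, 2012))                       # not yet signed
print(tac_in_force(2010, 2019, rescinded_year=2018))  # formally rescinded
```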

TAC Monitoring Obligations

Companies that sign TACs must:

  • Check their suppliers against lists of embargoed farms (from IBAMA, state agencies).
  • Ensure that no cattle are purchased from farms involved in:
    • Illegal deforestation
    • Slave-like labor
    • Overlapping Indigenous lands or conservation units
  • Provide animal movement records (GTAs) and other documentation.
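
As a rough illustration of the supplier check these obligations imply, the sketch below screens a list of supplier farms against three exclusion lists. The function `screen_suppliers`, its parameters, and the farm identifiers are all hypothetical; real monitoring relies on official lists and on geospatial and GTA data that are not modelled here.

```python
from typing import Dict, Iterable, List, Set


def screen_suppliers(
    supplier_ids: Iterable[str],
    embargoed: Set[str],
    slave_labor_list: Set[str],
    protected_area_overlap: Set[str],
) -> Dict[str, List[str]]:
    """Map each blocked supplier to the reasons it must be excluded."""
    checks = {
        "embargoed for illegal deforestation": embargoed,
        "listed for slave-like labor": slave_labor_list,
        "overlaps Indigenous land or conservation unit": protected_area_overlap,
    }
    blocked: Dict[str, List[str]] = {}
    for supplier in supplier_ids:
        reasons = [reason for reason, listed in checks.items() if supplier in listed]
        if reasons:
            blocked[supplier] = reasons
    return blocked


# Made-up farm identifiers; suppliers absent from the result passed all checks.
blocked = screen_suppliers(
    ["farm-a", "farm-b", "farm-c"],
    embargoed={"farm-b"},
    slave_labor_list=set(),
    protected_area_overlap={"farm-b", "farm-c"},
)
for supplier, reasons in blocked.items():
    print(supplier, "->", "; ".join(reasons))
```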

Other details may be found at TAC Boi na Linha's website


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.beef_exporter_subsidiaries
  • model.trase_duckdb.tac_zdc_2023
  • model.trase_duckdb.tac_signatories
  • model.trase_duckdb.beef_cnpj_2010_new
  • model.trase_duckdb.beef_cnpj_2011_new
  • model.trase_duckdb.beef_cnpj_2012_new
  • model.trase_duckdb.beef_cnpj_2013_new
  • model.trase_duckdb.beef_cnpj_2014_new
  • model.trase_duckdb.beef_cnpj_2015_new
  • model.trase_duckdb.beef_cnpj_2016_new
  • model.trase_duckdb.beef_cnpj_2017_new
  • model.trase_duckdb.beef_auxiliary_cnpj_beef_cnpj_2018_new
  • model.trase_duckdb.beef_cnpj_2019_new
  • model.trase_duckdb.beef_cnpj_2020_new

Sources

  • ['trase-storage-raw', 'beef_exporter_subsidiaries']

# Script: trase/data/brazil/indicators/actors/zd_commitments/out/tac_signatories_combined.py
import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf

from trase.tools import sps
from trase.tools.aws.aws_helpers import read_s3_parquet
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload

"""

Config Session

"""

OUTPUT_FILE = "brazil/indicators/actors/zd_commitments/out/TAC_SIGNATORIES_COMBINED.csv"

FILEPATH_TAC_SIGNATORIES_IMAFLORA = "brazil/beef/indicators/silver/tac_zdc_2023.parquet"

FILEPATH_TAC_SIGNATORIES_GIBBSLAB = (
    "brazil/indicators/actors/zd_commitments/out/TAC_SIGNATORIES.csv"
)

FILEPATH_RELATIONSHIPS = (
    "brazil/logistics/company_relationships/BEEF_EXPORTER_SUBSIDIARIES.csv"
)


LEGAL_AMAZON_UF = ["AC", "AM", "AP", "MA", "MT", "PA", "RO", "RR", "TO"]

TARGET_COLUMNS = {
    "relationship": {
        "PARENT_CNPJ8": "parent_cnpj8",
        "SUBSID_CNPJ8": "subsid_cnpj8",
        "START_YEAR": "start_year",
        "END_YEAR": "end_year",
    },
    "old_tac_df": {
        "SH_CNPJ": "cnpj",
        "YEAR_TAC_SIGNED": "signing_year",
        "TAC_STATE": "state",
        "APPLIES_TO_LEGAL_AMAZON": "legal_amazon",
    },
    "new_tac_df": {"signing_year": "signing_year", "state_uf": "state", "cnpj": "cnpj"},
}

"""

Auxiliary Functions

"""


def validate_cnpj_code(code):
    # first deduce what the type is given the length of string
    # that it appeared in the original data
    if code == "NA":
        code = "0"

    cnpj = code.rjust(14, "0")
    is_valid_cnpj = stdnum.br.cnpj.is_valid(cnpj)

    cpf = code.rjust(11, "0")
    is_valid_cpf = stdnum.br.cpf.is_valid(cpf)

    # we use the checksum method if it was unequivocal and fall back on the
    # string-length method otherwise
    if is_valid_cnpj and not is_valid_cpf:
        return "valid", cnpj
    elif is_valid_cpf and not is_valid_cnpj:
        return "valid", cpf
    else:
        # parenthesise both branches so that exactly two values are always returned
        return ("invalid", cnpj) if len(code) > 11 else ("invalid", cpf)


def read_relationships():
    # TODO: we should start using 2020-12-19-BEEF_EXPORTER_SUBSIDIARIES.csv
    # However we have currently paused work on ZDCs, so I'm kicking this can down the
    # road...
    return get_pandas_df_once(
        FILEPATH_RELATIONSHIPS,
        sep=";",
        dtype=str,
        keep_default_na=False,
    )


def process_relationships(df_relationships):
    df_relationships = df_relationships.rename(
        columns=TARGET_COLUMNS["relationship"], errors="raise"
    )

    df_relationships.loc[
        df_relationships["PARENT_CLEAN"] == "MARFRIG GLOBAL FOODS", "parent_cnpj8"
    ] = "03853896"

    df_relationships.loc[
        df_relationships["PARENT_CLEAN"]
        == "J.N.J. COMERCIAL IMPORTADORA E EXPORTADORA DE CARNES",
        "parent_cnpj8",
    ] = "07664941"

    assert all(df_relationships["parent_cnpj8"].str.len() == 8)
    assert all(df_relationships["subsid_cnpj8"].str.len() == 8)

    df_relationships = df_relationships[
        list(TARGET_COLUMNS["relationship"].values())
    ].drop_duplicates()

    return df_relationships


def read_cnpj(year):
    # TODO: use Athena to read the latest RFB data rather than these pre-filtered CSVs
    return get_pandas_df_once(
        f"brazil/beef/auxiliary/cnpj/BEEF_CNPJ_{year}_NEW.csv",
        sep=";",
        dtype=str,
        keep_default_na=False,
    )


def process_and_concatenate_cnpjs(df_cnpjs):
    df = sps.concat(df_cnpjs)

    df = df.drop_duplicates()

    df["is_valid"], df["cnpj"] = zip(*df["cnpj"].apply(validate_cnpj_code))
    df["is_valid"] = df["is_valid"] == "valid"
    print(
        f'Dropped {sum(~df["is_valid"])} CNPJ(s) from the database that are not valid'
    )
    df = df[df.pop("is_valid")]

    return df


def clean_tacs(df_tac_imaflora, df_tac_gibbs, df_cnpj, df_relationships):

    df_tac_gibbs = df_tac_gibbs[TARGET_COLUMNS["old_tac_df"].keys()].rename(
        columns=TARGET_COLUMNS["old_tac_df"], errors="raise"
    )
    df_tac_imaflora = df_tac_imaflora[TARGET_COLUMNS["new_tac_df"].keys()].rename(
        columns=TARGET_COLUMNS["new_tac_df"], errors="raise"
    )

    # update legal amazon field (isin already returns a boolean Series)
    df_tac_imaflora["legal_amazon"] = df_tac_imaflora["state"].isin(LEGAL_AMAZON_UF)
    df_tac_gibbs["legal_amazon"] = df_tac_gibbs["state"].isin(LEGAL_AMAZON_UF)

    # how to handle nan and inf records?
    # replace by the most frequent year
    df_tac_imaflora["signing_year"] = (
        df_tac_imaflora["signing_year"].replace([np.inf, -np.inf], 2013).fillna(2013)
    )
    df_tac_gibbs["signing_year"] = (
        df_tac_gibbs["signing_year"].fillna(0).replace("NA", 2013)
    )

    # convert year to int
    df_tac_imaflora["signing_year"] = df_tac_imaflora["signing_year"].astype(int)
    df_tac_gibbs["signing_year"] = df_tac_gibbs["signing_year"].astype(int)

    # The supply shed dataset applies to domestic, so remove any CNPJs that are not
    # in our CNPJ database.
    # This applies only to the Holly Gibbs dataset.
    # -------------------------------------------------------------
    before = len(df_tac_gibbs)
    df_tac_gibbs = pd.merge(
        df_tac_gibbs, df_cnpj["cnpj"].drop_duplicates(), validate="many_to_one"
    )
    # the inner merge can only remove rows, so report before - after
    print(f"Dropped {before - len(df_tac_gibbs)} rows with unrecognised CNPJs")

    # set cnpj8
    df_tac_gibbs["cnpj8"] = df_tac_gibbs["cnpj"].str.slice(0, 8)
    df_tac_imaflora["cnpj8"] = df_tac_imaflora["cnpj"].str.slice(0, 8)

    # check cnpj8
    assert all(
        (df_tac_gibbs["cnpj8"].str.len() == 8) & df_tac_gibbs["cnpj8"].str.isdigit()
    )
    assert all(
        (df_tac_imaflora["cnpj8"].str.len() == 8)
        & df_tac_imaflora["cnpj8"].str.isdigit()
    )

    # filter by its relationships (subsidiaries)
    df_tac_gibbs = pd.merge(
        df_tac_gibbs,
        df_relationships,
        left_on="cnpj8",
        right_on="subsid_cnpj8",
        how="left",
        validate="many_to_one",
    )

    # set parent_cnpj8
    df_tac_gibbs["parent_cnpj8"] = df_tac_gibbs["parent_cnpj8"].mask(
        df_tac_gibbs["parent_cnpj8"].isna(), df_tac_gibbs["cnpj8"]
    )
    # each signatory is its own parent, except cnpj8 04748631, the only exception
    # found, which has a different parent
    df_tac_imaflora["parent_cnpj8"] = np.where(
        df_tac_imaflora["cnpj8"] != "04748631",
        df_tac_imaflora["cnpj8"],
        "03853896",  # this is the parent cnpj8 for the exception
    )

    # set source field
    df_tac_gibbs["source"] = "Gibbs Lab"
    df_tac_imaflora["source"] = "Boi na Linha"

    return df_tac_gibbs, df_tac_imaflora


def process_data(df_tac_imaflora, df_tac_gibbs, df_cnpjs, df_relationships):

    df_relationships = process_relationships(df_relationships)
    df_cnpj = process_and_concatenate_cnpjs(df_cnpjs)

    df_tac_gibbs, df_tac_imaflora = clean_tacs(
        df_tac_imaflora, df_tac_gibbs, df_cnpj, df_relationships
    )

    # filter target fields
    df_tac_gibbs = df_tac_gibbs[
        [
            "cnpj",
            "cnpj8",
            "parent_cnpj8",
            "signing_year",
            "state",
            "legal_amazon",
            "source",
        ]
    ]
    df_tac_imaflora = df_tac_imaflora[
        [
            "cnpj",
            "cnpj8",
            "parent_cnpj8",
            "signing_year",
            "state",
            "legal_amazon",
            "source",
        ]
    ]

    # merge datasets
    df = pd.concat([df_tac_gibbs, df_tac_imaflora])

    # group by CNPJ and combine source info
    df["source"] = df.groupby("cnpj")["source"].transform(
        lambda x: "_".join(sorted(set(x)))
    )

    # remove duplicates
    df = df.sort_values("signing_year").drop_duplicates(subset="cnpj", keep="first")

    # filter by target years
    YEARS = list(range(2009, 2024))
    df = df.query("signing_year in @YEARS")
    df = df.drop_duplicates()

    # convert types
    df["signing_year"] = df["signing_year"].astype(int)

    return df.reset_index(drop=True)


def main():
    cnpj_years = [2010, 2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018, 2019, 2020]
    df_cnpjs = [read_cnpj(year) for year in cnpj_years]
    df_relationships = read_relationships()

    # load tac dataset provided by holly gibbs lab
    df_tac_gibbs = get_pandas_df_once(
        FILEPATH_TAC_SIGNATORIES_GIBBSLAB, sep=";", dtype=str, keep_default_na=False
    )

    # load tac dataset provided by imaflora
    df_tac_imaflora = read_s3_parquet(FILEPATH_TAC_SIGNATORIES_IMAFLORA)

    # clean and combine datasets
    df = process_data(df_tac_imaflora, df_tac_gibbs, df_cnpjs, df_relationships)

    write_csv_for_upload(df=df, key=OUTPUT_FILE, index=False)


if __name__ == "__main__":
    main()

# dbt model: trase/data_pipeline/models/brazil/indicators/actors/zd_commitments/out/tac_signatories_combined.py
from trase.data.brazil.indicators.actors.zd_commitments.out.tac_signatories_combined import (
    process_data,
)


def model(dbt, cursor):
    dbt.config(materialized="external")

    df_tac_imaflora = dbt.ref("tac_zdc_2023").df()
    df_tac_gibbs = dbt.ref("tac_signatories").df()
    df_relationships = dbt.source(
        "trase-storage-raw", "beef_exporter_subsidiaries"
    ).df()
    df_cnpjs = [
        dbt.ref("beef_cnpj_2010_new").df(),
        dbt.ref("beef_cnpj_2011_new").df(),
        dbt.ref("beef_cnpj_2012_new").df(),
        dbt.ref("beef_cnpj_2013_new").df(),
        dbt.ref("beef_cnpj_2014_new").df(),
        dbt.ref("beef_cnpj_2015_new").df(),
        dbt.ref("beef_cnpj_2016_new").df(),
        dbt.ref("beef_cnpj_2017_new").df(),
        dbt.ref("beef_auxiliary_cnpj_beef_cnpj_2018_new").df(),
        dbt.ref("beef_cnpj_2019_new").df(),
        dbt.ref("beef_cnpj_2020_new").df(),
    ]

    df = process_data(df_tac_imaflora, df_tac_gibbs, df_cnpjs, df_relationships)

    raise NotImplementedError(
        "This model has been written but what is on S3 was actually generated by running the Python script directly, not by dbt"
    )

    return df