Skip to content

Beef Auxiliary Cnpj 2022 New

s3://trase-storage/brazil/beef/auxiliary/cnpj/BEEF_CNPJ_2022_NEW.parquet

Dbt path: trase_production.main_brazil.beef_auxiliary_cnpj_2022_new

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/beef/auxiliary/cnpj/_schema_auxiliary_cnpj.yml

Model file link: trase/data_pipeline/models/brazil/beef/auxiliary/cnpj/beef_auxiliary_cnpj_2022_new.py

Calls script: trase/data/brazil/beef/auxiliary/cnpj/beef_cnpj_201X_new.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: brazil, auxiliary, beef, cnpj


beef_auxiliary_cnpj_2022_new

Description

Does additional processing and consolidation of CNPJ data for Brazilian beef, based on beef_auxiliary_cnpj_{year}

In particular:

* Adds records from SIF when available
* Further cleaning of CNPJ and municipality codes
* Removes invalid or incomplete entries

beef_auxiliary_cnpj

This dataset is a "lookup" of CNPJs related to a commodity, containing their economic "activity level" and geographic location.

Prior years were generated using the following script: https://github.com/sei-international/TRASE/blob/6db55cca6e81d36f59d17126a60a8732a8fc0acd/trase/data/brazil/multiple/auxiliary/cnpj/commodity_cnpj_year.R

The key input datasets are the CNPJ parquet datasets in brazil/logistics/cnpj/gold/cnpj_YYYY_MM_DD, which for performance reasons are read through Athena in s3_big_data.cnpj_YYYY_MM_DD.

Some of this input data needs a bit of cleaning; for example sometimes there are multiple locations per geocode when there shouldn't be, or the CPF/CNPJ categorisation is incorrect.

We consider a CNPJ from the reference dataset "relevant" if it meets at least one of three criteria: 1) The primary economic activity of the CNPJ is in the CNAES dictionary for the commodity, OR 2) The secondary economic activity of the CNPJ is in the same dictionary, OR 3) The CNPJ appears in the Bill of Lading as trading the commodity.


Details

Column Type Description
company_name VARCHAR Name or empty string if the name is unknown.
cnpj VARCHAR A tax code, which could be a CNPJ or a CPF. This will always be a 14- or 11-digit string (see the TYPE column to distinguish them).
cnpj8 VARCHAR
tax_municipality VARCHAR
original_tax_municipality VARCHAR

Models / Seeds

  • model.trase_duckdb.beef_auxiliary_cnpj_2022
  • model.trase_duckdb.cd_disaggregated_beef_2022_new
  • model.trase_duckdb.brazil_sif_inspected_beef_establishments_2022
"""
This script does additional processing and consolidation of CNPJ data for Brazilian beef,
based on 'brazil/beef/auxiliary/cnpj/BEEF_CNPJ_{cnpj_year}.csv', which is created by
'trase/data/brazil/multiple/auxiliary/cnpj/commodity_cnpj_20XX.py'.

In particular:
* Adds records from SIF when available
* Further cleaning of CNPJ and municipality codes
* Removes invalid or incomplete entries
"""

from more_itertools import one
import stdnum.br.cnpj
import stdnum.br.cpf
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
import pandas as pd
import numpy as np
from trase.models.brazil.beef.constants import *
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION
from trase.tools import sps

# Years processed by main() when this module is run as a script: 2010 to 2020 inclusive.
YEARS = list(range(2010, 2021))


def main():
    """
    Build the consolidated beef CNPJ dataset for every year in YEARS and
    hand each result to write_csv_for_upload as a ';'-separated UTF-8 CSV.
    """
    for year in YEARS:
        # Start each year with an empty record of accessed S3 objects
        S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.clear()

        # load_data returns (cnpj, flows, sif, zdc) in the order preprocess expects
        consolidated = preprocess(*load_data(year))

        output_key = f"brazil/beef/auxiliary/cnpj/BEEF_CNPJ_{year}_NEW.csv"
        write_csv_for_upload(consolidated, output_key, sep=";", encoding="utf8")


def load_data(year):
    """
    Read the four inputs needed by preprocess() for the given year.

    Returns a tuple (df, df_flows, df_sif, df_zdc) holding the CNPJ, flows, SIF
    and ZDC data respectively. All CSVs are read as strings with ';' as the
    separator and without converting any value to NaN.

    If this script is called from dbt, this data will be loaded by the dbt model instead.
    """

    def read_csv(key):
        # Every input file shares the same reader settings
        return get_pandas_df_once(
            key,
            encoding="utf8",
            sep=";",
            dtype=str,
            keep_default_na=False,
        )

    # --- CNPJ data ---
    # Only 2016 and 2017 use their own CNPJ file: anything up to 2015 reads the
    # 2015 file and anything from 2018 onwards reads the 2019 file.
    if 2015 < year < 2018:
        cnpj_year = year
    else:
        cnpj_year = 2015 if year <= 2015 else 2019
    df = read_csv(f"brazil/beef/auxiliary/cnpj/BEEF_CNPJ_{cnpj_year}.csv")
    df = df.rename(
        columns={"razao_social": "company_name", "geocode": "tax_municipality"},
        errors="raise",
    )
    df = df[["cnae", "level", "type", "cnpj", "company_name", "tax_municipality"]]

    # --- flows data ---
    # Disaggregated flows are used from 2018, combined flows before that
    flows_path = (
        f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}_NEW.csv"
        if year >= 2018
        else f"brazil/beef/trade/cd/combined/CD_COMBINED_BEEF_{year}_NEW.csv"
    )
    df_flows = read_csv(flows_path)

    # --- SIF data ---
    # Two SIF files are used: 2020 for years after 2018, 2018 otherwise
    sif_year = 2020 if year > 2018 else 2018
    df_sif = read_csv(
        f"brazil/logistics/sanitary_inspections/animal_products/sif/out/{sif_year}_SIF_BEEF.csv"
    )

    # --- ZDC data ---
    if year >= 2015:
        df_zdc = read_csv(
            f"brazil/beef/indicators/out/ZDC_DATA_COLLECTION_{year}_BR_BEEF.csv"
        )
    else:
        # No ZDC file is read before 2015: return an empty frame with the same columns
        text_columns = [
            "country",
            "commodity",
            "zdc",
            "exporter_name",
            "year",
            "cnpj8",
        ]
        empty_columns = {name: pd.Series(dtype="str") for name in text_columns}
        empty_columns["applies_to_legal_amazon_only"] = pd.Series(dtype="bool")
        df_zdc = pd.DataFrame(empty_columns)

    return df, df_flows, df_sif, df_zdc


def preprocess(df, df_flows, df_sif=None, df_zdc=None):
    """
    Add data from the CNPJ data, the flows, and the SIF data to create a consolidated relevant CNPJ dataset.
    As we might not have SIF data for all years, and from 2021 onwards we won't include ZDC data
    at this point, we allow for df_sif and df_zdc to be None.

    :param df: CNPJ data with at least the columns "level", "company_name", "cnpj"
        and "tax_municipality". Rows whose "level" equals "1" are dropped.
    :param df_flows: trade flows with at least the columns "exporter_name",
        "exporter_cnpj" and "exporter_geocode".
    :param df_sif: optional SIF data with the columns "name", "cnpj" and
        "municipality". Ignored when None or empty. Not modified by this function.
    :param df_zdc: optional ZDC data with the columns "cnpj8", "exporter_name"
        and "zdc". When None or empty, no "zdc" column is added to the result.
    :return: dataframe with the columns "company_name", "cnpj", "cnpj8",
        "tax_municipality", "zdc" (only if ZDC data was given) and
        "original_tax_municipality".
    :raises AssertionError: if a missing or "NA" municipality remains after cleaning.
    """
    key_columns = ["company_name", "cnpj", "tax_municipality"]
    df = df[df["level"] != "1"][key_columns]

    # If there is SIF data, process it; otherwise, skip the following
    has_sif = df_sif is not None and not df_sif.empty
    if has_sif:
        # Rename on a copy: an in-place rename would alter the caller's dataframe
        df_sif = df_sif.rename(
            columns={"name": "company_name", "municipality": "tax_municipality"},
        )[key_columns]

    # concat flows cnpjs and cnpj
    df_flows = df_flows[["exporter_name", "exporter_cnpj", "exporter_geocode"]].copy()
    df_flows = df_flows.rename(
        columns={
            "exporter_geocode": "tax_municipality",
            "exporter_name": "company_name",
            "exporter_cnpj": "cnpj",
        },
        errors="raise",
    )
    # Concatenate only non-empty DataFrames, with SIF records first when present
    dfs_to_concat = [df_sif, df_flows, df] if has_sif else [df_flows, df]
    df = sps.concat(dfs_to_concat, axis=0)

    # Add CNPJ8 number
    # NOTE: this is taken from the code as it appears in the inputs, i.e. before
    # the zero-padding applied in the "Clean cnpj" step below.
    df["cnpj8"] = df["cnpj"].str[:8]

    # Select unique entries
    df = df[["company_name", "cnpj", "cnpj8", "tax_municipality"]].drop_duplicates()

    # If there is no ZDC data, skip the following
    if df_zdc is not None and not df_zdc.empty:

        def get_zdc(row):
            """
            Attach zdc: use the single ZDC entry matching the row's cnpj8; failing
            that, the single entry matching its company name; otherwise "None".
            `one` raises ValueError when there are zero or several matches.
            """
            try:
                return one(df_zdc[df_zdc["cnpj8"] == row["cnpj8"]].zdc)
            except ValueError:
                try:
                    return one(
                        df_zdc[df_zdc["exporter_name"] == row["company_name"]].zdc
                    )
                except ValueError:
                    return "None"

        df["zdc"] = df.apply(get_zdc, axis=1)

    # Replace unknown municipality geocode
    df["tax_municipality"] = np.where(
        df["tax_municipality"].isin(["NA", "0", "0000005"]),
        UNKNOWN_MUNICIPALITY_GEOCODE,
        df["tax_municipality"],
    )

    # Clean cnpj
    # Only the cleaned code (second element of the result) is kept. Reading it
    # directly avoids unpacking `zip(*...)`, which fails on an empty dataframe.
    df["cnpj"] = [validate_cnpj_code(code)[1] for code in df["cnpj"]]

    # Replace cnpj14 with multiple municipalities
    df["original_tax_municipality"] = df["tax_municipality"]
    df["tax_municipality"] = df.apply(limit_cnpj14_municipality, axis=1)

    # Delete missing values
    missing = (
        (df["cnpj"] == "00000000000")
        | (df["tax_municipality"] == "XXXXXXX")
        | (df["tax_municipality"] == "0000000")
    )
    df = df[~missing]

    assert len(df[df["tax_municipality"].isna()]) == 0
    assert len(df[df["tax_municipality"] == "NA"]) == 0
    return df


def validate_cnpj_code(code):
    """
    Classify a tax code as a CNPJ or a CPF and left-pad it with zeros.

    The code is padded to 14 characters and checked as a CNPJ, and padded to 11
    characters and checked as a CPF. "NA" is treated as "0".

    :param code: the tax code as a string, possibly missing leading zeros.
    :return: a 2-tuple (status, padded_code). Status is "valid" when exactly one
        of the two checks passes, in which case padded_code is the matching
        padded form. Otherwise status is "invalid" and padded_code is the CNPJ
        form if the original code is longer than 11 characters, else the CPF form.
    """
    # first deduce what the type is given the length of string
    # that it appeared in the original data
    if code == "NA":
        code = "0"

    cnpj = code.rjust(14, "0")
    is_valid_cnpj = stdnum.br.cnpj.is_valid(cnpj)

    cpf = code.rjust(11, "0")
    is_valid_cpf = stdnum.br.cpf.is_valid(cpf)

    # we use the checksum method if it was unequivocal and fall back on the
    # string-length method otherwise
    if is_valid_cnpj and not is_valid_cpf:
        return "valid", cnpj
    if is_valid_cpf and not is_valid_cnpj:
        return "valid", cpf

    # The parentheses matter: without them the conditional expression binds
    # tighter than the commas and a 3-tuple ("invalid", <cnpj or "invalid">, cpf)
    # is returned, so short codes get the literal string "invalid" as their code.
    return "invalid", (cnpj if len(code) > 11 else cpf)


def limit_cnpj14_municipality(row):
    """
    Pick the municipality for a row: the entry in CNPJ14_MUN for the row's
    "cnpj" when there is one, otherwise the row's "original_tax_municipality".
    """
    tax_code = row["cnpj"]
    if tax_code not in CNPJ14_MUN:
        return row["original_tax_municipality"]
    return CNPJ14_MUN[tax_code]


# Run the multi-year export only when executed as a script, not when imported
# (for example by a dbt model that only needs preprocess()).
if __name__ == "__main__":
    main()
from trase.data.brazil.beef.auxiliary.cnpj.beef_cnpj_201X_new import preprocess

# Year covered by this dbt model.
# NOTE(review): not referenced in the code visible here - the dbt.ref() names in
# model() spell out 2022 literally - confirm whether this constant is still needed.
YEAR = 2022


def model(dbt, cursor):
    """
    dbt Python model producing the consolidated beef CNPJ dataset for 2022.

    Reads the CNPJ, disaggregated flows and SIF upstream models as dataframes
    and passes them to preprocess(). No ZDC data is passed, so preprocess()
    receives its default of None for df_zdc. `cursor` is not used.
    """
    dbt.config(materialized="external")

    # preprocess() expects the municipality column to be called "tax_municipality"
    cnpj_frame = (
        dbt.ref("beef_auxiliary_cnpj_2022")
        .df()
        .rename(columns={"geocode": "tax_municipality"})
    )
    flows_frame = dbt.ref("cd_disaggregated_beef_2022_new").df()
    sif_frame = dbt.ref("brazil_sif_inspected_beef_establishments_2022").df()

    return preprocess(df=cnpj_frame, df_flows=flows_frame, df_sif=sif_frame)