Beef Auxiliary Cnpj 2023 New
s3://trase-storage/brazil/beef/auxiliary/cnpj/BEEF_CNPJ_2023_NEW.parquet
Dbt path: trase_production.main_brazil.beef_auxiliary_cnpj_2023_new
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/beef/auxiliary/cnpj/_schema_auxiliary_cnpj.yml
Model file link: trase/data_pipeline/models/brazil/beef/auxiliary/cnpj/beef_auxiliary_cnpj_2023_new.py
Calls script: trase/data/brazil/beef/auxiliary/cnpj/beef_cnpj_201X_new.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: brazil, auxiliary, beef, cnpj
beef_auxiliary_cnpj_2023_new
Description
Performs additional processing and consolidation of CNPJ data for Brazilian beef, based on beef_auxiliary_cnpj_{year}.
In particular, it:
* Adds records from SIF when available
* Further cleans CNPJ and municipality codes
* Removes invalid or incomplete entries
beef_auxiliary_cnpj
This dataset is a "lookup" of CNPJs related to a commodity, containing their economic "activity level" and geographic location.
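For orientation, a record in this lookup looks roughly like the following (the values are made up for illustration; the columns mirror those selected by the script below):

```python
{
    "cnpj": "00000000000191",                    # 14-digit CNPJ (or 11-digit CPF)
    "company_name": "FRIGORIFICO EXEMPLO LTDA",  # illustrative name
    "cnae": "1011-2/01",                         # economic activity code (cattle slaughter)
    "level": "2",                                # activity level (illustrative value)
    "type": "CNPJ",                              # CNPJ vs CPF categorisation
    "tax_municipality": "5107925",               # IBGE municipality geocode
}
```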
Prior years were generated using the following script: https://github.com/sei-international/TRASE/blob/6db55cca6e81d36f59d17126a60a8732a8fc0acd/trase/data/brazil/multiple/auxiliary/cnpj/commodity_cnpj_year.R
The key input datasets are the CNPJ parquet datasets in brazil/logistics/cnpj/gold/cnpj_YYYY_MM_DD,
which for performance reasons are read through Athena in s3_big_data.cnpj_YYYY_MM_DD.
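As a sketch, one way to read such a snapshot through Athena (using awswrangler here for illustration; the pipeline's own helpers may differ, and cnpj_YYYY_MM_DD stands for a concrete snapshot date):

```python
import awswrangler as wr

# Illustrative only: substitute a real snapshot date for YYYY_MM_DD.
df_cnpj_raw = wr.athena.read_sql_query(
    "SELECT * FROM cnpj_YYYY_MM_DD",
    database="s3_big_data",
)
```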
Some of this input data needs a bit of cleaning; for example sometimes there are multiple locations per geocode when there shouldn't be, or the CPF/CNPJ categorisation is incorrect.
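The kind of cleaning involved might look like the sketch below (column names and rules are illustrative, not the pipeline's exact logic):

```python
import pandas as pd

def clean_cnpj_input(df: pd.DataFrame) -> pd.DataFrame:
    # Collapse duplicate locations: keep one row per tax code and geocode.
    df = df.drop_duplicates(subset=["cnpj", "geocode"])
    # Re-derive the CPF/CNPJ categorisation from the code length rather
    # than trusting the original flag (14 digits = CNPJ, 11 = CPF).
    df["type"] = df["cnpj"].str.len().map({14: "CNPJ", 11: "CPF"})
    return df
```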
We consider a CNPJ from the reference dataset "relevant" if it meets at least one of three criteria:
1) the primary economic activity of the CNPJ is in the CNAES dictionary for the commodity, OR
2) the secondary economic activity of the CNPJ is in the same dictionary, OR
3) the CNPJ appears in the Bill of Lading as trading the commodity.
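A minimal sketch of that relevance filter (column and variable names here are illustrative, not the actual identifiers):

```python
import pandas as pd

def mark_relevant(df: pd.DataFrame, commodity_cnaes: set, bol_cnpjs: set) -> pd.Series:
    """True where a CNPJ meets at least one of the three criteria above."""
    return (
        df["primary_cnae"].isin(commodity_cnaes)      # 1) primary activity
        | df["secondary_cnae"].isin(commodity_cnaes)  # 2) secondary activity
        | df["cnpj"].isin(bol_cnpjs)                  # 3) appears in the Bill of Lading
    )
```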
Details
| Column | Type | Description |
|---|---|---|
| company_name | VARCHAR | Name, or an empty string if the name is unknown. |
| cnpj | VARCHAR | A tax code, which could be a CNPJ or a CPF. This will always be a 14-digit (CNPJ) or 11-digit (CPF) string, so the two can be distinguished by length. |
| cnpj8 | VARCHAR | The first 8 digits of the cnpj, which identify the company across branches. |
| tax_municipality | VARCHAR | Municipality geocode of the company's tax registration; unknown or invalid geocodes are replaced with a placeholder value. |
| original_tax_municipality | VARCHAR | The tax_municipality as loaded, before per-CNPJ municipality overrides are applied. |
Models / Seeds
model.trase_duckdb.beef_auxiliary_cnpj_2023
model.trase_duckdb.cd_disaggregated_beef_2023_new
model.trase_duckdb.brazil_sif_inspected_beef_establishments_2023
"""
This script does additional processing and consolidation of CNPJ data for Brazilian beef,
based on 'brazil/beef/auxiliary/cnpj/BEEF_CNPJ_{cnpj_year}.csv', created in
'trase/data/brazil/multiple/auxiliary/cnpj/commodity_cnpj_20XX.py'
In particular:
* Adds records from SIF when available
* Further cleaning of CNPJ and municipality codes
* Removes invalid or incomplete entries
"""
from more_itertools import one
import stdnum.br.cnpj
import stdnum.br.cpf
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
import pandas as pd
import numpy as np
from trase.models.brazil.beef.constants import CNPJ14_MUN, UNKNOWN_MUNICIPALITY_GEOCODE
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION
from trase.tools import sps
YEARS = list(range(2010, 2021))  # backfill years 2010-2020 inclusive
def main():
for year in YEARS:
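        # Reset the tracker so that each year's run records only its own S3 reads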
S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.clear()
df, df_flows, df_sif, df_zdc = load_data(year)
df = preprocess(df, df_flows, df_sif, df_zdc)
write_csv_for_upload(
df,
f"brazil/beef/auxiliary/cnpj/BEEF_CNPJ_{year}_NEW.csv",
sep=";",
encoding="utf8",
)
def load_data(year):
"""
Load the CNPJ, flows, SIF, and ZDC data for the specified year.
If this script is called from dbt, this data will be loaded by the dbt model instead.
"""
# load cnpj data
if year <= 2015:
cnpj_year = 2015
elif year >= 2018:
# note: we don't have CNPJs for 2018, so for this year we use 2019
cnpj_year = 2019
else:
cnpj_year = year
df = get_pandas_df_once(
f"brazil/beef/auxiliary/cnpj/BEEF_CNPJ_{cnpj_year}.csv",
encoding="utf8",
sep=";",
dtype=str,
keep_default_na=False,
)
df = df.rename(
columns={"razao_social": "company_name", "geocode": "tax_municipality"},
errors="raise",
)
df = df[["cnae", "level", "type", "cnpj", "company_name", "tax_municipality"]]
# load flows data
if year >= 2018:
flows_path = (
f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}_NEW.csv"
)
else:
flows_path = f"brazil/beef/trade/cd/combined/CD_COMBINED_BEEF_{year}_NEW.csv"
df_flows = get_pandas_df_once(
flows_path,
encoding="utf8",
sep=";",
dtype=str,
keep_default_na=False,
)
if year > 2018:
sif_year = 2020
else:
sif_year = 2018
# load sif data
df_sif = get_pandas_df_once(
f"brazil/logistics/sanitary_inspections/animal_products/sif/out/{sif_year}_SIF_BEEF.csv",
encoding="utf8",
sep=";",
dtype=str,
keep_default_na=False,
)
# load zdc data
if year < 2015:
df_zdc = pd.DataFrame(
{
"country": pd.Series(dtype="str"),
"commodity": pd.Series(dtype="str"),
"zdc": pd.Series(dtype="str"),
"exporter_name": pd.Series(dtype="str"),
"year": pd.Series(dtype="str"),
"cnpj8": pd.Series(dtype="str"),
"applies_to_legal_amazon_only": pd.Series(dtype="bool"),
}
)
else:
df_zdc = get_pandas_df_once(
f"brazil/beef/indicators/out/ZDC_DATA_COLLECTION_{year}_BR_BEEF.csv",
encoding="utf8",
sep=";",
dtype=str,
keep_default_na=False,
)
return df, df_flows, df_sif, df_zdc
def preprocess(df, df_flows, df_sif=None, df_zdc=None):
"""
Add data from the CNPJ data, the flows, and the SIF data to create a consolidated relevant CNPJ dataset.
As we might not have SIF data for all years, and from 2021 onwards we won't include ZDC data
at this point, we allow for df_sif and df_zdc to be None.
"""
df = df[df["level"] != "1"][["company_name", "cnpj", "tax_municipality"]]
# If there is SIF data, process it; otherwise, skip the following
if df_sif is not None and not df_sif.empty:
df_sif.rename(
columns={"name": "company_name", "municipality": "tax_municipality"},
inplace=True,
)
df_sif = df_sif[["company_name", "cnpj", "tax_municipality"]]
    # Prepare the flows (exporter) records so they can be concatenated with the CNPJ records
df_flows = df_flows[["exporter_name", "exporter_cnpj", "exporter_geocode"]].copy()
df_flows = df_flows.rename(
columns={
"exporter_geocode": "tax_municipality",
"exporter_name": "company_name",
"exporter_cnpj": "cnpj",
},
errors="raise",
)
# Concatenate only non-empty DataFrames
dfs_to_concat = [df_flows, df]
if df_sif is not None and not df_sif.empty:
dfs_to_concat.insert(0, df_sif)
df = sps.concat(dfs_to_concat, axis=0)
    # Add the CNPJ8: the first 8 digits, which identify the company across branches
df["cnpj8"] = df["cnpj"].str[:8]
# Select unique entries
df = df[["company_name", "cnpj", "cnpj8", "tax_municipality"]].drop_duplicates()
# If there is no ZDC data, skip the following
if df_zdc is not None and not df_zdc.empty:
def get_zdc(row):
"""
Attach zdc
"""
try:
return one(df_zdc[df_zdc["cnpj8"] == row["cnpj8"]].zdc)
except ValueError:
try:
return one(
df_zdc[df_zdc["exporter_name"] == row["company_name"]].zdc
)
except ValueError:
return "None"
df["zdc"] = df.apply(get_zdc, axis=1)
# Replace unknown municipality geocode
df["tax_municipality"] = np.where(
df["tax_municipality"] == "NA",
UNKNOWN_MUNICIPALITY_GEOCODE,
df["tax_municipality"],
)
df["tax_municipality"] = np.where(
df["tax_municipality"].isin(["0", "0000005"]),
UNKNOWN_MUNICIPALITY_GEOCODE,
df["tax_municipality"],
)
    # Clean the cnpj: normalise it via checksum validation (the validity flag is discarded)
_, df["cnpj"] = zip(*df.cnpj.apply(validate_cnpj_code))
# Replace cnpj14 with multiple municipalities
df["original_tax_municipality"] = df["tax_municipality"]
df["tax_municipality"] = df.apply(limit_cnpj14_municipality, axis=1)
# Delete missing values
missing = (
(df["cnpj"] == "00000000000")
| (df["tax_municipality"] == "XXXXXXX")
| (df["tax_municipality"] == "0000000")
)
df = df[~missing]
assert len(df[df["tax_municipality"].isna()]) == 0
assert len(df[df["tax_municipality"] == "NA"]) == 0
return df
def validate_cnpj_code(code):
    # first deduce what the type is, given the length of the string as it
    # appeared in the original data
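    # Illustrative behaviour (both example numbers pass their stdnum checksums):
    #   validate_cnpj_code("00000000000191") -> ("valid", "00000000000191")
    #   validate_cnpj_code("52998224725")    -> ("valid", "52998224725")
    # "NA" normalises to the all-zero CPF "00000000000", which the
    # missing-value filter in preprocess() then drops.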
if code == "NA":
code = "0"
cnpj = code.rjust(14, "0")
is_valid_cnpj = stdnum.br.cnpj.is_valid(cnpj)
cpf = code.rjust(11, "0")
is_valid_cpf = stdnum.br.cpf.is_valid(cpf)
# we use the checksum method if it was unequivocal and fall back on the
# string-length method otherwise
if is_valid_cnpj and not is_valid_cpf:
return "valid", cnpj
elif is_valid_cpf and not is_valid_cnpj:
return "valid", cpf
else:
return "invalid", cnpj if len(code) > 11 else "invalid", cpf
def limit_cnpj14_municipality(row):
if row["cnpj"] in CNPJ14_MUN.keys():
return CNPJ14_MUN[row["cnpj"]]
else:
return row["original_tax_municipality"]
if __name__ == "__main__":
main()
from trase.data.brazil.beef.auxiliary.cnpj.beef_cnpj_201X_new import preprocess
YEAR = 2023
def model(dbt, cursor):
dbt.config(materialized="external")
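    # Mirrors load_data() in the called script: the dbt refs supply the CNPJ,
    # flows, and SIF inputs. ZDC data is not included from 2021 onwards, so
    # preprocess() is called without df_zdc (defaulting to None).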
df_cnpj = dbt.ref("beef_auxiliary_cnpj_2023").df()
df_flows = dbt.ref("cd_disaggregated_beef_2023_new").df()
df_sif = dbt.ref("brazil_sif_inspected_beef_establishments_2023").df()
df_cnpj = df_cnpj.rename(columns={"geocode": "tax_municipality"})
df = preprocess(
df=df_cnpj,
df_flows=df_flows,
df_sif=df_sif,
)
return df
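The model can be rebuilt on its own with, e.g., `dbt run --select beef_auxiliary_cnpj_2023_new`.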