Skip to content

Tobacco Cnpj 2019

s3://trase-storage/brazil/tobacco/auxiliary/cnpj/TOBACCO_CNPJ_2019.csv

Dbt path: trase_production.main_brazil.tobacco_cnpj_2019

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/tobacco/auxiliary/cnpj/_schema.yml

Model file link: trase/data_pipeline/models/brazil/tobacco/auxiliary/cnpj/tobacco_cnpj_2019.py

Calls script: trase/data/brazil/multiple/auxiliary/cnpj/commodity_cnpj_20XX.py

Dbt test runs & lineage: Test results ยท Lineage

Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)

Tags: mock_model, auxiliary, brazil, cnpj, tobacco


tobacco_cnpj_2019

Description

This model was auto-generated based off .yml 'lineage' files in S3. The DBT model just raises an error; the actual script that created the data lives elsewhere. The script is located at trase/data/brazil/multiple/auxiliary/cnpj/COMMODITY_CNPJ_2019.py [permalink]. It was last run by Harry Biddle.


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.lc_brazil_bol_2019
  • source.trase_duckdb.trase-storage-raw.cnpj_2019_cnae_secondary
  • source.trase_duckdb.trase-storage-raw.auxiliary_cnpj_original_cnpj_2019
  • source.trase_duckdb.trase-storage-raw.commodity_dict

Sources

  • ['trase-storage-raw', 'lc_brazil_bol_2019']
  • ['trase-storage-raw', 'cnpj_2019_cnae_secondary']
  • ['trase-storage-raw', 'auxiliary_cnpj_original_cnpj_2019']
  • ['trase-storage-raw', 'commodity_dict']
"""
The goal of this script is to produce a per-commodity "lookup" of CNPJs, containing
the "activity level" and geographic location.

This script creates the following files:
    s3://trase-storage/brazil/<commodity>/auxiliary/cnpj/<COMMODITY>_CNPJ_{year}.parquet

Prior years were generated using the following script:
    https://github.com/sei-international/TRASE/blob/6db55cca6e81d36f59d17126a60a8732a8fc0acd/trase/data/brazil/multiple/auxiliary/cnpj/commodity_cnpj_year.R

The current version has been adapted to be better integrated with DBT, and to process a commodity-year at a time.

The key input datasets are the CNPJ parquet datasets in 'brazil/logistics/cnpj/gold/cnpj_YYYY_MM_DD',
which for performance reasons are read through Athena in 's3_big_data.cnpj_YYYY_MM_DD'.

Some of this input data needs a bit of cleaning; for example sometimes there are multiple locations
per geocode when there shouldn't be, or the CPF/CNPJ categorisation is incorrect.

We consider a CNPJ from the reference dataset "relevant" if it has one of three criteria:
    1) The primary economic activity of the CNPJ is in the CNAES dictionary for the commodity, OR
    2) The secondary economic activity of the CNPJ is in the same dictionary, OR
    3) The CNPJ appears in the Bill of Lading as trading the commodity
"""

from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow.parquet as pq

import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf
from tqdm import tqdm

from trase.tools.aws.metadata import write_parquet_for_upload


# CNPJ datasets linked to Athena based on those processed in
# brazil/logistics/cnpj/gold/cnpj_YYYY_MM_DD/. Choosing the most relevant
# of those we have so far.
CNPJ_ATHENA_PATH_DICT = {
    2019: "s3_big_data.cnpj_2019_02",
    2020: "s3_big_data.cnpj_2022_11_24",
    2021: "s3_big_data.cnpj_2022_11_24",
    2022: "s3_big_data.cnpj_2022_11_24",
    2023: "s3_big_data.cnpj_2023_08_12",
}
S3_ATHENA_STAGING_DIR = "s3://trase-temp/athena/"

CNAE_DIGITS = 7
GEOCODE_DIGITS = 7


def infer_type(row):
    code = str(row["cnpj_int"])
    length = int(row["cnpj_length"])
    # Deduce type from length
    type_implied_from_length = "cnpj" if length > 11 else "cpf"
    # Check validity
    cnpj = code.rjust(14, "0")
    is_valid_cnpj = stdnum.br.cnpj.is_valid(cnpj)
    cpf = code.rjust(11, "0")
    is_valid_cpf = stdnum.br.cpf.is_valid(cpf)
    if is_valid_cnpj and not is_valid_cpf:
        return "cnpj", "valid"
    elif is_valid_cpf and not is_valid_cnpj:
        return "cpf", "valid"
    elif is_valid_cpf and is_valid_cnpj:
        return type_implied_from_length, "valid"
    else:
        return type_implied_from_length, "invalid"


def process_data(commodity, year, df_bol_cnpjs, df_hscodes, df_cnae):
    """
    Process the data for a specific commodity and year.
    """

    cnpj_athena_path = CNPJ_ATHENA_PATH_DICT[year]

    # validate the commodity exists in df_cnae
    commodities = set(df_cnae["commodity"].str.lower())
    assert commodity in commodities, f"Commodity '{commodity}' CNAE's not found"

    # Filter df_cnae to the commodity of interest
    df_cnae = df_cnae[df_cnae["commodity"].str.lower() == commodity].drop(
        "commodity", axis=1
    )

    # get CNAEs for the given commodity
    cnaes_for_commodity = set(df_cnae["cnae"].astype(int))
    cnaes_str = ", ".join(f"({cnae})" for cnae in cnaes_for_commodity)

    # ==================================================================================== #
    # Get hs codes of interest to filter the BOL
    # ==================================================================================== #

    # filter to hs4's of the commodity of interest
    df_hscodes = df_hscodes[["location", "commodity", "hs_code"]]
    df_hscodes = df_hscodes[df_hscodes["location"] == "BRAZIL"].drop("location", axis=1)
    df_hscodes = df_hscodes[df_hscodes["commodity"].str.lower() == commodity].drop(
        "commodity", axis=1
    )
    df_hscodes["hs4"] = df_hscodes["hs_code"].str[:4]
    df_hscodes = df_hscodes.drop("hs_code", axis=1).drop_duplicates()

    # expecting code/commodity pairs to be unique
    assert df_hscodes["hs4"].is_unique

    # ==================================================================================== #
    # Get CNPJs and the geocodes from the Bill of Lading
    # ==================================================================================== #

    df_bol_cnpjs = df_bol_cnpjs[["cnpj", "hs4"]].drop_duplicates()
    # store the length of the tax code, but cast it to an integer for joining later
    df_bol_cnpjs["cnpj_length"] = df_bol_cnpjs["cnpj"].str.len()
    df_bol_cnpjs["cnpj"] = df_bol_cnpjs["cnpj"].astype("int64")
    df_bol_cnpjs = df_bol_cnpjs.drop_duplicates()

    # filter out some nonsense CNPJs
    df_bol_cnpjs = df_bol_cnpjs[df_bol_cnpjs["cnpj"] > 0]

    # do an inner join on those codes. this means that we keep only the rows which trade
    # in the commodities we are interested in
    df_bol_cnpjs = pd.merge(
        left=df_bol_cnpjs, right=df_hscodes, on=["hs4"], how="inner"
    )
    df_bol_cnpjs = df_bol_cnpjs.drop(columns=["hs4"])
    df_bol_cnpjs = df_bol_cnpjs.drop_duplicates()

    # Build a string list with the bol cnpj values, to later filter the CNPJ dataset
    bol_cnpj_set = set(df_bol_cnpjs["cnpj"])
    cnpj_str = ", ".join(f"{cnpj}" for cnpj in bol_cnpj_set)

    # After the 2019 CNPJ data, fields name changed
    if year == 2019:
        company_name = "razao_social"
        cnae_primary = "cnae_primary"
        municipality_geocode = "geocodmun"
        municipality_name = "municipio"
        registration_status_date = "data_situacao_cadastral"
        registration_status_name = "situacao_cadastral"
        city_id = ""  # 2019 CNPJ data does not have city_id
        city_id_coalesce = ""
    elif year >= 2020:
        company_name = "company_name"
        cnae_primary = "cnae"
        municipality_geocode = "municipality"
        municipality_name = "city_name"
        registration_status_date = "registration_status_date"
        registration_status_name = "registration_status_name"
        city_id = ", id_city"
        city_id_coalesce = ", COALESCE(bol.id_city, cc.id_city) AS id_city"
    else:
        raise ValueError("Years before 2019 were processed by another script.")

    # Get the CNPJ data from Athena, filtering by primary or secondary CNAE
    cursor = connect(s3_staging_dir=S3_ATHENA_STAGING_DIR).cursor(ArrowCursor)
    query = f"""
    WITH
    cnaes AS (
        SELECT cnae
        FROM ( VALUES
            {cnaes_str}
            ) AS t(cnae)
    ),
    base AS (
        SELECT
            TRY_CAST(cnpj AS BIGINT) AS cnpj_int, -- Use as int for comparing and checking validity
            LENGTH(CAST(TRY_CAST(cnpj AS BIGINT) AS VARCHAR)) AS cnpj_length,
            CAST({municipality_geocode} AS VARCHAR) AS geocode,
            {municipality_name} AS municipality_name,
            CAST({cnae_primary} AS INT) AS cnae_primary_int,
            transform(cnae_secondary_array, x -> CAST(x AS INT)) AS cnae_secondary_int,
            {company_name} AS company_name
            {city_id}
        FROM {cnpj_athena_path}
        WHERE
            TRY_CAST(cnpj AS BIGINT) IS NOT NULL
            AND {municipality_name} != 'EXTERIOR'
            AND NOT (
                date_trunc('year', {registration_status_date}) < DATE '2015-01-01'
                AND {registration_status_name} = '08'
            )
            AND cnpj IS NOT NULL
            AND LENGTH(cnpj) <= 14
    ),
    cnae_in_primary AS (
        SELECT
            b.cnpj_int,
            b.cnpj_length,
            b.geocode,
            b.municipality_name,
            b.cnae_primary_int AS cnae,
            b.company_name
            {city_id}
        FROM   base  b
        JOIN   cnaes c
        ON   b.cnae_primary_int = c.cnae
    ),
    cnae_in_secondary AS (
        SELECT
            b.cnpj_int,
            b.cnpj_length,
            b.geocode,
            b.municipality_name,
            sec AS cnae,
            b.company_name
            {city_id}
        FROM   base  b
        CROSS  JOIN UNNEST(b.cnae_secondary_int) AS t(sec)
        JOIN   cnaes c
        ON   sec = c.cnae
    ),
    combined_cnae AS (
        SELECT * FROM cnae_in_primary
        UNION ALL
        SELECT * FROM cnae_in_secondary
    ),
    bol_cnpjs AS (
        SELECT
            cnpj_int,
            cnpj_length,
            geocode,
            municipality_name,
            NULL AS cnae,
            company_name
            {city_id}
        FROM base
        WHERE cnpj_int IN ({cnpj_str})
    ),
    with_bol AS (
        SELECT
            COALESCE(bol.cnpj_int, cc.cnpj_int) AS cnpj_int,
            COALESCE(bol.cnpj_length, cc.cnpj_length) AS cnpj_length,
            COALESCE(bol.geocode, cc.geocode) AS geocode,
            COALESCE(bol.municipality_name, cc.municipality_name) AS municipality_name,
            cc.cnae,
            COALESCE(bol.company_name, cc.company_name) AS company_name,
            CASE WHEN bol.cnpj_int IS NOT NULL THEN TRUE ELSE FALSE END AS cnpj_in_bol
            {city_id_coalesce}
        FROM bol_cnpjs bol
        FULL OUTER JOIN combined_cnae cc
            ON bol.cnpj_int = cc.cnpj_int
    )
    SELECT DISTINCT * FROM with_bol
    """
    print(f"Executing Athena query for {commodity} in {year}...")
    df_merged_cnpjs = cursor.execute(query).as_arrow().to_pandas()

    # ==================================================================================== #
    # Merge in commodity and activity level
    # ==================================================================================== #

    df_cnae["cnae"] = df_cnae["cnae"].astype(int)
    df_cnae = df_cnae.drop_duplicates()

    # ==================================================================================== #
    # Fix empty geocodes based on the municipality name
    # ==================================================================================== #
    location_fixes = {
        "ASSU": "2400208",
        "BOM JESUS": "2401701",
        "SAO VALERIO DA NATIVIDADE": "1720499",
        "FORTALEZA DO TABOCAO": "1708254",
        "SANTA TERESINHA": "2513802",
        "BOA SAUDE": "2405306",
        "LAGOA DO ITAENGA": "2608503",
        "BELEM DE SAO FRANCISCO": "2601607",
        "IGUARACI": "2606903",
    }

    for municipality, geocode in location_fixes.items():
        df_merged_cnpjs.loc[
            (df_merged_cnpjs["geocode"].astype(str).str.len() == 0)
            & (df_merged_cnpjs["municipality_name"] == municipality),
            "geocode",
        ] = geocode

    # print distinct "municipality_name and id_city of records with geocode length of 0
    # this happens with the CNPJ data after 2019
    if year >= 2020:
        if (df_merged_cnpjs["geocode"].astype(str).str.len() == 0).any():
            print(
                "\nMunicipalities with empty geocode (search for the geocodes based on the city id and include in the code):"
            )
            print(
                df_merged_cnpjs[df_merged_cnpjs["geocode"].astype(str).str.len() == 0][
                    ["municipality_name", "id_city"]
                ].drop_duplicates()
            )
            raise ValueError(
                "There are municipalities with empty geocode. Please correct and run again."
            )
        # drop the id_city column if it exists, as it is not used in the rest of the code
        if "id_city" in df_merged_cnpjs.columns:
            df_merged_cnpjs = df_merged_cnpjs.drop(columns=["id_city"])

    # Add the level from the cnae to a final consolidated dataframe df
    df = pd.merge(
        left=df_merged_cnpjs,
        right=df_cnae,
        on="cnae",
        how="outer",
    )

    # ==================================================================================== #
    # Infer CNPJ types and their validity
    # ==================================================================================== #
    tqdm.pandas(desc="Infer types")
    df["type"], df["validity"] = zip(*df.progress_apply(infer_type, axis=1))
    df = df.drop(columns=["cnpj_length"])

    # A cnpj should have only one type
    assert df.drop_duplicates(subset=["cnpj_int", "type"])["cnpj_int"].is_unique

    # we should have one unique geocode per CNPJ
    assert df.drop_duplicates(subset=["cnpj_int", "geocode"])[
        "cnpj_int"
    ].is_unique, "There are multiple geocodes for the same CNPJ/CPF. Adjust the code to pick only one."

    # ==================================================================================== #
    # Final clean and checks
    # ==================================================================================== #

    def rjust_or_na(series, digits):
        return series.apply(
            lambda c: (
                "NA" if pd.isna(c) or c == "NA" else str(int(c)).rjust(digits, "0")
            )
        )

    # pad the tax codes to 11 or 14
    df = df.rename(columns={"cnpj_int": "cnpj"})

    tqdm.pandas(desc="Pad tax codes")
    df["cnpj"] = df.progress_apply(
        lambda row: str(row["cnpj"]).rjust(11 if row["type"] == "cpf" else 14, "0"),
        axis=1,
    )

    # assume activity level of one for CPFs without a level. fill missing level with "NA"
    df.loc[(df["type"] == "cpf") & (df["level"].isnull()), "level"] = 1
    df["level"] = rjust_or_na(df["level"], 1)

    # clean out straggling nulls
    df["cnae"] = rjust_or_na(df["cnae"], CNAE_DIGITS)
    df["geocode"] = rjust_or_na(df["geocode"], GEOCODE_DIGITS)
    df["company_name"] = df["company_name"].fillna("")
    assert not df.isnull().values.any()

    # cnae and geocode are NA or 7 digits
    assert all((df["cnae"] == "NA") | (df["cnae"].str.len() == CNAE_DIGITS))
    assert all((df["geocode"] == "NA") | (df["geocode"].str.len() == GEOCODE_DIGITS))

    # tax codes are 11 or 14 digits
    assert (df[df["type"] == "cpf"]["cnpj"].str.len() == 11).all()
    assert (df[df["type"] == "cnpj"]["cnpj"].str.len() == 14).all()

    # level is NA or 1-5
    assert df["level"].isin(["NA", "1", "2", "3", "4", "5"]).all()

    # a tax code always has at most one geocode and name
    assert df.drop_duplicates(["cnpj", "geocode"])["cnpj"].is_unique
    assert df.drop_duplicates(["cnpj", "company_name"])["cnpj"].is_unique

    # type is "cpf" or "cnpj"
    assert df["type"].isin(["cpf", "cnpj"]).all()

    # validity is "valid" or "invalid"
    assert df["validity"].isin(["valid", "invalid"]).all()

    # a cnpj + activity never appears twice for any given commodity
    assert df.set_index(["cnpj", "cnae"]).index.is_unique

    print("Data processing complete.")

    return df


def main():
    year = 2023
    commodity = "beef"

    # In 2023, the BOL is in parquet in another location
    if year <= 2022:
        bol_path = f"s3://trase-storage/brazil/trade/bol/{year}/BRAZIL_BOL_{year}.csv"
        df_bol_cnpjs = pd.read_csv(
            bol_path,
            sep=";",
            dtype=str,
            keep_default_na=False,
            usecols=["cnpj", "hs4"],
        )
    else:
        bol_path = f"s3://trase-storage/brazil/trade/bol/{year}/gold/brazil_bol_{year}_gold.parquet"
        df_bol_cnpjs = pd.read_parquet(
            bol_path,
            columns=["exporter_cnpj", "hs4"],
        )
        df_bol_cnpjs = df_bol_cnpjs.rename(
            columns={"exporter_cnpj": "cnpj"}
        ).drop_duplicates()

    # read in the dictionary of relevant HS codes, commodity, and location
    df_hscodes = pd.read_parquet(
        "s3://trase-storage/postgres_views/postgres_commodities.parquet",
        columns=["location", "commodity", "hs_code"],
    )

    # Read the dictionary of CNAEs and their levels per commodity
    df_cnae = pd.read_csv(
        "s3://trase-storage/brazil/logistics/cnae/cnae_commodities_levels.csv",
        sep=";",
        dtype=str,
        usecols=["commodity", "cnae", "level"],
    )

    df = process_data(
        commodity=commodity,
        year=year,
        df_bol_cnpjs=df_bol_cnpjs,
        df_hscodes=df_hscodes,
        df_cnae=df_cnae,
    )

    write_parquet_for_upload(
        df,
        f"brazil/{commodity.lower()}/auxiliary/cnpj/{commodity.upper()}_CNPJ_{year}.parquet",
        index=False,
    )


if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    dbt.source("trase-storage-raw", "lc_brazil_bol_2019")
    dbt.source("trase-storage-raw", "cnpj_2019_cnae_secondary")
    dbt.source("trase-storage-raw", "auxiliary_cnpj_original_cnpj_2019")
    dbt.source("trase-storage-raw", "commodity_dict")

    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})