CNPJ Descriptive Analysis

View or edit on GitHub

This page is synchronized from trase/models/brazil/soy/CNPJ Descriptive Analysis.ipynb. Last modified on 2025-12-14 23:19 CET by Trase Admin. Please view or edit the original file there; changes should be reflected here after a midnight build (CET time), or manually triggering it with a GitHub action (link).

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from trase.tools import CNX
from trase.tools.pandasdb.query import query_with_dataframe
from trase.tools.pandasdb import read
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.aws_helpers import read_json
from psycopg2 import sql
import numpy as np
import plotly.express as px
from trase.tools.sps import *
from trase.tools.pandasdb.query import *

RFB dataset analysis

### 1- Load CNPJ data

### sql codes used to bring data from postgres:

### Bring cnae_secondary of interest stored in arrays
# SELECT
#    cnpj,
#    company_name,
#    trade_name,
#    STRING_TO_ARRAY(cnae_secondary , ',')
# FROM cnpj.cnpj
# WHERE cnae IN ('1411802', '1412601') ## replace codes
#    OR (STRING_TO_ARRAY(cnae_secondary , ',') && ARRAY['1411802', '1412601'] #replace codes
# )
# LIMIT 50;

### Bring cnae_secondary of interest exploded in individual cells
# SELECT
#    cnpj,
#    company_name,
#    trade_name,
#    UNNEST(STRING_TO_ARRAY(cnae_secondary , ','))
# FROM cnpj.cnpj
# WHERE cnae IN ('1411802', '1412601') # replace codes
#    OR (STRING_TO_ARRAY(cnae_secondary , ',') && ARRAY['1411802', '1412601'] #replace codes
# )
# LIMIT 50;

### Bring rows of interest with no transformation (with no cnae_secondary transformations)
# SELECT
#    cnpj,
#    company_name,
#    trade_name,
#    cnae,
#    cnae_secondary
# FROM cnpj.cnpj
# WHERE cnae IN ('1411802', '1412601') #replace codes
#    OR (STRING_TO_ARRAY(cnae_secondary , ',') && ARRAY['1411802', '1412601'] #replace codes
# )
# LIMIT 50;


### option 1 - load cnpj from postgres

df_cnpj_ccs = pd.read_sql(
    sql.SQL(
        """
            SELECT cnpj, company_name,trade_name, cnae,
            UNNEST(STRING_TO_ARRAY(cnae_secondary , ',')) as cnae_secondary
            FROM cnpj.cnpj
            WHERE cnae IN ('115600', '141501','163600','5211701', '5211799', '1041400', '1042200', '1069400', '4622200', '4623108', '4623199', '4632001', '4632002', '4632003') 
            OR (STRING_TO_ARRAY(cnae_secondary , ',') && 
            ARRAY['115600', '141501','163600','5211701', '5211799', '1041400', '1042200', '1069400', '4622200', '4623108', '4623199', '4632001', '4632002', '4632003']);
            """
    ),
    CNX.cnx,
)

df_cnpj_ccs["cnae"] = df_cnpj_ccs["cnae"].astype(str)

# Format float columns
pd.set_option("float_format", "{:f}".format)

### option 2 - 1. Use parquet file to load cnpj data
# df_cnpj_ccs = pd.read_parquet("/opt/shared/brazil_cnpj/cnpj.parquet")
# df_cnpj_ccs_ = df_cnpj_ccs.copy()

### option 2 - 2. Transform data stored in parquet file

# Create list with relevant cnae
# list_cnae = ['115600', '141501','163600','5211701', '5211799', '1041400', '1042200', '1069400', '4622200', '4623108', '4623199', '4632001', '4632002', '4632003']

# results = pd.DataFrame()

# Define the chunk size (number of rows per chunk)
# chunk_size = 10000

# Define a starting index
# start_index = 0

# Loop through the data in chunks
# while start_index < len(df_cnpj_ccs_parquet):
# Calculate the end index for the chunk
# end_index = min(start_index + chunk_size, len(df_cnpj_ccs_parquet))

# Get the chunk of data
# chunk = df_cnpj_ccs_parquet.iloc[start_index:end_index]

# fill NaN
# chunk['cnae_secondary'].fillna(0, inplace=True)
# split lists
# chunk['cnae_secondary'] = chunk['cnae_secondary'].str.split(',')
# explode lists
# chunk = chunk.explode('cnae_secondary', ignore_index=True)
# chunk['cnae'] = chunk['cnae'].astype(str)
# select relevant primary cnae
# mask_A = chunk['cnae'].isin(list_cnae)
# select relevant secondary cnae
# mask_B = chunk['cnae_secondary'].isin(list_cnae)
# select rows of interest in the dataset
# selected_rows = chunk[mask_A | mask_B]
# selected = pd.concat([mask_A, mask_B])
# append results
# results = results.append(selected_rows, ignore_index = True)


# Update the start index for the next chunk
# print(end_index)
# start_index = end_index
# check

len(df_cnpj_ccs)
# check

df_cnpj_ccs.isna().sum()
df_cnpj_ccs[df_cnpj_ccs.isna().any(axis=1)]
df_cnpj_ccs.info()
### 2- Classify by level category
# Levels ranges from 1 to 4 and are related to relevant primary cnae.
# Data not classified corresponds to rows with relevant secondary cnae (no relevant primary cnae), which have not been classified into levels categories.

# take a copy of the results
# valid_cnpj = results


# define function for the classification
def get_level_value(row):
    if row in ["115600", "141501", "163600"]:
        return "Level 1"
    elif row in ["5211701", "5211799"]:
        return "Level 2"
    elif row in ["1041400", "1042200", "1069400"]:
        return "Level 3"
    elif row in ["4622200", "4623108", "4623199", "4632001", "4632002", "4632003"]:
        return "Level 4"
    else:
        return "Data not classified"


# apply function to new column
df_cnpj_ccs["level"] = df_cnpj_ccs["cnae"].apply(get_level_value)

# check if sum of company name unique values is preserved
check_unique_values = df_cnpj_ccs["company_name"].nunique()
print(check_unique_values)
### 3 - Data aggregation  - Level  1

# Bar chart 1 - companies_names by levels
rfb_group_1 = (
    df_cnpj_ccs.groupby(["level"]).agg({"company_name": "nunique"}).reset_index()
)

# Bar Chart 2 - cnpj by levels
rfb_group_2 = df_cnpj_ccs.groupby(["level"]).agg({"cnpj": "nunique"}).reset_index()
### 4 - Data aggregation - Level  2

# secondary_cnae and cnpj by company
rfb_group_0 = (
    df_cnpj_ccs.groupby(["company_name", "level", "cnae"])
    .agg({"cnae_secondary": "nunique", "cnpj": "nunique"})
    .reset_index()
)
### 5 - Create ranges column

# find bins thresholds
max_value_cnpj = rfb_group_0["cnpj"].max()
max_value_secCnae = rfb_group_0["cnae_secondary"].max()
print(max_value_cnpj, max_value_secCnae)
# Create ranges

bins_cnpj = [0, 1, 10, 50, 100, 200, 500, 800, 1000, 1040]
labels_cnpj = [
    "1 cnpj",
    "2-10 cnpj",
    "11-50 cnpj",
    "51-100 cnpj",
    "101-200 cnpj",
    "201-500 cnpj",
    "501-800 cnpj",
    "801-1000 cnpj",
    "1001-1040 cnpj",
]


bins_scnae = [-1, 0, 1, 10, 20, 50, 80, 109, 136]
labels_scnae = [
    "No secondary cnae",
    "1 secondary cnae",
    "2-10 secondary cnae",
    "11-20 secondary cnae",
    "21-50 secondary cnae",
    "51-80 secondary cnae",
    "81-109 secondary cnae",
    "81-136 secondary cnae",
]


rfb_group_0["cnpj_ranges"] = pd.cut(rfb_group_0["cnpj"], bins_cnpj, labels=labels_cnpj)

rfb_group_0["cnae_secondary_ranges"] = pd.cut(
    rfb_group_0["cnae_secondary"], bins_scnae, labels=labels_scnae, include_lowest=True
)
### 6 - Data aggregation - Level 3

# Bar chart 3 - companies_names by cnpj ranges and level
rfb_group_3 = (
    rfb_group_0[["company_name", "level", "cnpj_ranges"]]
    .groupby(["level", "cnpj_ranges"])
    .agg({"company_name": "count"})
    .reset_index()
)

# Bar chart 4 - companies_name by secondary cnae ranges and level
rfb_group_4 = (
    rfb_group_0[["company_name", "level", "cnae_secondary_ranges"]]
    .groupby(["level", "cnae_secondary_ranges"])
    .agg({"company_name": "count"})
    .reset_index()
)
### 7 - Bar charts

# Bar chart 1

fig = px.bar(
    rfb_group_1,
    x="level",
    y="company_name",
    title="Distribution of companies among levels of activity",
    labels={"level": "Level of activity", "company_name": "Amount of companies"},
)

fig.update_layout(width=700, height=500)

fig.show("png")
# Bar chart 2

fig = px.bar(
    rfb_group_2,
    x="level",
    y="cnpj",
    title="Distribution of CNPJ among levels of activity",
    labels={"level": "Level of activity", "cnpj": "Amount of CNPJ"},
)

fig.update_layout(width=700, height=500)

fig.show("png")
# Bar Chart 3

fig = px.bar(
    rfb_group_3,
    x="level",
    y="company_name",
    color="cnpj_ranges",
    barmode="group",
    title="Amount of cnpj holded by companies, by level of activity",
    labels={
        "level": "Level of activity",
        "company_name": "Amount of companies",
        "cnpj_ranges": "Cnpj ranges",
    },
)

fig.update_layout(width=700, height=500)


# Show the plot
fig.show("png")
# Bar Chart 4

fig = px.bar(
    rfb_group_4,
    x="level",
    y="company_name",
    color="cnae_secondary_ranges",
    barmode="group",
    title="Amount of CNAE holded by companies, by level of activity",
    labels={
        "level": "Level of activity",
        "company_name": "Amount of companies",
        "cnae_secondary_ranges": "Secondary CNAE ranges",
    },
)

fig.update_layout(width=700, height=500)


fig.show("png")

BOL dataset analysis

### 1 - Load BOL dataset


def load_data():
    """Load BOL data"""
    df_bol = get_pandas_df_once(
        "brazil/trade/bol/2022/BRAZIL_BOL_2022.csv",
        encoding="utf8",
        sep=";",
        dtype=str,
        keep_default_na=False,
    )
    df_bol["vol"] = df_bol["vol"].astype(float)
    return df_bol


df_bol = load_data()
# select BOL data subset
df_bol_subset = df_bol[["exporter.cnpj", "exporter.label", "hs4", "vol"]]

# remove duplicates
df_bol_subset = df_bol_subset.drop_duplicates()
len(df_bol_subset)
### 2 - Filter BOL data by hs4 codes of interest

# Extract distinct hs4 code from splitgraph.commodities
from trase.tools import uses_database


@uses_database
def get_hs4_codes(cnx=None):
    hs4_codes = pd.read_sql(
        "SELECT DISTINCT substr(hs_code, 0, 5) as hs4, commodity, hs_code FROM splitgraph.commodities",
        con=cnx.cnx,
    )
    return hs4_codes


df_hs4 = get_hs4_codes()

df_hs4_c = df_hs4[df_hs4["commodity"] == "SOY"]

list_hs4_c = list(df_hs4_c["hs4"])

# Filter BOL dataset
df_bol_c = df_bol_subset[df_bol_subset["hs4"].isin(list_hs4_c)]

### 3 - Data aggregation - Level 1

# Group vol, hs4 and cnpj by exporter
bol_group_1 = (
    df_bol_c.groupby(["exporter.label"])
    .agg({"vol": "sum", "exporter.cnpj": "nunique", "hs4": "nunique"})
    .reset_index()
)
### 5 - Create range column

# find bins thresholds
max_value_cnpj = bol_group_1["exporter.cnpj"].max()
max_value_hs4 = bol_group_1["hs4"].max()
print(max_value_cnpj, max_value_hs4)
bol_group_1 = bol_group_1.copy()

# create ranges
bins_cnpj = [0, 1, 10, 20]
labels_cnpj = ["1 cnpj", "2-10 cnpj", "11-20 cnpj"]

bins_sh4 = [0, 1, 2, 3, 4]
labels_sh4 = ["1 SH4 code", "2 SH4 codes", "3 SH4 codes", "4 SH4 codes"]

bol_group_1["cnpj_ranges"] = pd.cut(
    bol_group_1["exporter.cnpj"], bins_cnpj, labels=labels_cnpj
)

bol_group_1["sh4_ranges"] = pd.cut(
    bol_group_1["exporter.cnpj"], bins_sh4, labels=labels_sh4
)
### 6 - Make Bar charts

# Bar chart 1 - Group exporters by cnpj ranges
bol_group_2 = (
    bol_group_1.groupby("cnpj_ranges").agg({"exporter.label": "count"}).reset_index()
)
# Bar chart 1
fig = px.bar(
    bol_group_2,
    x="cnpj_ranges",
    y="exporter.label",
    barmode="group",
    title="Amount of cnpj holded by companies",
    labels={"cnpj_ranges": "CNPJ ranges", "exporter.label": "Amount of companies"},
)

fig.update_layout(width=700, height=400)

fig.show("png")
# Bar Chart 2 - Group exporters by sh4
bol_group_3 = (
    bol_group_1.groupby("sh4_ranges").agg({"exporter.label": "count"}).reset_index()
)
# Bar Chart 2
fig = px.bar(
    bol_group_3,
    x="sh4_ranges",
    y="exporter.label",
    barmode="group",
    title="Amount of SH4 codes holded by companies",
    labels={"sh4_ranges": "SH4 Codes", "exporter.label": "Amount of companies"},
)

fig.update_layout(width=700, height=400)

fig.show("png")

Comparison between datasets

### 1- drop duplicates

# inspect

df_cnpj_ccs.duplicated(subset=["cnpj"]).sum()
df_bol_c.duplicated(subset=["exporter.cnpj"]).sum()
# drop


bol_unique_s = bol_unique[["exporter.cnpj", "hs4"]]

unique_soy_cnpjs = bol_unique_s["exporter.cnpj"].drop_duplicates()
df = query_with_dataframe(
    unique_soy_cnpjs.astype(int).rename("cnpj"),
    """
    select cnpj, cnae, cnae_secondary
    from df
    left join (
      select cnpj::bigint, cnae, cnae_secondary from cnpj.cnpj
    ) v using (cnpj)
    """,
)
print(
    "How many CNPJs from the BoL dataset matched with the CNPJ database:\n",
    (~df["cnae"].isnull()).value_counts(),
)
df.head()
matched = df[~df["cnae"].isnull()]
matched["match"] = "matched"
unmatched = df[df["cnae"].isnull()]
unmatched["match"] = "unmatched"
df_concat = pd.concat([matched, unmatched], ignore_index=True)
### 2 - Transform dataset

# Create a new column named 'ID' with a range of values
df_concat["ID"] = range(1, len(df_concat) + 1)
### 3 - Make Bar Charts

# Bar chart 1 - Aggregation
comparison_group_1 = (
    df_concat.groupby(["match"], dropna=False).agg({"ID": "count"}).reset_index()
)

# Bar Chart 1
fig = px.bar(
    comparison_group_1,
    x="match",
    y="ID",
    barmode="group",
    title="Matching and unmatching values CNPJ between BOL and RFB datasets",
    labels={"merge": "datasets", "ID": "Amount of matching CNPJ values"},
)

fig.update_layout(width=700, height=400)

fig.show("png")
comparison_group_1.head()
# 4- Inspect bol data for entries that do not have matching values in the CNPJ database
unmatched_row = df_concat[df_concat["match"] == "unmatched"]
unmatched_row