CNPJ Descriptive Analysis
View or edit on GitHub
This page is synchronized from trase/models/brazil/soy/CNPJ Descriptive Analysis.ipynb. Last modified on 2025-12-14 23:19 CET by Trase Admin.
Please view or edit the original file there; changes will be reflected here after the midnight build (CET time),
or after manually triggering the sync with a GitHub action (link).
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from trase.tools import CNX
from trase.tools.pandasdb.query import query_with_dataframe
from trase.tools.pandasdb import read
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.aws_helpers import read_json
from psycopg2 import sql
import numpy as np
import plotly.express as px
from trase.tools.sps import *
from trase.tools.pandasdb.query import *
RFB dataset analysis
### 1- Load CNPJ data
### sql codes used to bring data from postgres:
### Bring cnae_secondary of interest stored in arrays
# SELECT
# cnpj,
# company_name,
# trade_name,
# STRING_TO_ARRAY(cnae_secondary , ',')
# FROM cnpj.cnpj
# WHERE cnae IN ('1411802', '1412601') ## replace codes
# OR (STRING_TO_ARRAY(cnae_secondary , ',') && ARRAY['1411802', '1412601'] #replace codes
# )
# LIMIT 50;
### Bring cnae_secondary of interest exploded in individual cells
# SELECT
# cnpj,
# company_name,
# trade_name,
# UNNEST(STRING_TO_ARRAY(cnae_secondary , ','))
# FROM cnpj.cnpj
# WHERE cnae IN ('1411802', '1412601') # replace codes
# OR (STRING_TO_ARRAY(cnae_secondary , ',') && ARRAY['1411802', '1412601'] #replace codes
# )
# LIMIT 50;
### Bring rows of interest with no transformation (with no cnae_secondary transformations)
# SELECT
# cnpj,
# company_name,
# trade_name,
# cnae,
# cnae_secondary
# FROM cnpj.cnpj
# WHERE cnae IN ('1411802', '1412601') #replace codes
# OR (STRING_TO_ARRAY(cnae_secondary , ',') && ARRAY['1411802', '1412601'] #replace codes
# )
# LIMIT 50;
### option 1 - load cnpj from postgres
# Pull every CNPJ establishment whose primary CNAE OR any of its secondary
# CNAEs matches the soy-relevant activity codes; UNNEST explodes the
# comma-separated secondary codes into one row per (cnpj, secondary code).
df_cnpj_ccs = pd.read_sql(
    sql.SQL(
        """
SELECT cnpj, company_name,trade_name, cnae,
UNNEST(STRING_TO_ARRAY(cnae_secondary , ',')) as cnae_secondary
FROM cnpj.cnpj
WHERE cnae IN ('115600', '141501','163600','5211701', '5211799', '1041400', '1042200', '1069400', '4622200', '4623108', '4623199', '4632001', '4632002', '4632003')
OR (STRING_TO_ARRAY(cnae_secondary , ',') &&
ARRAY['115600', '141501','163600','5211701', '5211799', '1041400', '1042200', '1069400', '4622200', '4623108', '4623199', '4632001', '4632002', '4632003']);
"""
    ),
    CNX.cnx,
)
# Keep primary CNAE as string so it can be compared against the string codes
# used by get_level_value below.
df_cnpj_ccs["cnae"] = df_cnpj_ccs["cnae"].astype(str)
# Format float columns
pd.set_option("float_format", "{:f}".format)
### option 2 - 1. Use parquet file to load cnpj data
# df_cnpj_ccs = pd.read_parquet("/opt/shared/brazil_cnpj/cnpj.parquet")
# df_cnpj_ccs_ = df_cnpj_ccs.copy()
### option 2 - 2. Transform data stored in parquet file
# Create list with relevant cnae
# list_cnae = ['115600', '141501','163600','5211701', '5211799', '1041400', '1042200', '1069400', '4622200', '4623108', '4623199', '4632001', '4632002', '4632003']
# results = pd.DataFrame()
# Define the chunk size (number of rows per chunk)
# chunk_size = 10000
# Define a starting index
# start_index = 0
# Loop through the data in chunks
# while start_index < len(df_cnpj_ccs_parquet):
# Calculate the end index for the chunk
# end_index = min(start_index + chunk_size, len(df_cnpj_ccs_parquet))
# Get the chunk of data
# chunk = df_cnpj_ccs_parquet.iloc[start_index:end_index]
# fill NaN
# chunk['cnae_secondary'].fillna(0, inplace=True)
# split lists
# chunk['cnae_secondary'] = chunk['cnae_secondary'].str.split(',')
# explode lists
# chunk = chunk.explode('cnae_secondary', ignore_index=True)
# chunk['cnae'] = chunk['cnae'].astype(str)
# select relevant primary cnae
# mask_A = chunk['cnae'].isin(list_cnae)
# select relevant secondary cnae
# mask_B = chunk['cnae_secondary'].isin(list_cnae)
# select rows of interest in the dataset
# selected_rows = chunk[mask_A | mask_B]
# selected = pd.concat([mask_A, mask_B])
# append results
# results = results.append(selected_rows, ignore_index = True)
# Update the start index for the next chunk
# print(end_index)
# start_index = end_index
# check
# check: row count AFTER the secondary-CNAE explode (rows, not companies)
len(df_cnpj_ccs)
# check: missing values per column
df_cnpj_ccs.isna().sum()
# rows containing at least one NaN — presumably rows with no secondary CNAE; verify
df_cnpj_ccs[df_cnpj_ccs.isna().any(axis=1)]
df_cnpj_ccs.info()
### 2- Classify by level category
# Levels ranges from 1 to 4 and are related to relevant primary cnae.
# Data not classified corresponds to rows with relevant secondary cnae (no relevant primary cnae), which have not been classified into levels categories.
# take a copy of the results
# valid_cnpj = results
# define function for the classification
def get_level_value(row):
    """Map a primary CNAE code (string) to its supply-chain level label.

    Codes not in any of the four relevant groups — e.g. rows selected only
    because of a relevant *secondary* CNAE — fall through to
    "Data not classified".
    """
    code_groups = {
        "Level 1": {"115600", "141501", "163600"},
        "Level 2": {"5211701", "5211799"},
        "Level 3": {"1041400", "1042200", "1069400"},
        "Level 4": {"4622200", "4623108", "4623199", "4632001", "4632002", "4632003"},
    }
    for label, codes in code_groups.items():
        if row in codes:
            return label
    return "Data not classified"
# apply function to new column
# Classify each row by its PRIMARY cnae only (secondary codes are ignored here).
df_cnpj_ccs["level"] = df_cnpj_ccs["cnae"].apply(get_level_value)
# check if sum of company name unique values is preserved after adding the column
check_unique_values = df_cnpj_ccs["company_name"].nunique()
print(check_unique_values)
### 3 - Data aggregation - Level 1
# Bar chart 1 - companies_names by levels
# Unique companies per level (feeds bar chart 1).
rfb_group_1 = (
    df_cnpj_ccs.groupby(["level"]).agg({"company_name": "nunique"}).reset_index()
)
# Bar Chart 2 - unique cnpj per level
rfb_group_2 = df_cnpj_ccs.groupby(["level"]).agg({"cnpj": "nunique"}).reset_index()
### 4 - Data aggregation - Level 2
# Per (company, level, primary cnae): how many distinct secondary CNAEs and
# how many distinct CNPJs (establishments) the company holds.
rfb_group_0 = (
    df_cnpj_ccs.groupby(["company_name", "level", "cnae"])
    .agg({"cnae_secondary": "nunique", "cnpj": "nunique"})
    .reset_index()
)
### 5 - Create ranges column
# find bins thresholds
# Inspect the maxima to choose sensible upper bin edges below.
max_value_cnpj = rfb_group_0["cnpj"].max()
max_value_secCnae = rfb_group_0["cnae_secondary"].max()
print(max_value_cnpj, max_value_secCnae)
# Create ranges (pd.cut bins are right-closed: (0, 1], (1, 10], ...)
bins_cnpj = [0, 1, 10, 50, 100, 200, 500, 800, 1000, 1040]
labels_cnpj = [
    "1 cnpj",
    "2-10 cnpj",
    "11-50 cnpj",
    "51-100 cnpj",
    "101-200 cnpj",
    "201-500 cnpj",
    "501-800 cnpj",
    "801-1000 cnpj",
    "1001-1040 cnpj",
]
# -1 edge so that a count of 0 lands in its own "No secondary cnae" bucket.
bins_scnae = [-1, 0, 1, 10, 20, 50, 80, 109, 136]
labels_scnae = [
    "No secondary cnae",
    "1 secondary cnae",
    "2-10 secondary cnae",
    "11-20 secondary cnae",
    "21-50 secondary cnae",
    "51-80 secondary cnae",
    "81-109 secondary cnae",
    "110-136 secondary cnae",  # fixed: bin (109, 136] was mislabeled "81-136"
]
rfb_group_0["cnpj_ranges"] = pd.cut(rfb_group_0["cnpj"], bins_cnpj, labels=labels_cnpj)
rfb_group_0["cnae_secondary_ranges"] = pd.cut(
    rfb_group_0["cnae_secondary"], bins_scnae, labels=labels_scnae, include_lowest=True
)
### 6 - Data aggregation - Level 3
# Bar chart 3 - companies_names by cnpj ranges and level
# Number of (company, primary-cnae) rows falling in each cnpj-count range,
# per level (feeds bar chart 3).
rfb_group_3 = (
    rfb_group_0[["company_name", "level", "cnpj_ranges"]]
    .groupby(["level", "cnpj_ranges"])
    .agg({"company_name": "count"})
    .reset_index()
)
# Same, but over secondary-CNAE-count ranges (feeds bar chart 4).
rfb_group_4 = (
    rfb_group_0[["company_name", "level", "cnae_secondary_ranges"]]
    .groupby(["level", "cnae_secondary_ranges"])
    .agg({"company_name": "count"})
    .reset_index()
)
### 7 - Bar charts
# Bar chart 1
# Bar chart 1 — unique companies per activity level.
fig = px.bar(
    rfb_group_1,
    x="level",
    y="company_name",
    title="Distribution of companies among levels of activity",
    labels={"level": "Level of activity", "company_name": "Amount of companies"},
)
fig.update_layout(width=700, height=500)
fig.show("png")
# Bar chart 2 — unique CNPJs per activity level.
fig = px.bar(
    rfb_group_2,
    x="level",
    y="cnpj",
    title="Distribution of CNPJ among levels of activity",
    labels={"level": "Level of activity", "cnpj": "Amount of CNPJ"},
)
fig.update_layout(width=700, height=500)
fig.show("png")
# Bar Chart 3 — companies grouped by how many CNPJs they hold, per level.
fig = px.bar(
    rfb_group_3,
    x="level",
    y="company_name",
    color="cnpj_ranges",
    barmode="group",
    # fixed typo in the title: "holded" -> "held"
    title="Amount of cnpj held by companies, by level of activity",
    labels={
        "level": "Level of activity",
        "company_name": "Amount of companies",
        "cnpj_ranges": "Cnpj ranges",
    },
)
fig.update_layout(width=700, height=500)
# Show the plot
fig.show("png")
# Bar Chart 4 — companies grouped by how many secondary CNAEs they hold, per level.
fig = px.bar(
    rfb_group_4,
    x="level",
    y="company_name",
    color="cnae_secondary_ranges",
    barmode="group",
    # fixed typo in the title: "holded" -> "held"
    title="Amount of CNAE held by companies, by level of activity",
    labels={
        "level": "Level of activity",
        "company_name": "Amount of companies",
        "cnae_secondary_ranges": "Secondary CNAE ranges",
    },
)
fig.update_layout(width=700, height=500)
fig.show("png")
BOL dataset analysis
### 1 - Load BOL dataset
def load_data():
    """Load the 2022 Brazil bill-of-lading (BOL) dataset.

    All columns are read as strings (preserving leading zeros in CNPJ
    codes); only the volume column is converted to float afterwards.
    """
    frame = get_pandas_df_once(
        "brazil/trade/bol/2022/BRAZIL_BOL_2022.csv",
        encoding="utf8",
        sep=";",
        dtype=str,
        keep_default_na=False,
    )
    frame["vol"] = frame["vol"].astype(float)
    return frame
df_bol = load_data()
# select BOL data subset: exporter identity, product code and volume only
df_bol_subset = df_bol[["exporter.cnpj", "exporter.label", "hs4", "vol"]]
# remove duplicates (identical exporter/hs4/vol rows collapse to one)
df_bol_subset = df_bol_subset.drop_duplicates()
len(df_bol_subset)
### 2 - Filter BOL data by hs4 codes of interest
# Extract distinct hs4 code from splitgraph.commodities
from trase.tools import uses_database
@uses_database
def get_hs4_codes(cnx=None):
    """Return distinct HS4 prefixes (with commodity and full hs_code) from splitgraph.commodities."""
    # substr(hs_code, 0, 5) yields the first 4 characters in Postgres.
    hs4_codes = pd.read_sql(
        "SELECT DISTINCT substr(hs_code, 0, 5) as hs4, commodity, hs_code FROM splitgraph.commodities",
        con=cnx.cnx,
    )
    return hs4_codes
df_hs4 = get_hs4_codes()
# Keep only soy HS4 codes.
df_hs4_c = df_hs4[df_hs4["commodity"] == "SOY"]
list_hs4_c = list(df_hs4_c["hs4"])
# Filter BOL dataset down to soy shipments
df_bol_c = df_bol_subset[df_bol_subset["hs4"].isin(list_hs4_c)]
### 3 - Data aggregation - Level 1
# Group vol, hs4 and cnpj by exporter
# Per exporter: total volume, distinct CNPJs used, distinct HS4 codes shipped.
bol_group_1 = (
    df_bol_c.groupby(["exporter.label"])
    .agg({"vol": "sum", "exporter.cnpj": "nunique", "hs4": "nunique"})
    .reset_index()
)
### 5 - Create range column
# find bins thresholds (maxima guide the bin edges chosen below)
max_value_cnpj = bol_group_1["exporter.cnpj"].max()
max_value_hs4 = bol_group_1["hs4"].max()
print(max_value_cnpj, max_value_hs4)
bol_group_1 = bol_group_1.copy()
# create ranges (pd.cut bins are right-closed: (0, 1], (1, 10], ...)
bins_cnpj = [0, 1, 10, 20]
labels_cnpj = ["1 cnpj", "2-10 cnpj", "11-20 cnpj"]
bins_sh4 = [0, 1, 2, 3, 4]
labels_sh4 = ["1 SH4 code", "2 SH4 codes", "3 SH4 codes", "4 SH4 codes"]
bol_group_1["cnpj_ranges"] = pd.cut(
    bol_group_1["exporter.cnpj"], bins_cnpj, labels=labels_cnpj
)
# BUG FIX: sh4_ranges must bin the distinct-HS4 counts ("hs4"), not the
# cnpj counts — the original re-used "exporter.cnpj" here.
bol_group_1["sh4_ranges"] = pd.cut(
    bol_group_1["hs4"], bins_sh4, labels=labels_sh4
)
### 6 - Make Bar charts
# Bar chart 1 - Group exporters by cnpj ranges
# Count exporters per cnpj-count range (feeds bar chart 1).
bol_group_2 = (
    bol_group_1.groupby("cnpj_ranges").agg({"exporter.label": "count"}).reset_index()
)
# Bar chart 1
fig = px.bar(
    bol_group_2,
    x="cnpj_ranges",
    y="exporter.label",
    barmode="group",
    # fixed typo in the title: "holded" -> "held"
    title="Amount of cnpj held by companies",
    labels={"cnpj_ranges": "CNPJ ranges", "exporter.label": "Amount of companies"},
)
fig.update_layout(width=700, height=400)
fig.show("png")
# Bar Chart 2 - count exporters per sh4-count range
bol_group_3 = (
    bol_group_1.groupby("sh4_ranges").agg({"exporter.label": "count"}).reset_index()
)
# Bar Chart 2
fig = px.bar(
    bol_group_3,
    x="sh4_ranges",
    y="exporter.label",
    barmode="group",
    # fixed typo in the title: "holded" -> "held"
    title="Amount of SH4 codes held by companies",
    labels={"sh4_ranges": "SH4 Codes", "exporter.label": "Amount of companies"},
)
fig.update_layout(width=700, height=400)
fig.show("png")
Comparison between datasets
### 1- drop duplicates
# inspect
df_cnpj_ccs.duplicated(subset=["cnpj"]).sum()
df_bol_c.duplicated(subset=["exporter.cnpj"]).sum()
# drop
bol_unique_s = bol_unique[["exporter.cnpj", "hs4"]]
unique_soy_cnpjs = bol_unique_s["exporter.cnpj"].drop_duplicates()
df = query_with_dataframe(
unique_soy_cnpjs.astype(int).rename("cnpj"),
"""
select cnpj, cnae, cnae_secondary
from df
left join (
select cnpj::bigint, cnae, cnae_secondary from cnpj.cnpj
) v using (cnpj)
""",
)
print(
"How many CNPJs from the BoL dataset matched with the CNPJ database:\n",
(~df["cnae"].isnull()).value_counts(),
)
df.head()
matched = df[~df["cnae"].isnull()]
matched["match"] = "matched"
unmatched = df[df["cnae"].isnull()]
unmatched["match"] = "unmatched"
df_concat = pd.concat([matched, unmatched], ignore_index=True)
### 2 - Transform dataset
# Create a new column named 'ID' with a range of values
df_concat["ID"] = range(1, len(df_concat) + 1)
### 3 - Make Bar Charts
# Bar chart 1 - Aggregation
# Count rows per match status (dropna=False keeps any NaN match values).
comparison_group_1 = (
    df_concat.groupby(["match"], dropna=False).agg({"ID": "count"}).reset_index()
)
# Bar Chart 1
fig = px.bar(
    comparison_group_1,
    x="match",
    y="ID",
    barmode="group",
    title="Matching and unmatching values CNPJ between BOL and RFB datasets",
    # fixed: the labels key must match the x column name ("match", not "merge"),
    # otherwise the axis relabel is silently ignored
    labels={"match": "datasets", "ID": "Amount of matching CNPJ values"},
)
fig.update_layout(width=700, height=400)
fig.show("png")
comparison_group_1.head()
# 4- Inspect bol data for entries that do not have matching values in the CNPJ database
unmatched_row = df_concat[df_concat["match"] == "unmatched"]
unmatched_row
### 1- Load CNPJ data
### sql codes used to bring data from postgres:
### Bring cnae_secondary of interest stored in arrays
# SELECT
# cnpj,
# company_name,
# trade_name,
# STRING_TO_ARRAY(cnae_secondary , ',')
# FROM cnpj.cnpj
# WHERE cnae IN ('1411802', '1412601') ## replace codes
# OR (STRING_TO_ARRAY(cnae_secondary , ',') && ARRAY['1411802', '1412601'] #replace codes
# )
# LIMIT 50;
### Bring cnae_secondary of interest exploded in individual cells
# SELECT
# cnpj,
# company_name,
# trade_name,
# UNNEST(STRING_TO_ARRAY(cnae_secondary , ','))
# FROM cnpj.cnpj
# WHERE cnae IN ('1411802', '1412601') # replace codes
# OR (STRING_TO_ARRAY(cnae_secondary , ',') && ARRAY['1411802', '1412601'] #replace codes
# )
# LIMIT 50;
### Bring rows of interest with no transformation (with no cnae_secondary transformations)
# SELECT
# cnpj,
# company_name,
# trade_name,
# cnae,
# cnae_secondary
# FROM cnpj.cnpj
# WHERE cnae IN ('1411802', '1412601') #replace codes
# OR (STRING_TO_ARRAY(cnae_secondary , ',') && ARRAY['1411802', '1412601'] #replace codes
# )
# LIMIT 50;
### option 1 - load cnpj from postgres
df_cnpj_ccs = pd.read_sql(
sql.SQL(
"""
SELECT cnpj, company_name,trade_name, cnae,
UNNEST(STRING_TO_ARRAY(cnae_secondary , ',')) as cnae_secondary
FROM cnpj.cnpj
WHERE cnae IN ('115600', '141501','163600','5211701', '5211799', '1041400', '1042200', '1069400', '4622200', '4623108', '4623199', '4632001', '4632002', '4632003')
OR (STRING_TO_ARRAY(cnae_secondary , ',') &&
ARRAY['115600', '141501','163600','5211701', '5211799', '1041400', '1042200', '1069400', '4622200', '4623108', '4623199', '4632001', '4632002', '4632003']);
"""
),
CNX.cnx,
)
df_cnpj_ccs["cnae"] = df_cnpj_ccs["cnae"].astype(str)
# Format float columns
pd.set_option("float_format", "{:f}".format)
### option 2 - 1. Use parquet file to load cnpj data
# df_cnpj_ccs = pd.read_parquet("/opt/shared/brazil_cnpj/cnpj.parquet")
# df_cnpj_ccs_ = df_cnpj_ccs.copy()
### option 2 - 2. Transform data stored in parquet file
# Create list with relevant cnae
# list_cnae = ['115600', '141501','163600','5211701', '5211799', '1041400', '1042200', '1069400', '4622200', '4623108', '4623199', '4632001', '4632002', '4632003']
# results = pd.DataFrame()
# Define the chunk size (number of rows per chunk)
# chunk_size = 10000
# Define a starting index
# start_index = 0
# Loop through the data in chunks
# while start_index < len(df_cnpj_ccs_parquet):
# Calculate the end index for the chunk
# end_index = min(start_index + chunk_size, len(df_cnpj_ccs_parquet))
# Get the chunk of data
# chunk = df_cnpj_ccs_parquet.iloc[start_index:end_index]
# fill NaN
# chunk['cnae_secondary'].fillna(0, inplace=True)
# split lists
# chunk['cnae_secondary'] = chunk['cnae_secondary'].str.split(',')
# explode lists
# chunk = chunk.explode('cnae_secondary', ignore_index=True)
# chunk['cnae'] = chunk['cnae'].astype(str)
# select relevant primary cnae
# mask_A = chunk['cnae'].isin(list_cnae)
# select relevant secondary cnae
# mask_B = chunk['cnae_secondary'].isin(list_cnae)
# select rows of interest in the dataset
# selected_rows = chunk[mask_A | mask_B]
# selected = pd.concat([mask_A, mask_B])
# append results
# results = results.append(selected_rows, ignore_index = True)
# Update the start index for the next chunk
# print(end_index)
# start_index = end_index
# check
len(df_cnpj_ccs)
# check
df_cnpj_ccs.isna().sum()
df_cnpj_ccs[df_cnpj_ccs.isna().any(axis=1)]
df_cnpj_ccs.info()
### 2- Classify by level category
# Levels ranges from 1 to 4 and are related to relevant primary cnae.
# Data not classified corresponds to rows with relevant secondary cnae (no relevant primary cnae), which have not been classified into levels categories.
# take a copy of the results
# valid_cnpj = results
# define function for the classification
def get_level_value(row):
    """Map a primary CNAE code (string) to its supply-chain level label.

    Codes outside the four relevant groups return "Data not classified".
    """
    code_groups = {
        "Level 1": {"115600", "141501", "163600"},
        "Level 2": {"5211701", "5211799"},
        "Level 3": {"1041400", "1042200", "1069400"},
        "Level 4": {"4622200", "4623108", "4623199", "4632001", "4632002", "4632003"},
    }
    for label, codes in code_groups.items():
        if row in codes:
            return label
    return "Data not classified"
# apply function to new column
df_cnpj_ccs["level"] = df_cnpj_ccs["cnae"].apply(get_level_value)
# check if sum of company name unique values is preserved
check_unique_values = df_cnpj_ccs["company_name"].nunique()
print(check_unique_values)
### 3 - Data aggregation - Level 1
# Bar chart 1 - companies_names by levels
rfb_group_1 = (
df_cnpj_ccs.groupby(["level"]).agg({"company_name": "nunique"}).reset_index()
)
# Bar Chart 2 - cnpj by levels
rfb_group_2 = df_cnpj_ccs.groupby(["level"]).agg({"cnpj": "nunique"}).reset_index()
### 4 - Data aggregation - Level 2
# secondary_cnae and cnpj by company
rfb_group_0 = (
df_cnpj_ccs.groupby(["company_name", "level", "cnae"])
.agg({"cnae_secondary": "nunique", "cnpj": "nunique"})
.reset_index()
)
### 5 - Create ranges column
# find bins thresholds
# Inspect the maxima to choose sensible upper bin edges below.
max_value_cnpj = rfb_group_0["cnpj"].max()
max_value_secCnae = rfb_group_0["cnae_secondary"].max()
print(max_value_cnpj, max_value_secCnae)
# Create ranges (pd.cut bins are right-closed: (0, 1], (1, 10], ...)
bins_cnpj = [0, 1, 10, 50, 100, 200, 500, 800, 1000, 1040]
labels_cnpj = [
    "1 cnpj",
    "2-10 cnpj",
    "11-50 cnpj",
    "51-100 cnpj",
    "101-200 cnpj",
    "201-500 cnpj",
    "501-800 cnpj",
    "801-1000 cnpj",
    "1001-1040 cnpj",
]
# -1 edge so that a count of 0 lands in its own "No secondary cnae" bucket.
bins_scnae = [-1, 0, 1, 10, 20, 50, 80, 109, 136]
labels_scnae = [
    "No secondary cnae",
    "1 secondary cnae",
    "2-10 secondary cnae",
    "11-20 secondary cnae",
    "21-50 secondary cnae",
    "51-80 secondary cnae",
    "81-109 secondary cnae",
    "110-136 secondary cnae",  # fixed: bin (109, 136] was mislabeled "81-136"
]
rfb_group_0["cnpj_ranges"] = pd.cut(rfb_group_0["cnpj"], bins_cnpj, labels=labels_cnpj)
rfb_group_0["cnae_secondary_ranges"] = pd.cut(
    rfb_group_0["cnae_secondary"], bins_scnae, labels=labels_scnae, include_lowest=True
)
### 6 - Data aggregation - Level 3
# Bar chart 3 - companies_names by cnpj ranges and level
rfb_group_3 = (
rfb_group_0[["company_name", "level", "cnpj_ranges"]]
.groupby(["level", "cnpj_ranges"])
.agg({"company_name": "count"})
.reset_index()
)
# Bar chart 4 - companies_name by secondary cnae ranges and level
rfb_group_4 = (
rfb_group_0[["company_name", "level", "cnae_secondary_ranges"]]
.groupby(["level", "cnae_secondary_ranges"])
.agg({"company_name": "count"})
.reset_index()
)
### 7 - Bar charts
# Bar chart 1
fig = px.bar(
rfb_group_1,
x="level",
y="company_name",
title="Distribution of companies among levels of activity",
labels={"level": "Level of activity", "company_name": "Amount of companies"},
)
fig.update_layout(width=700, height=500)
fig.show("png")
# Bar chart 2
fig = px.bar(
rfb_group_2,
x="level",
y="cnpj",
title="Distribution of CNPJ among levels of activity",
labels={"level": "Level of activity", "cnpj": "Amount of CNPJ"},
)
fig.update_layout(width=700, height=500)
fig.show("png")
# Bar Chart 3
# Bar Chart 3 — companies grouped by how many CNPJs they hold, per level.
fig = px.bar(
    rfb_group_3,
    x="level",
    y="company_name",
    color="cnpj_ranges",
    barmode="group",
    # fixed typo in the title: "holded" -> "held"
    title="Amount of cnpj held by companies, by level of activity",
    labels={
        "level": "Level of activity",
        "company_name": "Amount of companies",
        "cnpj_ranges": "Cnpj ranges",
    },
)
fig.update_layout(width=700, height=500)
# Show the plot
fig.show("png")
# Bar Chart 4 — companies grouped by how many secondary CNAEs they hold, per level.
fig = px.bar(
    rfb_group_4,
    x="level",
    y="company_name",
    color="cnae_secondary_ranges",
    barmode="group",
    # fixed typo in the title: "holded" -> "held"
    title="Amount of CNAE held by companies, by level of activity",
    labels={
        "level": "Level of activity",
        "company_name": "Amount of companies",
        "cnae_secondary_ranges": "Secondary CNAE ranges",
    },
)
fig.update_layout(width=700, height=500)
fig.show("png")
BOL dataset analysis
### 1 - Load BOL dataset
def load_data():
    """Load the 2022 Brazil bill-of-lading (BOL) dataset.

    All columns are read as strings (preserving leading zeros in CNPJ
    codes); only the volume column is converted to float afterwards.
    """
    frame = get_pandas_df_once(
        "brazil/trade/bol/2022/BRAZIL_BOL_2022.csv",
        encoding="utf8",
        sep=";",
        dtype=str,
        keep_default_na=False,
    )
    frame["vol"] = frame["vol"].astype(float)
    return frame
df_bol = load_data()
# select BOL data subset
df_bol_subset = df_bol[["exporter.cnpj", "exporter.label", "hs4", "vol"]]
# remove duplicates
df_bol_subset = df_bol_subset.drop_duplicates()
len(df_bol_subset)
### 2 - Filter BOL data by hs4 codes of interest
# Extract distinct hs4 code from splitgraph.commodities
from trase.tools import uses_database
@uses_database
def get_hs4_codes(cnx=None):
hs4_codes = pd.read_sql(
"SELECT DISTINCT substr(hs_code, 0, 5) as hs4, commodity, hs_code FROM splitgraph.commodities",
con=cnx.cnx,
)
return hs4_codes
df_hs4 = get_hs4_codes()
df_hs4_c = df_hs4[df_hs4["commodity"] == "SOY"]
list_hs4_c = list(df_hs4_c["hs4"])
# Filter BOL dataset
df_bol_c = df_bol_subset[df_bol_subset["hs4"].isin(list_hs4_c)]
### 3 - Data aggregation - Level 1
# Group vol, hs4 and cnpj by exporter
bol_group_1 = (
df_bol_c.groupby(["exporter.label"])
.agg({"vol": "sum", "exporter.cnpj": "nunique", "hs4": "nunique"})
.reset_index()
)
### 5 - Create range column
# find bins thresholds
max_value_cnpj = bol_group_1["exporter.cnpj"].max()
max_value_hs4 = bol_group_1["hs4"].max()
print(max_value_cnpj, max_value_hs4)
bol_group_1 = bol_group_1.copy()
# create ranges (pd.cut bins are right-closed: (0, 1], (1, 10], ...)
bins_cnpj = [0, 1, 10, 20]
labels_cnpj = ["1 cnpj", "2-10 cnpj", "11-20 cnpj"]
bins_sh4 = [0, 1, 2, 3, 4]
labels_sh4 = ["1 SH4 code", "2 SH4 codes", "3 SH4 codes", "4 SH4 codes"]
bol_group_1["cnpj_ranges"] = pd.cut(
    bol_group_1["exporter.cnpj"], bins_cnpj, labels=labels_cnpj
)
# BUG FIX: sh4_ranges must bin the distinct-HS4 counts ("hs4"), not the
# cnpj counts — the original re-used "exporter.cnpj" here.
bol_group_1["sh4_ranges"] = pd.cut(
    bol_group_1["hs4"], bins_sh4, labels=labels_sh4
)
### 6 - Make Bar charts
# Bar chart 1 - Group exporters by cnpj ranges
# Count exporters per cnpj-count range (feeds bar chart 1).
bol_group_2 = (
    bol_group_1.groupby("cnpj_ranges").agg({"exporter.label": "count"}).reset_index()
)
# Bar chart 1
fig = px.bar(
    bol_group_2,
    x="cnpj_ranges",
    y="exporter.label",
    barmode="group",
    # fixed typo in the title: "holded" -> "held"
    title="Amount of cnpj held by companies",
    labels={"cnpj_ranges": "CNPJ ranges", "exporter.label": "Amount of companies"},
)
fig.update_layout(width=700, height=400)
fig.show("png")
# Bar Chart 2 - count exporters per sh4-count range
bol_group_3 = (
    bol_group_1.groupby("sh4_ranges").agg({"exporter.label": "count"}).reset_index()
)
# Bar Chart 2
fig = px.bar(
    bol_group_3,
    x="sh4_ranges",
    y="exporter.label",
    barmode="group",
    # fixed typo in the title: "holded" -> "held"
    title="Amount of SH4 codes held by companies",
    labels={"sh4_ranges": "SH4 Codes", "exporter.label": "Amount of companies"},
)
fig.update_layout(width=700, height=400)
fig.show("png")
Comparison between datasets
### 1- drop duplicates
# inspect
df_cnpj_ccs.duplicated(subset=["cnpj"]).sum()
df_bol_c.duplicated(subset=["exporter.cnpj"]).sum()
# drop
bol_unique_s = bol_unique[["exporter.cnpj", "hs4"]]
unique_soy_cnpjs = bol_unique_s["exporter.cnpj"].drop_duplicates()
df = query_with_dataframe(
unique_soy_cnpjs.astype(int).rename("cnpj"),
"""
select cnpj, cnae, cnae_secondary
from df
left join (
select cnpj::bigint, cnae, cnae_secondary from cnpj.cnpj
) v using (cnpj)
""",
)
print(
"How many CNPJs from the BoL dataset matched with the CNPJ database:\n",
(~df["cnae"].isnull()).value_counts(),
)
df.head()
matched = df[~df["cnae"].isnull()]
matched["match"] = "matched"
unmatched = df[df["cnae"].isnull()]
unmatched["match"] = "unmatched"
df_concat = pd.concat([matched, unmatched], ignore_index=True)
### 2 - Transform dataset
# Create a new column named 'ID' with a range of values
df_concat["ID"] = range(1, len(df_concat) + 1)
### 3 - Make Bar Charts
# Bar chart 1 - Aggregation
# Count rows per match status (dropna=False keeps any NaN match values).
comparison_group_1 = (
    df_concat.groupby(["match"], dropna=False).agg({"ID": "count"}).reset_index()
)
# Bar Chart 1
fig = px.bar(
    comparison_group_1,
    x="match",
    y="ID",
    barmode="group",
    title="Matching and unmatching values CNPJ between BOL and RFB datasets",
    # fixed: the labels key must match the x column name ("match", not "merge"),
    # otherwise the axis relabel is silently ignored
    labels={"match": "datasets", "ID": "Amount of matching CNPJ values"},
)
fig.update_layout(width=700, height=400)
fig.show("png")
comparison_group_1.head()
# 4- Inspect bol data for entries that do not have matching values in the CNPJ database
unmatched_row = df_concat[df_concat["match"] == "unmatched"]
unmatched_row
### 1 - Load BOL dataset
def load_data():
    """Load the 2022 Brazil bill-of-lading (BOL) dataset.

    All columns are read as strings (preserving leading zeros in CNPJ
    codes); only the volume column is converted to float afterwards.
    """
    frame = get_pandas_df_once(
        "brazil/trade/bol/2022/BRAZIL_BOL_2022.csv",
        encoding="utf8",
        sep=";",
        dtype=str,
        keep_default_na=False,
    )
    frame["vol"] = frame["vol"].astype(float)
    return frame
df_bol = load_data()
# select BOL data subset
df_bol_subset = df_bol[["exporter.cnpj", "exporter.label", "hs4", "vol"]]
# remove duplicates
df_bol_subset = df_bol_subset.drop_duplicates()
len(df_bol_subset)
### 2 - Filter BOL data by hs4 codes of interest
# Extract distinct hs4 code from splitgraph.commodities
from trase.tools import uses_database
@uses_database
def get_hs4_codes(cnx=None):
hs4_codes = pd.read_sql(
"SELECT DISTINCT substr(hs_code, 0, 5) as hs4, commodity, hs_code FROM splitgraph.commodities",
con=cnx.cnx,
)
return hs4_codes
df_hs4 = get_hs4_codes()
df_hs4_c = df_hs4[df_hs4["commodity"] == "SOY"]
list_hs4_c = list(df_hs4_c["hs4"])
# Filter BOL dataset
df_bol_c = df_bol_subset[df_bol_subset["hs4"].isin(list_hs4_c)]
### 3 - Data aggregation - Level 1
# Group vol, hs4 and cnpj by exporter
bol_group_1 = (
df_bol_c.groupby(["exporter.label"])
.agg({"vol": "sum", "exporter.cnpj": "nunique", "hs4": "nunique"})
.reset_index()
)
### 5 - Create range column
# find bins thresholds
max_value_cnpj = bol_group_1["exporter.cnpj"].max()
max_value_hs4 = bol_group_1["hs4"].max()
print(max_value_cnpj, max_value_hs4)
bol_group_1 = bol_group_1.copy()
# create ranges (pd.cut bins are right-closed: (0, 1], (1, 10], ...)
bins_cnpj = [0, 1, 10, 20]
labels_cnpj = ["1 cnpj", "2-10 cnpj", "11-20 cnpj"]
bins_sh4 = [0, 1, 2, 3, 4]
labels_sh4 = ["1 SH4 code", "2 SH4 codes", "3 SH4 codes", "4 SH4 codes"]
bol_group_1["cnpj_ranges"] = pd.cut(
    bol_group_1["exporter.cnpj"], bins_cnpj, labels=labels_cnpj
)
# BUG FIX: sh4_ranges must bin the distinct-HS4 counts ("hs4"), not the
# cnpj counts — the original re-used "exporter.cnpj" here.
bol_group_1["sh4_ranges"] = pd.cut(
    bol_group_1["hs4"], bins_sh4, labels=labels_sh4
)
### 6 - Make Bar charts
# Bar chart 1 - Group exporters by cnpj ranges
# Count exporters per cnpj-count range (feeds bar chart 1).
bol_group_2 = (
    bol_group_1.groupby("cnpj_ranges").agg({"exporter.label": "count"}).reset_index()
)
# Bar chart 1
fig = px.bar(
    bol_group_2,
    x="cnpj_ranges",
    y="exporter.label",
    barmode="group",
    # fixed typo in the title: "holded" -> "held"
    title="Amount of cnpj held by companies",
    labels={"cnpj_ranges": "CNPJ ranges", "exporter.label": "Amount of companies"},
)
fig.update_layout(width=700, height=400)
fig.show("png")
# Bar Chart 2 - count exporters per sh4-count range
bol_group_3 = (
    bol_group_1.groupby("sh4_ranges").agg({"exporter.label": "count"}).reset_index()
)
# Bar Chart 2
fig = px.bar(
    bol_group_3,
    x="sh4_ranges",
    y="exporter.label",
    barmode="group",
    # fixed typo in the title: "holded" -> "held"
    title="Amount of SH4 codes held by companies",
    labels={"sh4_ranges": "SH4 Codes", "exporter.label": "Amount of companies"},
)
fig.update_layout(width=700, height=400)
fig.show("png")
Comparison between datasets
### 1- drop duplicates
# inspect
df_cnpj_ccs.duplicated(subset=["cnpj"]).sum()
df_bol_c.duplicated(subset=["exporter.cnpj"]).sum()
# drop
bol_unique_s = bol_unique[["exporter.cnpj", "hs4"]]
unique_soy_cnpjs = bol_unique_s["exporter.cnpj"].drop_duplicates()
df = query_with_dataframe(
unique_soy_cnpjs.astype(int).rename("cnpj"),
"""
select cnpj, cnae, cnae_secondary
from df
left join (
select cnpj::bigint, cnae, cnae_secondary from cnpj.cnpj
) v using (cnpj)
""",
)
print(
"How many CNPJs from the BoL dataset matched with the CNPJ database:\n",
(~df["cnae"].isnull()).value_counts(),
)
df.head()
matched = df[~df["cnae"].isnull()]
matched["match"] = "matched"
unmatched = df[df["cnae"].isnull()]
unmatched["match"] = "unmatched"
df_concat = pd.concat([matched, unmatched], ignore_index=True)
### 2 - Transform dataset
# Create a new column named 'ID' with a range of values
df_concat["ID"] = range(1, len(df_concat) + 1)
### 3 - Make Bar Charts
# Bar chart 1 - Aggregation
# Count rows per match status (dropna=False keeps any NaN match values).
comparison_group_1 = (
    df_concat.groupby(["match"], dropna=False).agg({"ID": "count"}).reset_index()
)
# Bar Chart 1
fig = px.bar(
    comparison_group_1,
    x="match",
    y="ID",
    barmode="group",
    title="Matching and unmatching values CNPJ between BOL and RFB datasets",
    # fixed: the labels key must match the x column name ("match", not "merge"),
    # otherwise the axis relabel is silently ignored
    labels={"match": "datasets", "ID": "Amount of matching CNPJ values"},
)
fig.update_layout(width=700, height=400)
fig.show("png")
comparison_group_1.head()
# 4- Inspect bol data for entries that do not have matching values in the CNPJ database
unmatched_row = df_concat[df_concat["match"] == "unmatched"]
unmatched_row
### 1- drop duplicates
# inspect
df_cnpj_ccs.duplicated(subset=["cnpj"]).sum()
df_bol_c.duplicated(subset=["exporter.cnpj"]).sum()
# drop
bol_unique_s = bol_unique[["exporter.cnpj", "hs4"]]
unique_soy_cnpjs = bol_unique_s["exporter.cnpj"].drop_duplicates()
df = query_with_dataframe(
unique_soy_cnpjs.astype(int).rename("cnpj"),
"""
select cnpj, cnae, cnae_secondary
from df
left join (
select cnpj::bigint, cnae, cnae_secondary from cnpj.cnpj
) v using (cnpj)
""",
)
print(
"How many CNPJs from the BoL dataset matched with the CNPJ database:\n",
(~df["cnae"].isnull()).value_counts(),
)
df.head()
matched = df[~df["cnae"].isnull()]
matched["match"] = "matched"
unmatched = df[df["cnae"].isnull()]
unmatched["match"] = "unmatched"
df_concat = pd.concat([matched, unmatched], ignore_index=True)
### 2 - Transform dataset
# Create a new column named 'ID' with a range of values
df_concat["ID"] = range(1, len(df_concat) + 1)
### 3 - Make Bar Charts
# Bar chart 1 - Aggregation
# Count rows per match status (dropna=False keeps any NaN match values).
comparison_group_1 = (
    df_concat.groupby(["match"], dropna=False).agg({"ID": "count"}).reset_index()
)
# Bar Chart 1
fig = px.bar(
    comparison_group_1,
    x="match",
    y="ID",
    barmode="group",
    title="Matching and unmatching values CNPJ between BOL and RFB datasets",
    # fixed: the labels key must match the x column name ("match", not "merge"),
    # otherwise the axis relabel is silently ignored
    labels={"match": "datasets", "ID": "Amount of matching CNPJ values"},
)
fig.update_layout(width=700, height=400)
fig.show("png")
comparison_group_1.head()
# 4- Inspect bol data for entries that do not have matching values in the CNPJ database
unmatched_row = df_concat[df_concat["match"] == "unmatched"]
unmatched_row