Sicarm Clean 2021
s3://trase-storage/brazil/logistics/sicarm/out/SICARM_CLEAN_2021.csv
Dbt path: trase_production.main_brazil.sicarm_clean_2021
Containing yaml file link: trase/data_pipeline/models/brazil/logistics/sicarm/out/_schema.yml
Model file link: trase/data_pipeline/models/brazil/logistics/sicarm/out/sicarm_clean_2021.py
Calls script: trase/data/brazil/logistics/sicarm/archive/SICARM_CLEAN_AND_MISSING_CNPJ_2021.py
Tags: mock_model, brazil, logistics, out, sicarm
sicarm_clean_2021
Description
Brazil SICARM. This model was auto-generated from the .yml 'lineage' files in S3.
The dbt model itself just raises an error; the script that actually created the data lives elsewhere, at trase/data/brazil/logistics/sicarm/SICARM_CLEAN_AND_MISSING_CNPJ_2021.py. It was last run by Harry Biddle.
What the dataset is
SICARM (Sistema de Cadastro Nacional de Unidades Armazenadoras, or National Storage Registration System) is a system developed by CONAB (the National Supply Company). Registration is discretionary. It lets the government track how much storage capacity is available each year, mainly to monitor and support the development of the industry in the country, for example by controlling commodity prices. Data is added to the database based on the SNCUA system. The dataset is linked to both SICAF and SIRCOI.
Since registration is discretionary, SICARM does not provide a full list of storage facilities. A registered facility in a municipality adds that municipality to the list of logistics hubs, but a municipality identified previously is not removed just because it has no facility in the registry. For reference, the requirements for a facility to be added to the system are:
- Be a legal entity under public or private law.
- Have a Technical Manager (RT) in the states where the Regional Council of Engineering, Architecture and Agronomy (CREA) requires.
- Be located in an area with roads in good condition for traffic and normal access throughout the year.
- Keep the property (facilities, machinery and equipment) clean, well maintained and in perfect working order.
- Have a system to combat pest infestation in stored products, preferably using Integrated Pest Management (IPM) techniques.
- Not share facilities, including maneuvering yards, with another company, regardless of the line of business.
- Not share common-use equipment with warehouses located at other addresses or registered under another CNPJ.
Note: the status descredenciado (disqualified) does not mean the facility has ceased to exist. It only means that Conab will not consider the facility part of the system, for example for public purchases; the facility may still operate. Situations that can lead to this status include a disagreement over service prices or missing paperwork when the registration is updated. In theory (and in Brazil theory and practice often diverge), Conab reviews the registry and opens public calls for new registrations often (almost every year), and facilities may join or leave the database.
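As a minimal sketch of the note above: rather than dropping facilities with status descredenciado, one option is to keep every row and add a flag. The STATUS labels and the `ELIGIBLE_FOR_PUBLIC_PURCHASE` column name below are assumptions for illustration; the labels in the real file may differ.

```python
import pandas as pd

# Toy rows; the STATUS labels are assumed, not taken from the real file.
facilities = pd.DataFrame(
    {
        "CDA": ["A1", "A2", "A3"],
        "STATUS": ["CREDENCIADO", "DESCREDENCIADO", "CREDENCIADO"],
    }
)

# Flag instead of filter: a disqualified facility may still be operating,
# so it stays in the table used to identify logistics hubs.
facilities["ELIGIBLE_FOR_PUBLIC_PURCHASE"] = facilities["STATUS"] != "DESCREDENCIADO"

n_all = len(facilities)
n_eligible = int(facilities["ELIGIBLE_FOR_PUBLIC_PURCHASE"].sum())
```

All three facilities remain available for hub identification; only the flag distinguishes them.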
Having the correct CNAE in the RFB dataset is a requirement for registration.
This dataset contains unique identifiers called CDAs (Cadastro de Armazém, or Warehouse Registry). One CNPJ can be associated with multiple CDAs.
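A small sketch of what this relationship implies when checking the data, assuming CDA is unique per facility row: CDA should not repeat, while a CNPJ may repeat. The identifiers below are made-up placeholders.

```python
import pandas as pd

# Placeholder identifiers for illustration only.
rows = pd.DataFrame(
    {
        "CDA": ["C001", "C002", "C003"],
        "CPF_CNPJ": ["00000000000001", "00000000000001", "00000000000002"],
    }
)

# CDA identifies a warehouse, so it is expected to be unique...
cda_is_unique = rows["CDA"].is_unique

# ...while one CNPJ may legitimately own several CDAs.
cdas_per_cnpj = rows.groupby("CPF_CNPJ")["CDA"].nunique()
```

Here `cdas_per_cnpj` holds one entry per CNPJ with the number of distinct warehouses registered under it.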
More information on SICARM can be found in the Brazilian crop commodities review.
How we use the dataset in Trase
The dataset is used in the Brazil SEI-PCS crop models.
How often the dataset is updated
Uncertain
How to re-fetch the dataset from the original source
Queries can be made directly at the following link: https://consultaweb.conab.gov.br/consultas/consultaArmazem.do?method=acaoCarregarConsult. Searches can be done per state, as well as by certified and habilitated facilities, to generate tables with a unique identifier for the asset (CDA - Código do Armazém) as well as a geographic location and address. Accessing the site requires a Brazilian VPN.
We have a script which scrapes the data: trase/data/brazil/logistics/sicarm/in/scraping_sicarm_with_cnpj.R.
The script that is used to process/clean the dataset
The same script, trase/data/brazil/logistics/sicarm/in/scraping_sicarm_with_cnpj.R, also cleans the data.
When the dataset was last updated, and by whom.
2021 extraction
The 2021 data was downloaded manually on 15th of July 2021 for each Brazilian state and pre-processed using the script trase/data/brazil/logistics/sicarm/br_sicarm_update_2017_2021.py.

2024 extraction
This download was done by Vivian. It follows the same process of manually downloading the data for each state, but uses the CDA (unique identifier) as a key to request the CNPJ.

A history of changes/notes of the dataset.
None
Acceptance criteria for sufficient level of quality of the dataset.
Data obtained from SICARM should match the figure published by IBGE for the total storage capacity in the country: https://agenciadenoticias.ibge.gov.br/agencia-sala-de-imprensa/2013-agencia-de-noticias/releases/38305-capacidade-de-armazenagem-agricola-cresce-4-8-e-chega-a-201-4-milhoes-de-toneladas-no-1-semestre-de-2023
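One way to express this criterion is to sum the CAPACITY column (in tonnes, from the original "Capacidade (t)" column) and compare it with a reference total. This is a sketch, not an existing check in the pipeline: the function name and the 10% tolerance are hypothetical, and the code assumes capacities are stored as plain digit strings. The 201.4 million tonne figure comes from the title of the IBGE release linked above and refers to the first half of 2023, so it is only indicative for a 2021 extraction.

```python
import pandas as pd

# From the title of the IBGE release (first half of 2023).
IBGE_2023_H1_TONNES = 201_400_000


def capacity_within_tolerance(capacity, reference_tonnes, tolerance=0.10):
    """Return (total, ok): the summed capacity and whether it lies within
    `tolerance` (relative) of the reference figure."""
    # The script reads every column as a string, so convert first;
    # values that cannot be parsed become NaN and are skipped by sum().
    total = pd.to_numeric(capacity, errors="coerce").sum()
    ok = abs(total - reference_tonnes) <= tolerance * reference_tonnes
    return total, ok


# Toy data standing in for the CAPACITY column.
capacity = pd.Series(["1000", "2500", "", "500"])
total, ok = capacity_within_tolerance(capacity, reference_tonnes=4200)
```

On the real data the call would pass `IBGE_2023_H1_TONNES` as the reference. Because registration is discretionary, a total somewhat below the IBGE figure would not be surprising.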
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
- source.trase_duckdb.trase-storage-raw.sicarm_se_20210715
- source.trase_duckdb.trase-storage-raw.
- source.trase_duckdb.trase-storage-raw.sicarm_al_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_go_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_ms_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_ma_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_pi_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_ce_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_sp_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_pr_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_es_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_df_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_ba_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_am_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_ap_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_clean
- source.trase_duckdb.trase-storage-raw.sicarm_rj_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_mg_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_rr_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_mt_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_rn_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_ac_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_sc_20210715
- source.trase_duckdb.trase-storage-raw.auxiliary_cnpj_original_cnpj_2019
- source.trase_duckdb.trase-storage-raw.sicarm_rs_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_pb_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_to_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_ro_20210715
- source.trase_duckdb.trase-storage-raw.uf
- source.trase_duckdb.trase-storage-raw.sicarm_pa_20210715
- source.trase_duckdb.trase-storage-raw.sicarm_pe_20210715
Sources
- ['trase-storage-raw', 'sicarm_se_20210715']
- ['trase-storage-raw', '']
- ['trase-storage-raw', 'sicarm_al_20210715']
- ['trase-storage-raw', 'sicarm_go_20210715']
- ['trase-storage-raw', 'sicarm_ms_20210715']
- ['trase-storage-raw', 'sicarm_ma_20210715']
- ['trase-storage-raw', 'sicarm_pi_20210715']
- ['trase-storage-raw', 'sicarm_ce_20210715']
- ['trase-storage-raw', 'sicarm_sp_20210715']
- ['trase-storage-raw', 'sicarm_pr_20210715']
- ['trase-storage-raw', 'sicarm_es_20210715']
- ['trase-storage-raw', 'sicarm_df_20210715']
- ['trase-storage-raw', 'sicarm_ba_20210715']
- ['trase-storage-raw', 'sicarm_am_20210715']
- ['trase-storage-raw', 'sicarm_ap_20210715']
- ['trase-storage-raw', 'sicarm_clean']
- ['trase-storage-raw', 'sicarm_rj_20210715']
- ['trase-storage-raw', 'sicarm_mg_20210715']
- ['trase-storage-raw', 'sicarm_rr_20210715']
- ['trase-storage-raw', 'sicarm_mt_20210715']
- ['trase-storage-raw', 'sicarm_rn_20210715']
- ['trase-storage-raw', 'sicarm_ac_20210715']
- ['trase-storage-raw', 'sicarm_sc_20210715']
- ['trase-storage-raw', 'auxiliary_cnpj_original_cnpj_2019']
- ['trase-storage-raw', 'sicarm_rs_20210715']
- ['trase-storage-raw', 'sicarm_pb_20210715']
- ['trase-storage-raw', 'sicarm_to_20210715']
- ['trase-storage-raw', 'sicarm_ro_20210715']
- ['trase-storage-raw', 'uf']
- ['trase-storage-raw', 'sicarm_pa_20210715']
- ['trase-storage-raw', 'sicarm_pe_20210715']
"""
Brazil - SICARM (Sistema de Cadastro Nacional de Unidades Armazenadoras)
Update and organize the data
Fetch and match the CNPJs
"""
import posixpath
from datetime import datetime  # used in main(); made explicit in case the star import below does not provide it
from pathlib import Path  # used in download()
from tempfile import gettempdir
import numpy as np
import pandas as pd
import recordlinkage as rl
from recordlinkage.compare import Exact, String
from tqdm import tqdm
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import add_object_to_tracker
from trase.tools.pcs import *  # presumably the source of aws_session and uses_database
from trase.tools.aws.aws_helpers import (
    read_s3_folder,
    get_pandas_df,
)
from trase.tools.sei_pcs.pandas_utilities import full_merge, rename, concat
final_columns_order = [
    "ID",
    "CDA",
    "CPF_CNPJ",
    "COMPANY",
    "STATUS",
    "TYPE",
    "CAPACITY",
    "UF",
    "MUNICIPALITY",
    "ADDRESS",
    "LAT",
    "LONG",
    "GEOCODE",
]
def main():
    start_time = datetime.now()
    print("Script started:", start_time)
    print(80 * "-")
    print(
        "Step 1: Read new SICARM .csv files per State, get CNPJ data from old SICARM on S3 Bucket"
    )
    df_load = load_data()
    df = insert_geocode(df_load)
    df_sicarm = load_merge_old_version(df)
    df_with_cnpj_step1, df_without_cnpj = split_sicarm_without_cnpj(df_sicarm)
    print(80 * "-")
    print("Step 2-3: Download and read CNPJ dataset file from S3")
    # download_cnpj_csv() already downloads the file and returns the local path,
    # so it does not need to be wrapped in a second download() call
    cnpj = read_cnpj(download_cnpj_csv())
    print(80 * "-")
    print("Step 4: Merge:: SICARM and CNPJ by COMPANY and CEP (zipcode)")
    df_with_cnpj_step4, df_without_cnpj = sicarm_merge_cnpj_company_zipcode(
        df_without_cnpj, cnpj
    )
    print(80 * "-")
    print("Step 5: Merge:: SICARM and CNPJ by COMPANY and GEOCODE")
    df_with_cnpj_step5, df_without_cnpj = sicarm_merge_cnpj_company_geocode(
        df_without_cnpj, cnpj
    )
    print(80 * "-")
    print("Step 6: RecordLinkage:: SICARM and CNPJ by COMPANY and ADDRESS")
    df_with_cnpj_step6, df_without_cnpj = sicarm_recordlinkage_cnpj_address(
        df_without_cnpj, cnpj
    )
    print(80 * "-")
    print("Step 7: Merge:: SICARM and CNPJ by COMPANY and STATE")
    df_with_cnpj_step7, df_without_cnpj = sicarm_merge_cnpj_state(df_without_cnpj, cnpj)
    print(80 * "-")
    print("Step 8: RecordLinkage:: SICARM and SICASQ by COMPANY and ADDRESS")
    df_with_cnpj_step8, df_without_cnpj = sicarm_recordlinkage_sicasq(df_without_cnpj)
    print(80 * "-")
    print("Step 9: RecordLinkage:: SICARM and BoL by COMPANY")
    df_with_cnpj_step9 = sicarm_recordlinkage_bol(df_without_cnpj)
    print(80 * "-")
    print("Step 10: Manually-Searched:: Insert Googled CNPJs into SICARM")
    df_with_cnpj_step10 = load_manually_searched_cnpjs()
    print(80 * "-")
    print("Step 11: Concatenate SICARM with and without CNPJ information")
    df_full_data, df_without_cnpj = final_concatenation(
        df_sicarm,
        df_with_cnpj_step1,
        df_with_cnpj_step4,
        df_with_cnpj_step5,
        df_with_cnpj_step6,
        df_with_cnpj_step7,
        df_with_cnpj_step8,
        df_with_cnpj_step9,
        df_with_cnpj_step10,
    )
    print(80 * "-")
    print("Export both to temp file and S3 Bucket")
    write_csv_for_upload(
        df_full_data, "brazil/logistics/sicarm/out/SICARM_CLEAN_2021.csv"
    )
    write_csv_for_upload(
        df_without_cnpj, "brazil/logistics/sicarm/in/SICARM_CLEAN_MISSING_CNPJ_2021.csv"
    )
    print("Script took ", datetime.now() - start_time, "to run")
def normalize_str(d: pd.DataFrame, col: str, clean=False):
    """
    Strip accents from a string column (NFKD, then drop non-ASCII characters)
    and uppercase it. The column is modified in place on ``d``.
    Args:
        d (pandas DataFrame): Dataframe to lookup
        col (str): String column
        clean (bool): also remove ".", "-", "/", ",", '"' and all spaces
    Returns:
        pandas DataFrame
    """
    d[col] = (
        d[col]
        .str.normalize("NFKD")
        .str.encode("ascii", errors="ignore")
        .str.decode("utf-8")
    )
    d[col] = d[col].str.upper()
    if clean:
        # regex=False so that "." is removed literally rather than read as a pattern
        d[col] = (
            d[col]
            .str.replace(".", "", regex=False)
            .str.replace("-", "", regex=False)
            .str.replace("/", "", regex=False)
            .str.replace(",", "", regex=False)
            .str.replace('"', "", regex=False)
            .str.split(" ")
            .str.join("")
        )
    return d
def get_state_uf():
    df = get_pandas_df(
        "brazil/metadata/UF.csv",
        sep=",",
        usecols=("CO_UF_IBGE", "CO_UF", "UF"),
        dtype=str,
    )
    df = rename(df, columns={"CO_UF_IBGE": "state.code", "CO_UF": "state.uf_number"})
    return df
def replace_state_uf_codes_with_names_and_trase_ids(df, column):
    df_state_uf = get_state_uf()[[column, "state.code"]].drop_duplicates()
    return full_merge(df, df_state_uf, on=column, how="left", validate="many_to_one")
# ==================================================================================== #
# STEP 1: Read all original SICARM .csv files per State
#
# (1) - read csv from S3 bucket and concatenate them,
# (2) - merge with States csv: fetch GEOCODE variable,
# (3) - merge with current SICARM version,
# (4) - split into two dataframes:
# one with matched CNPJ and
# other to be merged with the CNPJ dataset
# ==================================================================================== #
def load_data():
    """
    Load original data (.csv) from S3 bucket
    Rename column names
    Concatenate each .csv (per State) into a country-level DataFrame
    """
    s3_folder = read_s3_folder("brazil/logistics/sicarm/ori/")
    col_names = [
        "CDA",
        "Armazenador",
        "Endereço",
        "Município",
        "UF",
        "Tipo",
        "Telefone",
        "Email",
        "Capacidade (t)",
        "Latitude",
        "Longitude",
    ]
    # Read each .csv file from S3 Bucket
    df_list = []
    for file in s3_folder:
        raw_data = get_pandas_df(
            file.key,
            sep=";",
            encoding="latin1",
            skiprows=1,
            names=col_names,
            dtype=str,
            keep_default_na=False,
        )
        raw_data = raw_data.iloc[1:]
        raw_data = rename(
            raw_data,
            columns={
                "Armazenador": "COMPANY",
                "Endereço": "ADDRESS",
                "Município": "MUNICIPALITY",
                "UF": "UF",
                "Tipo": "TYPE",
                "Capacidade (t)": "CAPACITY",
                "Latitude": "LAT",
                "Longitude": "LONG",
            },
        )
        raw_data = normalize_str(raw_data, "MUNICIPALITY")
        df_list.append(raw_data)
    # Concatenate DataFrames
    df = concat(df_list)
    df["ID"] = np.arange(df.shape[0])
    print("New SICARM - shape:", df.shape[0])
    return df
@uses_database
def get_geocode(cnx=None):
    br_geocode = pd.read_sql(
        """ SELECT SUBSTRING(trase_id, 4, 11) as "GEOCODE",
                   UNNEST(synonyms) || ' - ' || SUBSTRING(trase_id, 4, 2) name_id
            FROM views.regions
            WHERE country= 'BRAZIL' AND region_type = 'MUNICIPALITY' """,
        cnx.cnx,
    )
    return br_geocode
def insert_geocode(df: pd.DataFrame):
    """
    Insert Geocode into SICARM, matching on municipality name and State ("UF")
    Args:
        df (pandas Dataframe): Dataframe where Geocode will be inserted
    Return:
        df (pandas Dataframe): Dataframe with an added "GEOCODE" column
    """
    data = replace_state_uf_codes_with_names_and_trase_ids(df, "UF")
    data["MUN_UF"] = data["MUNICIPALITY"] + " - " + data["state.code"]
    df = full_merge(
        data,
        get_geocode(),
        how="left",
        left_on="MUN_UF",
        right_on="name_id",
        validate="many_to_one",
    )
    df = df.drop(columns=["name_id", "MUN_UF", "state.code"], errors="raise")
    return df
def load_merge_old_version(df: pd.DataFrame):
    """Load old SICARM version and merge with the new one to get its CNPJ information"""
    # Load SICARM Current Version
    df_current = get_pandas_df(
        "brazil/logistics/sicarm/out/SICARM_CLEAN.csv",
        sep=";",
        dtype=str,
        keep_default_na=False,
    )
    df_current = rename(
        df_current,
        columns={
            "ADDRESS": "address",
            "CAPACITY": "capacity",
            "MUNICIPALITY": "municipality",
            "LAT": "lat",
            "LONG": "long",
            "TYPE": "type",
        },
    )
    # Join Current SICARM dataset with new one
    df = df.merge(df_current, how="left", on=["CDA", "COMPANY", "GEOCODE", "UF"])
    df = df[final_columns_order]
    df = normalize_str(df, "CPF_CNPJ", clean=True)
    return df
def split_sicarm_without_cnpj(df: pd.DataFrame):
    """Split SICARM with and without CNPJ data"""
    sicarm_cnpj = df.loc[~df["CPF_CNPJ"].isna()]
    print("SICARM with CNPJ - shape:", sicarm_cnpj.shape[0])
    sicarm_nan_cnpj = df.loc[df["CPF_CNPJ"].isna()].copy()
    print("SICARM NaN CNPJ - shape:", sicarm_nan_cnpj.shape[0])
    return sicarm_cnpj, sicarm_nan_cnpj
# ==================================================================================== #
# STEP 2: Download CNPJ file from S3
#
# The files are downloaded to a temporary location on the operating system. For
# ease-of-use the files are only downloaded once.
# ==================================================================================== #
def download(key) -> str:
    """Downloads a file, but only once"""
    client = aws_session().client("s3")
    filename = posixpath.basename(key)
    destination = Path(gettempdir(), filename)
    if destination.is_file():
        print(f"{destination} exists: skipping download")
    else:
        print(f"Downloading s3://trase-storage/{key} to {destination}")
        client.download_file("trase-storage", key, str(destination))
    add_object_to_tracker(key, "trase-storage", client=client)
    return str(destination)
def download_cnpj_csv():
    """Download CNPJ 2019 file"""
    return download("brazil/logistics/cnpj/original/CNPJ_2019.csv")
# ==================================================================================== #
# STEP 3: Read the CNPJ dataset.
#
# This contains four columns that we are interested in:
#
# (1) cnpj,
# (2) the company name,
# (3) geocode of the municipality,
# (4) address.
#
# This file is huge because it contains all companies in the whole of Brazil. We need to
# read pretty much the whole file, since it will serve as a lookup for the company name
# and geographic location. Because of its size we stream through the file and only load
# an absolute minimum of what we need into memory.
# ==================================================================================== #
def read_cnpj(csv):
    """Load CNPJ dataset into memory with specific columns"""
    columns_to_keep = [
        "cnpj",
        "geocodmun",
        "razao_social",
        "situacao_cadastral",
        "logradouro",
        "numero",
        "complemento",
        "bairro",
        "cep",
    ]
    total_rows = 40_837_398
    chunksize = 10_000
    with open(csv, "r", errors="replace") as file:
        chunk_iterator = pd.read_csv(
            file,
            sep=";",
            dtype=str,
            keep_default_na=False,
            iterator=True,
            chunksize=chunksize,
            # "situacao_cadastral" is already part of columns_to_keep
            usecols=[
                *columns_to_keep,
                "municipio",
                "data_situacao_cadastral",
            ],
        )

        def filtered_chunk_iterator():
            for d in tqdm(
                chunk_iterator, desc="Loading CNPJ_2019", total=total_rows // chunksize
            ):
                # ignore rows that are outside of Brazil
                d = d[d["municipio"] != "EXTERIOR"]
                # ignore NA and corrupt tax codes; copy so the assignments
                # below do not write into a view of the original chunk
                d = d[d["cnpj"].str.isdigit()].copy()
                if d.empty:
                    continue
                # cast CNPJ to integer so we don't have leading zero woes when
                # using it as a lookup/join key later
                d["cnpj_length"] = d["cnpj"].str.len()
                d["cnpj"] = d["cnpj"].to_numpy(dtype="int64")
                d["ADDRESS"] = (
                    d["logradouro"]
                    + " "
                    + d["numero"]
                    + " "
                    + d["complemento"]
                    + " "
                    + d["bairro"]
                    + " "
                    + d["cep"]
                )
                # split the dataframe into those where the date is defined
                # and those where it is not
                has_date = d["data_situacao_cadastral"] != "NA"
                e = d[has_date]
                f = d[~has_date]
                # if a date is defined, discard old CNPJs that no longer exist
                is_old = e["data_situacao_cadastral"].str.slice(0, 4).astype(int) < 2015
                is_class_08 = e["situacao_cadastral"] == "08"
                is_probably_inactive = is_old & is_class_08
                e = e[~is_probably_inactive]
                yield e[[*columns_to_keep, "ADDRESS", "cnpj_length"]]
                yield f[[*columns_to_keep, "ADDRESS", "cnpj_length"]]

        # concatenate the filtered chunk dataframes. it is only at this point that the
        # file is actually read from disk, so this must happen while it is still open
        cnpj = pd.concat(filtered_chunk_iterator())
    cnpj = rename(
        cnpj, columns={"razao_social": "COMPANY", "geocodmun": "GEOCODE", "cep": "CEP"}
    )
    cnpj["ADDRESS"] = cnpj["ADDRESS"].replace(
        dict.fromkeys(["NANANANA", "NA NA NA NA NA"], np.nan)
    )
    return cnpj
# ==================================================================================== #
# STEP 4: Merge SICARM with the CNPJ dataset - using companies' name and zipcode
#
# (1) Merge SICARM without CNPJ [from STEP 1] with the CNPJ dataset
# (2) Split SICARM merged with CNPJ:
# (2.1) With CNPJ: will be merged at the end with the other SICARM DataFrames that have a CNPJ
# (2.2) Without CNPJ: will be fuzzy-matched against the CNPJ dataset using different columns
# ==================================================================================== #
def sicarm_merge_cnpj_company_zipcode(df: pd.DataFrame, cnpj: pd.DataFrame):
    """Merge SICARM with CNPJ dataset by COMPANY and CEP (zipcode)"""
    # the zipcode is taken from the last 10 characters of the address;
    # only its first 5 digits are used for the match
    df["CEP"] = (
        df["ADDRESS"]
        .str[-10:]
        .str.replace("-", "", regex=False)
        .str.replace(".", "", regex=False)
        .str.lstrip()
        .str[:5]
    )
    cnpj["CEP"] = cnpj["CEP"].str[:5]
    sicarm_cnpj = df.merge(
        cnpj, how="left", on=["COMPANY", "CEP"], validate="many_to_many"
    )
    sicarm_cnpj = rename(
        sicarm_cnpj,
        columns={
            "CPF_CNPJ": "CPF_CNPJ_NAN",
            "cnpj": "CPF_CNPJ",
            "GEOCODE_x": "GEOCODE",
            "ADDRESS_x": "ADDRESS",
        },
    )
    sicarm_cnpj = sicarm_cnpj[final_columns_order]
    df_with_cnpj = sicarm_cnpj.loc[~sicarm_cnpj["CPF_CNPJ"].isna()].copy()
    df_with_cnpj = df_with_cnpj.drop_duplicates(
        subset=["CDA", "COMPANY", "ADDRESS", "TYPE"]
    )
    df_without_cnpj = sicarm_cnpj.loc[sicarm_cnpj["CPF_CNPJ"].isna()].copy()
    print("With CNPJ:", df_with_cnpj.shape[0])
    print("Missing:", df_without_cnpj.shape[0])
    return df_with_cnpj, df_without_cnpj
# ==================================================================================== #
# STEP 5: Merge SICARM with CNPJs - using companies' names and geocode
#
# (1) Merge SICARM without CNPJ (Step 4) with CNPJ dataset using company and geocode
# (2) Split Merged data into two dataframes:
# (2.1) Without address: considered the Geocode as the closest info to fetch the CNPJ
# (2.2) With address: go fuzzy in the STEP 6
# ==================================================================================== #
def sicarm_merge_cnpj_company_geocode(df: pd.DataFrame, cnpj: pd.DataFrame):
    """Merge SICARM with CNPJ dataset by COMPANY and GEOCODE"""
    df = normalize_str(df, "COMPANY")
    df["ADDRESS"] = df["ADDRESS"].str[:-10]
    df = normalize_str(df, "ADDRESS")
    cnpj = normalize_str(cnpj, "COMPANY")
    df = df.merge(cnpj, how="left", on=["COMPANY", "GEOCODE"], validate="many_to_many")
    df = df.drop_duplicates(subset=["CDA", "COMPANY"])
    df = rename(
        df, columns={"cnpj": "CPF_CNPJ", "CPF_CNPJ": "CNPJ", "ADDRESS_x": "ADDRESS"}
    )
    df = df[final_columns_order]
    has_cnpj = ~df["CPF_CNPJ"].isna()
    df_with_cnpj = df[has_cnpj].copy()
    df_without_cnpj = df[~df["ID"].isin(df_with_cnpj["ID"])].copy()
    print("With CNPJ:", df_with_cnpj.shape[0])
    print("Missing - Go RecordLinkage:", df_without_cnpj.shape[0])
    return df_with_cnpj, df_without_cnpj
# ==================================================================================== #
# STEP 6: RecordLinkage between SICARM and CNPJ [Address]
# ==================================================================================== #
def sicarm_recordlinkage_cnpj_address(df: pd.DataFrame, cnpj: pd.DataFrame):
    """RecordLinkage between SICARM and CNPJ dataset:
    Blocking on Geocode and Comparing Company and Address"""
    df = df.set_index("ID")
    cnpj["ID"] = np.arange(cnpj.shape[0])
    cnpj = cnpj.set_index("ID")
    indexer = rl.Index()
    indexer.block(on="GEOCODE")
    candidates = indexer.index(df, cnpj)
    compare = rl.Compare()
    compare.string(
        "COMPANY", "COMPANY", threshold=0.8, method="levenshtein", label="empresa"
    )
    compare.string(
        "ADDRESS", "ADDRESS", threshold=0.5, method="levenshtein", label="endereco"
    )
    features = compare.compute(candidates, df, cnpj)
    # each comparison is 0 or 1, so the score ranges from 0 to 2
    features["score"] = features["empresa"] + features["endereco"]
    features = features[features["score"] >= 1].reset_index()
    # keep only the best-scoring candidate for each SICARM row
    features = features.sort_values("score", ascending=False)
    features = features.drop_duplicates(subset="ID_1")
    df = df.reset_index()
    cnpj = cnpj.reset_index()
    df = df[final_columns_order]
    cnpj = cnpj[["ID", "cnpj", "ADDRESS", "COMPANY"]]
    sicarm_merge = features.merge(df, how="left", left_on="ID_1", right_on="ID")
    sicarm_merge = sicarm_merge.merge(
        cnpj, how="left", left_on="ID_2", right_on="ID", suffixes=["_sicarm", "_cnpj"]
    )
    sicarm_merge = rename(sicarm_merge, columns={"ID_sicarm": "ID", "cnpj": "CPF_CNPJ"})
    with_cnpj = sicarm_merge[sicarm_merge["score"] == 2]
    # If Company name is equal in SICARM and CNPJ, they are in the same Geocode, but there's no
    # address info in the CNPJ dataset, the Geocode will be considered the most reliable information
    # to attribute the CNPJ to the company
    same_company = sicarm_merge[
        (sicarm_merge["empresa"] == 1)
        & (sicarm_merge["endereco"] == 0)
        & (sicarm_merge["ADDRESS_cnpj"].isna())
    ].copy()
    df_with_cnpj = concat([with_cnpj, same_company])
    df_with_cnpj = rename(
        df_with_cnpj, {"COMPANY_sicarm": "COMPANY", "ADDRESS_sicarm": "ADDRESS"}
    )
    df_with_cnpj = df_with_cnpj[final_columns_order]
    df_without_cnpj = df.loc[~df["ID"].isin(df_with_cnpj["ID"])].copy()
    df_without_cnpj["CPF_CNPJ"] = ""
    df_without_cnpj = df_without_cnpj[final_columns_order]
    print("RecordLinkage: == Company Name & Address:", with_cnpj.shape[0])
    print("RecordLinkage: == Company Name & NaN Address:", same_company.shape[0])
    print("Missing:", df_without_cnpj.shape[0])
    return df_with_cnpj, df_without_cnpj
# ==================================================================================== #
# STEP 7: Merge between SICARM and CNPJ [State]
# ==================================================================================== #
def sicarm_merge_cnpj_state(df: pd.DataFrame, cnpj: pd.DataFrame):
    """Merge SICARM with CNPJ dataset - companies and state"""
    # the first two digits of the geocode identify the state
    cnpj["UF_code"] = cnpj["GEOCODE"].str[:2]
    df["UF_code"] = df["GEOCODE"].str[:2]
    df = df.merge(cnpj, how="left", on=["COMPANY", "UF_code"], validate="many_to_many")
    df.drop_duplicates(subset=["CDA", "COMPANY"], inplace=True)
    df = rename(
        df,
        columns={
            "CPF_CNPJ": "CPF_CNPJ_NAN",
            "cnpj": "CPF_CNPJ",
            "ADDRESS_x": "ADDRESS",
            "GEOCODE_x": "GEOCODE",
            "ID_x": "ID",
        },
    )
    df = df[final_columns_order]
    with_cnpj = df[~df["CPF_CNPJ"].isna()].copy()
    without_cnpj = df[df["CPF_CNPJ"].isna()].copy()
    print("Merge: with CNPJ:", with_cnpj.shape[0])
    print("Missing:", without_cnpj.shape[0])
    return with_cnpj, without_cnpj
# ==================================================================================== #
# STEP 8: Record Linkage between SICARM and SICASQ
# ==================================================================================== #
def sicarm_recordlinkage_sicasq(df: pd.DataFrame):
    """RecordLinkage between SICARM and SICASQ:
    Blocking on State and Comparing Company and Address"""
    sicasq_2017 = get_pandas_df_once(
        "brazil/logistics/sicasq/SICASQ_2017.csv",
        sep=";",
        dtype=str,
        keep_default_na=False,
        encoding="LATIN1",
        usecols=["COMPANY", "CNPJ", "ADDRESS", "UF"],
    )
    sicasq_2021 = get_pandas_df_once(
        "brazil/logistics/sicasq/out/SICASQ_2021.csv",
        sep=";",
        dtype=str,
        keep_default_na=False,
        usecols=["COMPANY", "CNPJ", "ADDRESS", "UF"],
    )
    sicasq = concat([sicasq_2017, sicasq_2021])
    sicasq = normalize_str(sicasq, "COMPANY")
    sicasq = normalize_str(sicasq, "ADDRESS")
    df = df.set_index("ID")
    sicasq["ID"] = np.arange(sicasq.shape[0])
    sicasq = sicasq.set_index("ID")
    indexer = rl.Index()
    indexer.sortedneighbourhood(on="COMPANY", window=9, block_on="UF")
    indexer.sortedneighbourhood(on="ADDRESS", window=9, block_on="UF")
    candidates = indexer.index(df, sicasq)
    compare = rl.Compare(
        [
            String(
                left_on="COMPANY",
                right_on="COMPANY",
                threshold=0.8,
                method="levenshtein",
                label="empresa",
            ),
            String(
                left_on="ADDRESS",
                right_on="ADDRESS",
                threshold=0.5,
                method="levenshtein",
                label="endereco",
            ),
            Exact(left_on="UF", right_on="UF", label="estado"),
        ]
    )
    features = compare.compute(candidates, df, sicasq)
    # the address score is computed but only company and state are required to match
    features = features[
        (features["empresa"] == 1) & (features["estado"] == 1)
    ].reset_index()
    features = features.drop_duplicates(subset="ID_1")
    features = features.merge(df, how="left", left_on="ID_1", right_on="ID")
    features = features.merge(
        sicasq,
        how="left",
        left_on="ID_2",
        right_on="ID",
        suffixes=["_sicarm", "_sicasq"],
    )
    features = features.drop_duplicates()
    features = features.rename(
        columns={
            "ID_1": "ID",
            "COMPANY_sicarm": "COMPANY",
            "CPF_CNPJ": "nan",
            "CNPJ": "CPF_CNPJ",
            "UF_sicarm": "UF",
            "ADDRESS_sicarm": "ADDRESS",
        }
    )
    # copy so that normalize_str below does not write into a slice of `features`
    with_cnpj = features[final_columns_order].copy()
    df = df.reset_index("ID")
    without_cnpj = df[~df["ID"].isin(with_cnpj["ID"])]
    with_cnpj = normalize_str(with_cnpj, "CPF_CNPJ", clean=True)
    print("With CNPJ:", with_cnpj.shape[0])
    print("Missing:", without_cnpj.shape[0])
    return with_cnpj, without_cnpj
# ==================================================================================== #
# STEP 9: Record Linkage between SICARM and BoL
# ==================================================================================== #
def sicarm_recordlinkage_bol(df: pd.DataFrame):
    """RecordLinkage between SICARM and BoL 2018, 2019 and 2020:
    Blocking and Comparing Company"""
    df_bol = pd.concat(
        [
            get_pandas_df_once(
                key,
                sep=";",
                dtype=str,
                keep_default_na=False,
                usecols=["exporter.label", "exporter.cnpj"],
            )
            for key in [
                "brazil/trade/bol/2018/BRAZIL_BOL_DATALINER_SOY_2018.csv",
                "brazil/trade/bol/2019/BRAZIL_BOL_2019.csv",
                "brazil/trade/bol/2020/BRAZIL_BOL_2020.csv",
            ]
        ],
        sort=False,
        ignore_index=True,
    )
    df_bol = df_bol.rename(columns={"exporter.label": "COMPANY"}, errors="raise")
    df_bol = normalize_str(df_bol, "COMPANY")
    df_bol["ID"] = np.arange(df_bol.shape[0])
    df_bol = df_bol.set_index("ID")
    df = df.set_index("ID")
    indexer = rl.Index()
    indexer.sortedneighbourhood("COMPANY", window=9)
    candidates = indexer.index(df, df_bol)
    compare = rl.Compare()
    compare.string(
        "COMPANY",
        "COMPANY",
        threshold=0.8,
        method="levenshtein",
        label="empresa",
    )
    features = compare.compute(candidates, df, df_bol)
    features = features[features["empresa"] == 1].reset_index()
    features = features.drop_duplicates(subset="ID_1")
    features = features.merge(df, how="left", left_on="ID_1", right_on="ID")
    features = features.merge(df_bol, how="left", left_on="ID_2", right_on="ID")
    features = rename(
        features,
        columns={
            "ID_1": "ID",
            "CPF_CNPJ": "NAD",
            "exporter.cnpj": "CPF_CNPJ",
            "COMPANY_x": "COMPANY",
        },
    )
    # discard CNPJs matched to facilities in more than one municipality
    different_location = concat(
        e
        for _, e in features.groupby(["CPF_CNPJ", "COMPANY"])
        if e["MUNICIPALITY"].nunique() > 1
    )
    features = features[~features["CPF_CNPJ"].isin(different_location["CPF_CNPJ"])]
    with_cnpj = features[final_columns_order]
    print("With CNPJ:", with_cnpj.shape[0])
    return with_cnpj
# ==================================================================================== #
# STEP 10: Insert manually-searched CNPJs on Google
# ==================================================================================== #
def load_manually_searched_cnpjs():
    """STEP 10: Insert manually-searched CNPJs on Google"""
    df = get_pandas_df_once(
        "brazil/logistics/sicarm/in/aux_sicarm_53_cnpjs_googled.csv",
        sep=";",
        dtype=str,
        keep_default_na=False,
        encoding="utf-8",
    )
    df = normalize_str(df, "CPF_CNPJ", clean=True)
    print("Manually inserted CNPJS:", df.shape[0])
    return df
# ==================================================================================== #
# STEP 11: Concatenate all previous steps and export
#
# Concatenate SICARM from Steps 1, 4, 5, 6, 7, 8, 9, and 10
# ==================================================================================== #
def final_concatenation(
    df_sicarm: pd.DataFrame,
    df_step1: pd.DataFrame,
    df_step4: pd.DataFrame,
    df_step5: pd.DataFrame,
    df_step6: pd.DataFrame,
    df_step7: pd.DataFrame,
    df_step8: pd.DataFrame,
    df_step9: pd.DataFrame,
    df_step10: pd.DataFrame,
):
    """Step 11: Concatenate all previous steps"""
    df = concat(
        [
            df_step1,
            df_step4,
            df_step5,
            df_step6,
            df_step7,
            df_step8,
            df_step9,
            df_step10,
        ]
    )
    df = df[final_columns_order]
    # rows of the full SICARM table that no step managed to match
    missing = df_sicarm[~df_sicarm["ID"].isin(df["ID"])]
    full_data = concat([df, missing])
    print("Final Full dataset:", full_data.shape[0])
    print(
        "Final with CNPJ:",
        df.shape[0],
        " - ",
        f"{((df.shape[0] / full_data.shape[0])*100):.2f}",
        "%",
    )
    print(
        "Final without CNPJ:",
        missing.shape[0],
        " - ",
        f"{((missing.shape[0] / full_data.shape[0])*100):.2f}",
        "%",
    )
    return full_data, missing
if __name__ == "__main__":
    main()
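The overall shape of main() above (exact merges first, then progressively looser matches, each step working only on the rows that still lack a CNPJ) can be illustrated with a small pandas-only sketch. This is not the script's implementation: the `cascade_match` helper, the toy data and the de-duplication of the lookup table are assumptions made for illustration.

```python
import pandas as pd


def cascade_match(facilities, registry, key_sets):
    """Try each key set in turn; rows matched at one step skip later steps."""
    matched_parts = []
    remaining = facilities.copy()
    for keys in key_sets:
        # One registry row per key combination, so the merge cannot fan out.
        lookup = registry.drop_duplicates(subset=keys)[keys + ["cnpj"]]
        merged = remaining.merge(lookup, how="left", on=keys)
        hit = merged["cnpj"].notna()
        matched_parts.append(merged[hit])
        # Unmatched rows move on to the next, looser key set.
        remaining = merged.loc[~hit].drop(columns="cnpj")
    matched = pd.concat(matched_parts, ignore_index=True)
    return matched, remaining.reset_index(drop=True)


# Hypothetical toy data.
facilities = pd.DataFrame(
    {
        "ID": [0, 1, 2],
        "COMPANY": ["ACME", "BETA", "GAMMA"],
        "CEP": ["78000", "00000", "11111"],
        "GEOCODE": ["5103403", "5103403", "9999999"],
    }
)
registry = pd.DataFrame(
    {
        "COMPANY": ["ACME", "BETA"],
        "CEP": ["78000", "99999"],
        "GEOCODE": ["5103403", "5103403"],
        "cnpj": ["111", "222"],
    }
)

matched, unmatched = cascade_match(
    facilities, registry, key_sets=[["COMPANY", "CEP"], ["COMPANY", "GEOCODE"]]
)
```

In this toy run ACME is matched on company and zipcode, BETA only on company and geocode, and GAMMA stays unmatched. In the script the unmatched rows are what ends up in the SICARM_CLEAN_MISSING_CNPJ_2021.csv file.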
import pandas as pd
def model(dbt, cursor):
    dbt.source("trase-storage-raw", "sicarm_se_20210715")
    dbt.source("trase-storage-raw", "")
    dbt.source("trase-storage-raw", "sicarm_al_20210715")
    dbt.source("trase-storage-raw", "sicarm_go_20210715")
    dbt.source("trase-storage-raw", "sicarm_ms_20210715")
    dbt.source("trase-storage-raw", "sicarm_ma_20210715")
    dbt.source("trase-storage-raw", "sicarm_pi_20210715")
    dbt.source("trase-storage-raw", "sicarm_ce_20210715")
    dbt.source("trase-storage-raw", "sicarm_sp_20210715")
    dbt.source("trase-storage-raw", "sicarm_pr_20210715")
    dbt.source("trase-storage-raw", "sicarm_es_20210715")
    dbt.source("trase-storage-raw", "sicarm_df_20210715")
    dbt.source("trase-storage-raw", "sicarm_ba_20210715")
    dbt.source("trase-storage-raw", "sicarm_am_20210715")
    dbt.source("trase-storage-raw", "sicarm_ap_20210715")
    dbt.source("trase-storage-raw", "sicarm_clean")
    dbt.source("trase-storage-raw", "sicarm_rj_20210715")
    dbt.source("trase-storage-raw", "sicarm_mg_20210715")
    dbt.source("trase-storage-raw", "sicarm_rr_20210715")
    dbt.source("trase-storage-raw", "sicarm_mt_20210715")
    dbt.source("trase-storage-raw", "sicarm_rn_20210715")
    dbt.source("trase-storage-raw", "sicarm_ac_20210715")
    dbt.source("trase-storage-raw", "sicarm_sc_20210715")
    dbt.source("trase-storage-raw", "auxiliary_cnpj_original_cnpj_2019")
    dbt.source("trase-storage-raw", "sicarm_rs_20210715")
    dbt.source("trase-storage-raw", "sicarm_pb_20210715")
    dbt.source("trase-storage-raw", "sicarm_to_20210715")
    dbt.source("trase-storage-raw", "sicarm_ro_20210715")
    dbt.source("trase-storage-raw", "uf")
    dbt.source("trase-storage-raw", "sicarm_pa_20210715")
    dbt.source("trase-storage-raw", "sicarm_pe_20210715")
    # mock model: only records lineage, the data is produced by the script above
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})
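If the output CSV is read directly (for instance from a local copy), identifier columns are safest loaded as strings, as the script does throughout with dtype=str. A minimal sketch using an in-memory CSV follows; the ";" separator and the sample values are assumptions, not taken from the real file.

```python
import io

import pandas as pd

# Made-up one-row sample; the real file has more columns.
csv_text = "CDA;CPF_CNPJ;CAPACITY\nC001;00000000000001;1000\n"

as_str = pd.read_csv(io.StringIO(csv_text), sep=";", dtype=str, keep_default_na=False)
as_inferred = pd.read_csv(io.StringIO(csv_text), sep=";")

# With dtype=str the 14-digit CNPJ keeps its leading zeros.
cnpj_str = as_str.loc[0, "CPF_CNPJ"]

# With type inference it becomes an integer and the zeros are lost;
# zfill(14) restores them for a CNPJ (a CPF has 11 digits instead).
cnpj_restored = str(as_inferred.loc[0, "CPF_CNPJ"]).zfill(14)
```

Reading as strings avoids having to guess afterwards whether a short value was a CPF or a CNPJ that lost its zeros.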