SICARM Clean Missing CNPJ 2021
s3://trase-storage/brazil/logistics/sicarm/in/SICARM_CLEAN_MISSING_CNPJ_2021.csv
Dbt path: trase_production.main_brazil.sicarm_clean_missing_cnpj_2021
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/logistics/sicarm/in/_schema.yml
Model file link: trase/data_pipeline/models/brazil/logistics/sicarm/in/sicarm_clean_missing_cnpj_2021.py
Calls script: trase/data/brazil/logistics/sicarm/archive/SICARM_CLEAN_AND_MISSING_CNPJ_2021.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: mock_model, brazil, in, logistics, sicarm
sicarm_clean_missing_cnpj_2021
Description
This model was auto-generated from the .yml 'lineage' files in S3. The dbt model itself only raises an error; the actual script that created the data lives elsewhere, at trase/data/brazil/logistics/sicarm/SICARM_CLEAN_AND_MISSING_CNPJ_2021.py [permalink]. It was last run by Harry Biddle.
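For ad-hoc inspection outside Metabase, the table can also be queried directly with the DuckDB Python client. A minimal sketch, assuming local access to the trase_production DuckDB database (the database filename here is hypothetical; the schema and table name come from the dbt path above):

import duckdb

# hypothetical local copy of the production DuckDB database
con = duckdb.connect("trase_production.duckdb")
# preview the first rows of the model
preview = con.execute(
    "SELECT * FROM main_brazil.sicarm_clean_missing_cnpj_2021 LIMIT 10"
).df()
print(preview)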
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.sicarm_se_20210715
source.trase_duckdb.trase-storage-raw.
source.trase_duckdb.trase-storage-raw.sicarm_al_20210715
source.trase_duckdb.trase-storage-raw.sicarm_go_20210715
source.trase_duckdb.trase-storage-raw.sicarm_ms_20210715
source.trase_duckdb.trase-storage-raw.sicarm_ma_20210715
source.trase_duckdb.trase-storage-raw.sicarm_pi_20210715
source.trase_duckdb.trase-storage-raw.sicarm_ce_20210715
source.trase_duckdb.trase-storage-raw.sicarm_sp_20210715
source.trase_duckdb.trase-storage-raw.sicarm_pr_20210715
source.trase_duckdb.trase-storage-raw.sicarm_es_20210715
source.trase_duckdb.trase-storage-raw.sicarm_df_20210715
source.trase_duckdb.trase-storage-raw.sicarm_ba_20210715
source.trase_duckdb.trase-storage-raw.sicarm_am_20210715
source.trase_duckdb.trase-storage-raw.sicarm_ap_20210715
source.trase_duckdb.trase-storage-raw.sicarm_clean
source.trase_duckdb.trase-storage-raw.sicarm_rj_20210715
source.trase_duckdb.trase-storage-raw.sicarm_mg_20210715
source.trase_duckdb.trase-storage-raw.sicarm_rr_20210715
source.trase_duckdb.trase-storage-raw.sicarm_mt_20210715
source.trase_duckdb.trase-storage-raw.sicarm_rn_20210715
source.trase_duckdb.trase-storage-raw.sicarm_ac_20210715
source.trase_duckdb.trase-storage-raw.sicarm_sc_20210715
source.trase_duckdb.trase-storage-raw.auxiliary_cnpj_original_cnpj_2019
source.trase_duckdb.trase-storage-raw.sicarm_rs_20210715
source.trase_duckdb.trase-storage-raw.sicarm_pb_20210715
source.trase_duckdb.trase-storage-raw.sicarm_to_20210715
source.trase_duckdb.trase-storage-raw.sicarm_ro_20210715
source.trase_duckdb.trase-storage-raw.uf
source.trase_duckdb.trase-storage-raw.sicarm_pa_20210715
source.trase_duckdb.trase-storage-raw.sicarm_pe_20210715
Sources
['trase-storage-raw', 'sicarm_se_20210715']
['trase-storage-raw', '']
['trase-storage-raw', 'sicarm_al_20210715']
['trase-storage-raw', 'sicarm_go_20210715']
['trase-storage-raw', 'sicarm_ms_20210715']
['trase-storage-raw', 'sicarm_ma_20210715']
['trase-storage-raw', 'sicarm_pi_20210715']
['trase-storage-raw', 'sicarm_ce_20210715']
['trase-storage-raw', 'sicarm_sp_20210715']
['trase-storage-raw', 'sicarm_pr_20210715']
['trase-storage-raw', 'sicarm_es_20210715']
['trase-storage-raw', 'sicarm_df_20210715']
['trase-storage-raw', 'sicarm_ba_20210715']
['trase-storage-raw', 'sicarm_am_20210715']
['trase-storage-raw', 'sicarm_ap_20210715']
['trase-storage-raw', 'sicarm_clean']
['trase-storage-raw', 'sicarm_rj_20210715']
['trase-storage-raw', 'sicarm_mg_20210715']
['trase-storage-raw', 'sicarm_rr_20210715']
['trase-storage-raw', 'sicarm_mt_20210715']
['trase-storage-raw', 'sicarm_rn_20210715']
['trase-storage-raw', 'sicarm_ac_20210715']
['trase-storage-raw', 'sicarm_sc_20210715']
['trase-storage-raw', 'auxiliary_cnpj_original_cnpj_2019']
['trase-storage-raw', 'sicarm_rs_20210715']
['trase-storage-raw', 'sicarm_pb_20210715']
['trase-storage-raw', 'sicarm_to_20210715']
['trase-storage-raw', 'sicarm_ro_20210715']
['trase-storage-raw', 'uf']
['trase-storage-raw', 'sicarm_pa_20210715']
['trase-storage-raw', 'sicarm_pe_20210715']
"""
Brazil - SICARM (Sistema de Cadastro Nacional de Unidades Armazenadoras)
Update and organize the data
Fetch and match the CNPJs
"""
import posixpath
from datetime import datetime
from pathlib import Path
from tempfile import gettempdir
import numpy as np
import pandas as pd
import recordlinkage as rl
from recordlinkage.compare import Exact, String
from tqdm import tqdm
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import add_object_to_tracker
from trase.tools.pcs import *  # star import; expected to provide aws_session, uses_database, etc.
from trase.tools.aws.aws_helpers import (
read_s3_folder,
get_pandas_df,
)
from trase.tools.sei_pcs.pandas_utilities import full_merge, rename, concat
final_columns_order = [
"ID",
"CDA",
"CPF_CNPJ",
"COMPANY",
"STATUS",
"TYPE",
"CAPACITY",
"UF",
"MUNICIPALITY",
"ADDRESS",
"LAT",
"LONG",
"GEOCODE",
]
def main():
start_time = datetime.now()
print("Script started:", start_time)
print(80 * "-")
print(
"Step 1: Read new SICARM .csv files per State, get CNPJ data from old SICARM on S3 Bucket"
)
df_load = load_data()
df = insert_geocode(df_load)
df_sicarm = load_merge_old_version(df)
df_with_cnpj_step1, df_without_cnpj = split_sicarm_without_cnpj(df_sicarm)
print(80 * "-")
print("Step 2-3: Download and read CNPJ dataset file from S3")
cnpj = read_cnpj(download(download_cnpj_csv()))
print(80 * "-")
print("Step 4: Merge:: SICARM and CNPJ by COMPANY and CEP (zipcode)")
df_with_cnpj_step4, df_without_cnpj = sicarm_merge_cnpj_company_zipcode(
df_without_cnpj, cnpj
)
print(80 * "-")
print("Step 5: Merge:: SICARM and CNPJ by COMPANY and GEOCODE")
df_with_cnpj_step5, df_without_cnpj = sicarm_merge_cnpj_company_geocode(
df_without_cnpj, cnpj
)
print(80 * "-")
print("Step 6: RecordLinkage:: SICARM and CNPJ by COMPANY and ADDRESS")
df_with_cnpj_step6, df_without_cnpj = sicarm_recordlinkage_cnpj_address(
df_without_cnpj, cnpj
)
print(80 * "-")
print("Step 7: Merge:: SICARM and CNPJ by COMPANY and STATE")
df_with_cnpj_step7, df_without_cnpj = sicarm_merge_cnpj_state(df_without_cnpj, cnpj)
print(80 * "-")
print("Step 8: RecordLinkage:: SICARM and SICASQ by COMPANY and ADDRESS")
df_with_cnpj_step8, df_without_cnpj = sicarm_recordlinkage_sicasq(df_without_cnpj)
print(80 * "-")
print("Step 9: RecordLinkage:: SICARM and BoL by COMPANY")
df_with_cnpj_step9 = sicarm_recordlinkage_bol(df_without_cnpj)
print(80 * "-")
print("Step 10: Manually-Searched:: Insert Googled CNPJs into SICARM")
df_with_cnpj_step10 = load_manually_searched_cnpjs()
print(80 * "-")
print("Step 11: Concatenate SICARM with and without CNPJ information")
df_full_data, df_without_cnpj = final_concatenation(
df_sicarm,
df_with_cnpj_step1,
df_with_cnpj_step4,
df_with_cnpj_step5,
df_with_cnpj_step6,
df_with_cnpj_step7,
df_with_cnpj_step8,
df_with_cnpj_step9,
df_with_cnpj_step10,
)
print(80 * "-")
print("Export both to temp file and S3 Bucket")
write_csv_for_upload(
df_full_data, "brazil/logistics/sicarm/out/SICARM_CLEAN_2021.csv"
)
write_csv_for_upload(
df_without_cnpj, "brazil/logistics/sicarm/in/SICARM_CLEAN_MISSING_CNPJ_2021.csv"
)
print("Script took ", datetime.now() - start_time, "to run")
def normalize_str(d: pd.DataFrame, col: str, clean=False):
"""
Adjust column value characters encoding to UTF-8 and uppercase them.
Args:
d (pandas DataFrame): Dataframe to lookup
col (str): String column
clean: remove specific characters
Returns:
pandas DataFrame
"""
d[col] = (
d[col]
.str.normalize("NFKD")
.str.encode("ascii", errors="ignore")
.str.decode("utf-8")
)
d[col] = d[col].str.upper()
    if clean is True:
        d[col] = (
            d[col]
            .str.replace(".", "", regex=False)
            .str.replace("-", "", regex=False)
            .str.replace("/", "", regex=False)
            .str.replace(",", "", regex=False)
            .str.replace('"', "", regex=False)
            .str.split(" ")
            .str.join("")
        )
return d
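# Illustrative example: normalize_str(df, "CPF_CNPJ", clean=True) turns "12.345.678/0001-90"
# into "12345678000190" (accents stripped, value uppercased, punctuation and spaces removed).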
def get_state_uf():
df = get_pandas_df(
"brazil/metadata/UF.csv",
sep=",",
usecols=("CO_UF_IBGE", "CO_UF", "UF"),
dtype=str,
)
df = rename(df, columns={"CO_UF_IBGE": "state.code", "CO_UF": "state.uf_number"})
return df
def replace_state_uf_codes_with_names_and_trase_ids(df, column):
df_state_uf = get_state_uf()[[column, "state.code"]].drop_duplicates()
return full_merge(df, df_state_uf, on=column, how="left", validate="many_to_one")
# ==================================================================================== #
# STEP 1: Read all original SICARM .csv files per State
#
# (1) - read csv from S3 bucket and concatenate them,
# (2) - merge with States csv: fetch GEOCODE variable,
# (3) - merge with current SICARM version,
# (4) - split into two dataframes:
# one with matched CNPJ and
# other to be merged with the CNPJ dataset
# ==================================================================================== #
def load_data():
"""
Load original data (.csv) from S3 bucket
Rename column names
Concatenate each .csv (per State) into a country-level DataFrame
"""
s3_folder = read_s3_folder("brazil/logistics/sicarm/ori/")
col_names = [
"CDA",
"Armazenador",
"Endereรงo",
"Municรญpio",
"UF",
"Tipo",
"Telefone",
"Email",
"Capacidade (t)",
"Latitude",
"Longitude",
]
# Read each .csv file from S3 Bucket
df_list = []
for file in s3_folder:
raw_data = get_pandas_df(
file.key,
sep=";",
encoding="latin1",
skiprows=1,
names=col_names,
dtype=str,
keep_default_na=False,
)
raw_data = raw_data.iloc[1:]
raw_data = rename(
raw_data,
columns={
"Armazenador": "COMPANY",
"Endereรงo": "ADDRESS",
"Municรญpio": "MUNICIPALITY",
"UF": "UF",
"Tipo": "TYPE",
"Capacidade (t)": "CAPACITY",
"Latitude": "LAT",
"Longitude": "LONG",
},
)
raw_data = normalize_str(raw_data, "MUNICIPALITY")
df_list.append(raw_data)
# Concatenate DataFrames
df = concat(df_list)
df["ID"] = np.arange(df.shape[0])
print("New SICARM - shape:", df.shape[0])
return df
@uses_database
def get_geocode(cnx=None):
br_geocode = pd.read_sql(
""" SELECT SUBSTRING(trase_id, 4, 11) as "GEOCODE",
UNNEST(synonyms) || ' - ' || SUBSTRING(trase_id, 4, 2) name_id
FROM views.regions
WHERE country= 'BRAZIL' AND region_type = 'MUNICIPALITY' """,
cnx.cnx,
)
return br_geocode
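# name_id above looks like e.g. "SORRISO - 51": a municipality synonym followed by the two-digit
# IBGE state code, which is the same format that MUN_UF in insert_geocode() is built to match.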
def insert_geocode(df: pd.DataFrame):
"""
Insert Geocode into SICARM, based on State ("UF") information
Args:
df (pandas Dataframe): Dataframe where Geocode will be inserted
Return:
df_2 (pandas Dataframe): Dataframe with Geocode numeric values
"""
data = replace_state_uf_codes_with_names_and_trase_ids(df, "UF")
data["MUN_UF"] = data["MUNICIPALITY"] + " - " + data["state.code"]
df = full_merge(
data,
get_geocode(),
how="left",
left_on="MUN_UF",
right_on="name_id",
validate="many_to_one",
)
df = df.drop(columns={"name_id", "MUN_UF", "state.code"}, errors="raise")
return df
def load_merge_old_version(df: pd.DataFrame):
"""Load old SICARM version and merge with the new one to get it's CNPJ information"""
# Load SICARM Current Version
df_current = get_pandas_df(
"brazil/logistics/sicarm/out/SICARM_CLEAN.csv",
sep=";",
dtype=str,
keep_default_na=False,
)
df_current = rename(
df_current,
columns={
"ADDRESS": "address",
"CAPACITY": "capacity",
"MUNICIPALITY": "municipality",
"LAT": "lat",
"LONG": "long",
"TYPE": "type",
},
)
# Join Current SICARM dataset with new one
df = df.merge(df_current, how="left", on=["CDA", "COMPANY", "GEOCODE", "UF"])
df = df[final_columns_order]
df = normalize_str(df, "CPF_CNPJ", clean=True)
return df
def split_sicarm_without_cnpj(df: pd.DataFrame):
"""Split SICARM with and without CNPJ data"""
sicarm_cnpj = df.loc[~df["CPF_CNPJ"].isna()]
print("SICARM with CNPJ - shape:", sicarm_cnpj.shape[0])
sicarm_nan_cnpj = df.loc[df["CPF_CNPJ"].isna()].copy()
print("SICARM NaN CNPJ - shape:", sicarm_nan_cnpj.shape[0])
return sicarm_cnpj, sicarm_nan_cnpj
# ==================================================================================== #
# STEP 2: Download CNPJ file from S3
#
# The files are downloaded to a temporary location on the operating system. For
# ease-of-use the files are only downloaded once.
# ==================================================================================== #
def download(key) -> str:
"""Downloads a file, but only once"""
client = aws_session().client("s3")
filename = posixpath.basename(key)
destination = Path(gettempdir(), filename)
if destination.is_file():
print(f"{destination} exists: skipping download")
else:
print(f"Downloading s3://trase-storage/{key} to {destination}")
client.download_file("trase-storage", key, str(destination))
add_object_to_tracker(key, "trase-storage", client=client)
return str(destination)
def download_cnpj_csv():
"""Download CNPJ 2019 file"""
return download("brazil/logistics/cnpj/original/CNPJ_2019.csv")
# ==================================================================================== #
# STEP 3: Read the CNPJ dataset.
#
# This contains four columns that we are interested in:
#
# (1) cnpj,
# (2) the company name,
# (3) geocode of the municipality,
# (4) address.
#
# This file is huge because it contains all companies in the whole of Brazil. We need to
# read pretty much the whole file, since it will serve as a lookup for the company name
# and geographic location. Because of its size we stream through the file and only load
# an absolute minimum of what we need into memory.
# ==================================================================================== #
def read_cnpj(csv):
"""Load CNPJ dataset into memory with specific columns"""
columns_to_keep = [
"cnpj",
"geocodmun",
"razao_social",
"situacao_cadastral",
"logradouro",
"numero",
"complemento",
"bairro",
"cep",
]
total_rows = 40_837_398
chunksize = 10_000
with open(csv, "r", errors="replace") as file:
chunk_iterator = pd.read_csv(
file,
sep=";",
dtype=str,
keep_default_na=False,
iterator=True,
chunksize=chunksize,
usecols=[
*columns_to_keep,
"municipio",
"data_situacao_cadastral",
"situacao_cadastral",
],
)
def filtered_chunk_iterator():
for d in tqdm(
chunk_iterator, desc="Loading CNPJ_2019", total=total_rows // chunksize
):
# ignore rows that are outside of Brazil
d = d[d["municipio"] != "EXTERIOR"]
# ignore NA and corrupt tax codes
d = d[d["cnpj"].str.isdigit()]
if d.empty:
continue
                # cast the CNPJ to an integer so we don't have leading-zero woes when
                # using it as a lookup/join key later
d["cnpj_length"] = d["cnpj"].str.len()
d["cnpj"] = d["cnpj"].to_numpy(dtype="int64")
d["ADDRESS"] = (
d["logradouro"]
+ " "
+ d["numero"]
+ " "
+ d["complemento"]
+ " "
+ d["bairro"]
+ " "
+ d["cep"]
)
# split the dataframe into those where the date is defined
# and those where it is not
has_date = d["data_situacao_cadastral"] != "NA"
e = d[has_date]
f = d[~has_date]
# if a date is defined, discard old CNPJs that no longer exist
is_old = e["data_situacao_cadastral"].str.slice(0, 4).astype(int) < 2015
is_class_08 = e["situacao_cadastral"] == "08"
is_probably_inactive = is_old & is_class_08
e = e[~is_probably_inactive]
yield e[[*columns_to_keep, "ADDRESS", "cnpj_length"]]
yield f[[*columns_to_keep, "ADDRESS", "cnpj_length"]]
# concatenate the filtered chunk dataframes. it is only at this point that the file
# is actually read from disk
cnpj = pd.concat(filtered_chunk_iterator())
cnpj = rename(
cnpj, columns={"razao_social": "COMPANY", "geocodmun": "GEOCODE", "cep": "CEP"}
)
cnpj["ADDRESS"] = cnpj["ADDRESS"].replace(
dict.fromkeys(["NANANANA", "NA NA NA NA NA"], np.nan)
)
return cnpj
# ==================================================================================== #
# STEP 4: Merge SICARM with the CNPJ dataset - using companies' name and zipcode
#
# (1) Merge SICARM without CNPJ [from STEP 1] with the CNPJ dataset
# (2) Split SICARM merged with CNPJ:
# (2.1) With CNPJ: will be merged at the end with the other SICARM DataFrames that have a CNPJ
# (2.2) Without CNPJ: will be fuzzy-matched against the CNPJ dataset using different columns
# ==================================================================================== #
def sicarm_merge_cnpj_company_zipcode(df: pd.DataFrame, cnpj: pd.DataFrame):
"""Merge SICARM with CNPJ dataset by COMPANY and CEP (zipcode)"""
df["CEP"] = (
df["ADDRESS"]
.str[-10:]
.str.replace("-", "")
.str.replace(".", "")
.str.lstrip()
.str[:5]
)
cnpj["CEP"] = cnpj["CEP"].str[:5]
sicarm_cnpj = df.merge(
cnpj, how="left", on=["COMPANY", "CEP"], validate="many_to_many"
)
sicarm_cnpj = rename(
sicarm_cnpj,
columns={
"CPF_CNPJ": "CPF_CNPJ_NAN",
"cnpj": "CPF_CNPJ",
"GEOCODE_x": "GEOCODE",
"ADDRESS_x": "ADDRESS",
},
)
sicarm_cnpj = sicarm_cnpj[final_columns_order]
df_with_cnpj = sicarm_cnpj.loc[~sicarm_cnpj["CPF_CNPJ"].isna()].copy()
df_with_cnpj = df_with_cnpj.drop_duplicates(
subset=["CDA", "COMPANY", "ADDRESS", "TYPE"]
)
df_without_cnpj = sicarm_cnpj.loc[sicarm_cnpj["CPF_CNPJ"].isna()].copy()
print("With CNPJ:", df_with_cnpj.shape[0])
print("Missing:", df_without_cnpj.shape[0])
return df_with_cnpj, df_without_cnpj
# ==================================================================================== #
# STEP 5: Merge SICARM with CNPJs - using companies' names and geocode
#
# (1) Merge SICARM without CNPJ (Step 4) with CNPJ dataset using company and geocode
# (2) Split Merged data into two dataframes:
# (2.1) Without address: the Geocode is treated as the best available information for assigning the CNPJ
# (2.2) With address: passed on to fuzzy matching in STEP 6
# ==================================================================================== #
def sicarm_merge_cnpj_company_geocode(df: pd.DataFrame, cnpj: pd.DataFrame):
"""Merge SICARM with CNPJ dataset by COMPANY and GEOCODE"""
df = normalize_str(df, "COMPANY")
df["ADDRESS"] = df["ADDRESS"].str[:-10]
df = normalize_str(df, "ADDRESS")
cnpj = normalize_str(cnpj, "COMPANY")
df = df.merge(cnpj, how="left", on=["COMPANY", "GEOCODE"], validate="many_to_many")
df = df.drop_duplicates(subset=["CDA", "COMPANY"])
df = rename(
df, columns={"cnpj": "CPF_CNPJ", "CPF_CNPJ": "CNPJ", "ADDRESS_x": "ADDRESS"}
)
df = df[final_columns_order]
has_cnpj = ~df["CPF_CNPJ"].isna()
df_with_cnpj = df[has_cnpj].copy()
df_without_cnpj = df[~df["ID"].isin(df_with_cnpj["ID"])].copy()
print("With CNPJ:", df_with_cnpj.shape[0])
print("Missing - Go RecordLinkage:", df_without_cnpj.shape[0])
return df_with_cnpj, df_without_cnpj
# ==================================================================================== #
# STEP 6: RecordLinkage between SICARM and CNPJ [Address]
# ==================================================================================== #
def sicarm_recordlinkage_cnpj_address(df: pd.DataFrame, cnpj: pd.DataFrame):
"""RecordLinkage between SICARM and CNPJ dataset:
Blocking on Geocode and Comparing Company and Address"""
df = df.set_index("ID")
cnpj["ID"] = np.arange(cnpj.shape[0])
cnpj = cnpj.set_index("ID")
indexer = rl.Index()
indexer.block(on="GEOCODE")
candidates = indexer.index(df, cnpj)
compare = rl.Compare()
compare.string(
"COMPANY", "COMPANY", threshold=0.8, method="levenshtein", label="empresa"
)
compare.string(
"ADDRESS", "ADDRESS", threshold=0.5, method="levenshtein", label="endereco"
)
features = compare.compute(candidates, df, cnpj)
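    # each string comparator returns 1 when its similarity clears the threshold and 0 otherwise,
    # so a combined score of 2 means both the company name and the address matched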
features["score"] = features["empresa"] + features["endereco"]
features = features[features["score"] >= 1].reset_index()
features = features.sort_values("score", ascending=False)
features = features.drop_duplicates(subset="ID_1")
df = df.reset_index()
cnpj = cnpj.reset_index()
df = df[final_columns_order]
cnpj = cnpj[["ID", "cnpj", "ADDRESS", "COMPANY"]]
sicarm_merge = features.merge(df, how="left", left_on="ID_1", right_on="ID")
sicarm_merge = sicarm_merge.merge(
cnpj, how="left", left_on="ID_2", right_on="ID", suffixes=["_sicarm", "_cnpj"]
)
sicarm_merge = rename(sicarm_merge, columns={"ID_sicarm": "ID", "cnpj": "CPF_CNPJ"})
with_cnpj = sicarm_merge[sicarm_merge["score"] == 2]
    # If the company name matches between SICARM and CNPJ and both are in the same Geocode,
    # but the CNPJ dataset has no address information, the Geocode is taken as the most
    # reliable evidence for attributing the CNPJ to the company
same_company = sicarm_merge[
(sicarm_merge["empresa"] == 1)
& (sicarm_merge["endereco"] == 0)
& (sicarm_merge["ADDRESS_cnpj"].isna())
].copy()
df_with_cnpj = concat([with_cnpj, same_company])
df_with_cnpj = rename(
df_with_cnpj, {"COMPANY_sicarm": "COMPANY", "ADDRESS_sicarm": "ADDRESS"}
)
df_with_cnpj = df_with_cnpj[final_columns_order]
df_without_cnpj = df.loc[~df["ID"].isin(df_with_cnpj["ID"])].copy()
df_without_cnpj["CPF_CNPJ"] = ""
df_without_cnpj = df_without_cnpj[final_columns_order]
print("RecordLinkage: == Company Name & Address:", with_cnpj.shape[0])
print("RecordLinkage: == Company Name & NaN Address:", same_company.shape[0])
print("Missing:", df_without_cnpj.shape[0])
return df_with_cnpj, df_without_cnpj
# ==================================================================================== #
# STEP 7: Merge between SICARM and CNPJ [State]
# ==================================================================================== #
def sicarm_merge_cnpj_state(df: pd.DataFrame, cnpj: pd.DataFrame):
"""Merge SICARM with CNPJ dataset - companies and state"""
cnpj["UF_code"] = cnpj["GEOCODE"].str[:2]
df["UF_code"] = df["GEOCODE"].str[:2]
df = df.merge(cnpj, how="left", on=["COMPANY", "UF_code"], validate="many_to_many")
df.drop_duplicates(subset=["CDA", "COMPANY"], inplace=True)
df = rename(
df,
columns={
"CPF_CNPJ": "CPF_CNPJ_NAN",
"cnpj": "CPF_CNPJ",
"ADDRESS_x": "ADDRESS",
"GEOCODE_x": "GEOCODE",
"ID_x": "ID",
},
)
df = df[final_columns_order]
with_cnpj = df[~df["CPF_CNPJ"].isna()].copy()
without_cnpj = df[df["CPF_CNPJ"].isna()].copy()
print("Merge: with CNPJ:", with_cnpj.shape[0])
print("Missing:", without_cnpj.shape[0])
return with_cnpj, without_cnpj
# ==================================================================================== #
# STEP 8: Record Linkage between SICARM and SICASQ
# ==================================================================================== #
def sicarm_recordlinkage_sicasq(df: pd.DataFrame):
"""RecordLinkage between SICARM and SICASQ:
Blocking on State and Comparing Company and Address"""
sicasq_2017 = get_pandas_df_once(
"brazil/logistics/sicasq/SICASQ_2017.csv",
sep=";",
dtype=str,
keep_default_na=False,
encoding="LATIN1",
usecols=["COMPANY", "CNPJ", "ADDRESS", "UF"],
)
sicasq_2021 = get_pandas_df_once(
"brazil/logistics/sicasq/out/SICASQ_2021.csv",
sep=";",
dtype=str,
keep_default_na=False,
usecols=["COMPANY", "CNPJ", "ADDRESS", "UF"],
)
sicasq = concat([sicasq_2017, sicasq_2021])
sicasq = normalize_str(sicasq, "COMPANY")
sicasq = normalize_str(sicasq, "ADDRESS")
df = df.set_index("ID")
sicasq["ID"] = np.arange(sicasq.shape[0])
sicasq = sicasq.set_index("ID")
indexer = rl.Index()
indexer.sortedneighbourhood(on="COMPANY", window=9, block_on="UF")
indexer.sortedneighbourhood(on="ADDRESS", window=9, block_on="UF")
candidates = indexer.index(df, sicasq)
compare = rl.Compare(
[
String(
left_on="COMPANY",
right_on="COMPANY",
threshold=0.8,
method="levenshtein",
label="empresa",
),
String(
left_on="ADDRESS",
right_on="ADDRESS",
threshold=0.5,
method="levenshtein",
label="endereco",
),
Exact(left_on="UF", right_on="UF", label="estado"),
]
)
features = compare.compute(candidates, df, sicasq)
features = features[
(features["empresa"] == 1) & (features["estado"] == 1)
].reset_index()
features = features.drop_duplicates(subset="ID_1")
features = features.merge(df, how="left", left_on="ID_1", right_on="ID")
features = features.merge(
sicasq,
how="left",
left_on="ID_2",
right_on="ID",
suffixes=["_sicarm", "_sicasq"],
)
features = features.drop_duplicates()
features = features.rename(
columns={
"ID_1": "ID",
"COMPANY_sicarm": "COMPANY",
"CPF_CNPJ": "nan",
"CNPJ": "CPF_CNPJ",
"UF_sicarm": "UF",
"ADDRESS_sicarm": "ADDRESS",
}
)
with_cnpj = features[final_columns_order]
df = df.reset_index("ID")
without_cnpj = df[~df["ID"].isin(with_cnpj["ID"])]
with_cnpj = normalize_str(with_cnpj, "CPF_CNPJ", clean=True)
print("With CNPJ:", with_cnpj.shape[0])
print("Missing:", without_cnpj.shape[0])
return with_cnpj, without_cnpj
# ==================================================================================== #
# STEP 9: Record Linkage between SICARM and BoL
# ==================================================================================== #
def sicarm_recordlinkage_bol(df: pd.DataFrame):
"""RecordLinkage between SICARM and BoL 2018, 2019 and 2020:
Blocking and Comparing Company"""
df_bol = pd.concat(
[
get_pandas_df_once(
key,
sep=";",
dtype=str,
keep_default_na=False,
usecols=["exporter.label", "exporter.cnpj"],
)
for key in [
"brazil/trade/bol/2018/BRAZIL_BOL_DATALINER_SOY_2018.csv",
"brazil/trade/bol/2019/BRAZIL_BOL_2019.csv",
"brazil/trade/bol/2020/BRAZIL_BOL_2020.csv",
]
],
sort=False,
ignore_index=True,
)
df_bol = df_bol.rename(columns={"exporter.label": "COMPANY"}, errors="raise")
df_bol = normalize_str(df_bol, "COMPANY")
df_bol["ID"] = np.arange(df_bol.shape[0])
df_bol = df_bol.set_index("ID")
df = df.set_index("ID")
indexer = rl.Index()
indexer.sortedneighbourhood("COMPANY", window=9)
candidates = indexer.index(df, df_bol)
compare = rl.Compare()
compare.string(
"COMPANY",
"COMPANY",
threshold=0.8,
method="levenshtein",
label="empresa",
)
features = compare.compute(candidates, df, df_bol)
features = features[features["empresa"] == 1].reset_index()
features = features.drop_duplicates(subset="ID_1")
features = features.merge(df, how="left", left_on="ID_1", right_on="ID")
features = features.merge(df_bol, how="left", left_on="ID_2", right_on="ID")
features = rename(
features,
columns={
"ID_1": "ID",
"CPF_CNPJ": "NAD",
"exporter.cnpj": "CPF_CNPJ",
"COMPANY_x": "COMPANY",
},
)
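    # a CNPJ/company pair that maps to more than one municipality is ambiguous; collect those
    # matches and drop them from the results below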
different_location = concat(
e
for _, e in features.groupby(["CPF_CNPJ", "COMPANY"])
if e["MUNICIPALITY"].nunique() > 1
)
features = features[~features["CPF_CNPJ"].isin(different_location["CPF_CNPJ"])]
with_cnpj = features[final_columns_order]
print("With CNPJ:", with_cnpj.shape[0])
return with_cnpj
# ==================================================================================== #
# STEP 10: Insert manually-searched CNPJs on Google
# ==================================================================================== #
def load_manually_searched_cnpjs():
"""STEP 10: Insert manually-searched CNPJs on Google"""
df = get_pandas_df_once(
"brazil/logistics/sicarm/in/aux_sicarm_53_cnpjs_googled.csv",
sep=";",
dtype=str,
keep_default_na=False,
encoding="utf-8",
)
df = normalize_str(df, "CPF_CNPJ", clean=True)
print("Manually inserted CNPJS:", df.shape[0])
return df
# ==================================================================================== #
# STEP 11: Concatenate all previous steps and export
#
# Concatenate SICARM from Steps 1, 4, 5, 6, 7, 8, 9, and 10
# ==================================================================================== #
def final_concatenation(
df_sicarm: pd.DataFrame,
df_step1: pd.DataFrame,
df_step4: pd.DataFrame,
df_step5: pd.DataFrame,
df_step6: pd.DataFrame,
df_step7: pd.DataFrame,
df_step8: pd.DataFrame,
df_step9: pd.DataFrame,
df_step10: pd.DataFrame,
):
"""Step 11: Concatenate all previous steps"""
df = concat(
[
df_step1,
df_step4,
df_step5,
df_step6,
df_step7,
df_step8,
df_step9,
df_step10,
]
)
df = df[final_columns_order]
missing = df_sicarm[~df_sicarm["ID"].isin(df["ID"])]
full_data = concat([df, missing])
print("Final Full dataset:", full_data.shape[0])
print(
"Final with CNPJ:",
df.shape[0],
" - ",
f"{((df.shape[0] / full_data.shape[0])*100):.2f}",
"%",
)
print(
"Final without CNPJ:",
missing.shape[0],
" - ",
f"{((missing.shape[0] / full_data.shape[0])*100):.2f}",
"%",
)
return full_data, missing
if __name__ == "__main__":
main()
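# ==================================================================================== #
# dbt model file: sicarm_clean_missing_cnpj_2021.py (auto-generated mock model)
# ==================================================================================== #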
import pandas as pd
def model(dbt, cursor):
dbt.source("trase-storage-raw", "sicarm_se_20210715")
dbt.source("trase-storage-raw", "")
dbt.source("trase-storage-raw", "sicarm_al_20210715")
dbt.source("trase-storage-raw", "sicarm_go_20210715")
dbt.source("trase-storage-raw", "sicarm_ms_20210715")
dbt.source("trase-storage-raw", "sicarm_ma_20210715")
dbt.source("trase-storage-raw", "sicarm_pi_20210715")
dbt.source("trase-storage-raw", "sicarm_ce_20210715")
dbt.source("trase-storage-raw", "sicarm_sp_20210715")
dbt.source("trase-storage-raw", "sicarm_pr_20210715")
dbt.source("trase-storage-raw", "sicarm_es_20210715")
dbt.source("trase-storage-raw", "sicarm_df_20210715")
dbt.source("trase-storage-raw", "sicarm_ba_20210715")
dbt.source("trase-storage-raw", "sicarm_am_20210715")
dbt.source("trase-storage-raw", "sicarm_ap_20210715")
dbt.source("trase-storage-raw", "sicarm_clean")
dbt.source("trase-storage-raw", "sicarm_rj_20210715")
dbt.source("trase-storage-raw", "sicarm_mg_20210715")
dbt.source("trase-storage-raw", "sicarm_rr_20210715")
dbt.source("trase-storage-raw", "sicarm_mt_20210715")
dbt.source("trase-storage-raw", "sicarm_rn_20210715")
dbt.source("trase-storage-raw", "sicarm_ac_20210715")
dbt.source("trase-storage-raw", "sicarm_sc_20210715")
dbt.source("trase-storage-raw", "auxiliary_cnpj_original_cnpj_2019")
dbt.source("trase-storage-raw", "sicarm_rs_20210715")
dbt.source("trase-storage-raw", "sicarm_pb_20210715")
dbt.source("trase-storage-raw", "sicarm_to_20210715")
dbt.source("trase-storage-raw", "sicarm_ro_20210715")
dbt.source("trase-storage-raw", "uf")
dbt.source("trase-storage-raw", "sicarm_pa_20210715")
dbt.source("trase-storage-raw", "sicarm_pe_20210715")
raise NotImplementedError()
return pd.DataFrame({"hello": ["world"]})