Aux Silo Cnpj 2019 Date
s3://trase-storage/brazil/logistics/sicarm/silos_map/in/AUX_SILO_CNPJ_2019_DATE.csv
Dbt path: trase_production.main_brazil.aux_silo_cnpj_2019_date
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/logistics/sicarm/silos_map/in/_schema.yml
Model file link: trase/data_pipeline/models/brazil/logistics/sicarm/silos_map/in/aux_silo_cnpj_2019_date.py
Calls script: trase/data/brazil/logistics/sicarm/archive/silos_map/in/AUX_SILO_CNPJ_2019_DATE.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: mock_model, brazil, in, logistics, sicarm, silos_map
aux_silo_cnpj_2019_date
Description
This model was auto-generated from the .yml 'lineage' files in S3. The dbt model itself only raises an error; the actual script that created the data lives elsewhere, at trase/data/brazil/logistics/sicarm/silos_map/in/AUX_SILO_CNPJ_2019_DATE.py [permalink]. It was last run by Harry Biddle.
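If the table needs to be inspected outside Metabase, it can also be queried from a local copy of the DuckDB warehouse. A minimal sketch, assuming the warehouse file is called `trase_production.duckdb` (the filename is a guess; only the schema-qualified table name comes from the dbt path above):

```python
import duckdb

# Assumed local warehouse file; the schema.table name matches the dbt path
# trase_production.main_brazil.aux_silo_cnpj_2019_date documented above.
con = duckdb.connect("trase_production.duckdb", read_only=True)
df = con.sql("SELECT * FROM main_brazil.aux_silo_cnpj_2019_date LIMIT 10").df()
print(df)
```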
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.cnpj_2019_filtered
source.trase_duckdb.trase-storage-raw.2-silos_location
Sources
['trase-storage-raw', 'cnpj_2019_filtered']
['trase-storage-raw', '2-silos_location']
"""
Brazil - Silos Map
Create datasets focused on the Silos Map creation:
(1) CNPJ_2019_filtered
(1.1) Same used to insert CNPJ on SICARM dataset (2021)
(1.2) Contains unique CNPJ_8, dates
(2) AUX_SILO_CNPJ_2019_DATE.csv
- Uses output of script 2-br_create_silos_map_spatial.R ("brazil/logistics/sicarm/silos_map/in/2-SILOS_LOCATION.csv")
- Used as input on script 3-br_create_silos_map_consolidate.R
"""
import posixpath
from datetime import datetime
from pathlib import Path
from tempfile import gettempdir

import pandas as pd
from tqdm import tqdm

from trase.tools.aws.aws_helpers import get_pandas_df
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import add_object_to_tracker
from trase.tools.pcs import *  # assumed to provide aws_session, among others

timeInit = datetime.now()
def download(key) -> str:
    """Downloads a file, but only once"""
    filename = posixpath.basename(key)
    destination = Path(gettempdir(), filename)
    client = aws_session().client("s3")
    if destination.is_file():
        print(f"{destination} exists: skipping download")
    else:
        print(f"Downloading s3://trase-storage/{key} to {destination}")
        client.download_file("trase-storage", key, str(destination))
        add_object_to_tracker(key, "trase-storage", client=client)
    return str(destination)
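# Step (1) of the module docstring: the commented-out block below is kept for
# reference. It originally built CNPJ_2019_filtered.csv from the raw CNPJ_2019.csv
# extract; that filtered file is now stored in S3 and is downloaded directly in
# step (2) further down.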
# primary_csv = download("brazil/logistics/cnpj/original/CNPJ_2019.csv")
# columns_to_keep = [
#     "cnpj",
#     "uf",
#     "geocodmun",
#     "municipio",
#     "razao_social",
#     "situacao_cadastral",
#     "data_situacao_cadastral",
#     "data_inicio_atividade",
#     "cnae_primary",
#     "logradouro",
#     "numero",
#     "complemento",
#     "bairro",
#     "cep",
# ]
# total_rows = 40_837_398
# chunksize = 10_000
#
# with open(primary_csv, "r", errors="replace") as file:
#     chunk_iterator = pd.read_csv(
#         file,
#         sep=";",
#         dtype=str,
#         keep_default_na=False,
#         iterator=True,
#         chunksize=chunksize,
#         usecols=[
#             *columns_to_keep,
#             "municipio",
#             "data_situacao_cadastral",
#             "situacao_cadastral",
#         ],
#     )
#
#     def filtered_chunk_iterator():
#         for d in tqdm(
#             chunk_iterator, desc="Loading CNPJ_2019", total=total_rows // chunksize
#         ):
#             # ignore rows that are outside of Brazil
#             d = d[d["municipio"] != "EXTERIOR"]
#
#             # ignore NA and corrupt tax codes
#             d = d[d["cnpj"].str.isdigit()]
#
#             if d.empty:
#                 continue
#
#             # cast CNAE and CNPJ to integers so we don't have leading-zero woes when
#             # using them as lookup/join keys later
#             # TODO is this quicker if we do it on the whole dataframe?
#             d["cnpj_length"] = d["cnpj"].str.len()  # TODO: numpy.int8
#             d["cnpj"] = d["cnpj"].to_numpy(dtype="int64")
#
#             # d.loc[~d["cnae_primary"].str.isdigit(), "cnae_primary"] = -1
#             # d["cnae_primary"] = d["cnae_primary"].astype(int)  # TODO: numpy.int32
#
#             # split the dataframe into those where the date is defined
#             # and those where it is not
#             has_date = d["data_situacao_cadastral"] != "NA"
#             e = d[has_date]
#             f = d[~has_date]
#
#             # if a date is defined, discard old CNPJs that no longer exist
#             is_old = e["data_situacao_cadastral"].str.slice(0, 4).astype(int) < 1960
#             is_class_08 = e["situacao_cadastral"] == "08"
#             is_probably_inactive = is_old & is_class_08
#             e = e[~is_probably_inactive]
#
#             yield e[[*columns_to_keep, "cnpj_length"]]
#             yield f[[*columns_to_keep, "cnpj_length"]]
#
#     # concatenate the filtered chunk dataframes. it is only at this point that the file
#     # is actually read from disk
#     df_company_lookup = pd.concat(filtered_chunk_iterator())
#
# # rename column (we will be using "type" later)
# df_company_lookup = df_company_lookup.rename(
#     columns={
#         "type": "original_type",
#         "geocodmun": "GEOCODE",
#         "razao_social": "COMPANY",
#         "municipio": "MUNICIPALITY",
#         "uf": "UF",
#         "cnpj": "CNPJ",
#         "situacao_cadastral": "SITUACAO_CADASTRAL",
#         "data_situacao_cadastral": "DATA_SITUACAO_CADASTRAL",
#         "data_inicio_atividade": "DATA_INICIO_ATIVIDADE",
#         "cep": "CEP",
#         "cnpj_length": "CNPJ_LENGTH",
#         "bairro": "BAIRRO",
#     }
# )
# df_company_lookup = df_company_lookup.astype({"CNPJ": str})
# df_company_lookup["CNPJ_8"] = df_company_lookup["CNPJ"].str[:8]
# df_company_lookup["ADDRESS"] = (
#     df_company_lookup["logradouro"]
#     + "-"
#     + df_company_lookup["numero"]
#     + "-"
#     + df_company_lookup["complemento"]
# )
#
#
# df_company_lookup["DUPLICATED"] = df_company_lookup.duplicated(
#     subset=["GEOCODE", "COMPANY", "CNPJ_8"]
# )
#
# print(df_company_lookup[df_company_lookup["DUPLICATED"] == True].shape[0])
# print(df_company_lookup[df_company_lookup["DUPLICATED"] == False].shape[0])
#
# df_company_lookup.loc[df_company_lookup["DUPLICATED"] == False, "CNPJ_8_UNIQUE"] = True
# df_company_lookup.loc[df_company_lookup["DUPLICATED"] == True, "CNPJ_8_UNIQUE"] = False
#
# path = os.path.join(gettempdir(), "CNPJ_2019_filtered.csv").replace("\\", "/")
# df_company_lookup[
#     [
#         "CNPJ",
#         "CNPJ_LENGTH",
#         "CNPJ_8",
#         "CNPJ_8_UNIQUE",
#         "SITUACAO_CADASTRAL",
#         "DATA_SITUACAO_CADASTRAL",
#         "DATA_INICIO_ATIVIDADE",
#         "COMPANY",
#         "GEOCODE",
#         "UF",
#         "MUNICIPALITY",
#         "ADDRESS",
#         "CEP",
#     ]
# ].to_csv(path, sep=";", encoding="utf-8", index=False)
#
# # Upload CSV to S3 Bucket
# print("Now run:")
# print(
#     f"aws s3 cp {path} s3://trase-storage/brazil/logistics/cnpj/CNPJ_2019_filtered.csv"
# )
"""
Workaround for the script "3-br_create_silos_map_consolidate.R"
"""
total_rows = 40_837_398
chunksize = 10_000
secondary_csv = download("brazil/logistics/cnpj/CNPJ_2019_filtered.csv")
with open(secondary_csv, "r", errors="replace") as file:
    chunk_iterator = pd.read_csv(
        file,
        sep=";",
        dtype=str,
        keep_default_na=False,
        iterator=True,
        chunksize=chunksize,
    )

    def filtered_chunk_iterator():
        for d in tqdm(
            chunk_iterator, desc="Loading CNPJ_2019", total=total_rows // chunksize
        ):
            yield d[["CNPJ", "DATA_INICIO_ATIVIDADE"]]

    # concatenating consumes the lazy chunk iterator, so it has to happen while the
    # file is still open
    foo = pd.concat(filtered_chunk_iterator())
# silo locations produced by 2-br_create_silos_map_spatial.R
baz = get_pandas_df(
    "brazil/logistics/sicarm/silos_map/in/2-SILOS_LOCATION.csv", sep=";", dtype=str
)

# attach each silo's activity start date via its CNPJ
cnpj_dates = foo[["CNPJ", "DATA_INICIO_ATIVIDADE"]]
baz = baz.merge(cnpj_dates, how="left", on="CNPJ")
baz = baz.drop_duplicates()

write_csv_for_upload(
    baz, "brazil/logistics/sicarm/silos_map/in/AUX_SILO_CNPJ_2019_DATE.csv"
)
print("Everything took:", datetime.now() - timeInit)
import pandas as pd


def model(dbt, cursor):
    dbt.source("trase-storage-raw", "cnpj_2019_filtered")
    dbt.source("trase-storage-raw", "2-silos_location")
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})