
Aux Silo Cnpj 2019 Date

s3://trase-storage/brazil/logistics/sicarm/silos_map/in/AUX_SILO_CNPJ_2019_DATE.csv

Dbt path: trase_production.main_brazil.aux_silo_cnpj_2019_date

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/logistics/sicarm/silos_map/in/_schema.yml

Model file link: trase/data_pipeline/models/brazil/logistics/sicarm/silos_map/in/aux_silo_cnpj_2019_date.py

Calls script: trase/data/brazil/logistics/sicarm/archive/silos_map/in/AUX_SILO_CNPJ_2019_DATE.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, as well as tests and downstream dependencies)

Tags: mock_model, brazil, in, logistics, sicarm, silos_map


aux_silo_cnpj_2019_date

Description

This model was auto-generated from the .yml 'lineage' files in S3. The dbt model itself only raises an error; the script that actually created the data lives elsewhere, at trase/data/brazil/logistics/sicarm/silos_map/in/AUX_SILO_CNPJ_2019_DATE.py [permalink]. It was last run by Harry Biddle.


Details

Column | Type | Description

(No columns are documented for this model.)

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.cnpj_2019_filtered
  • source.trase_duckdb.trase-storage-raw.2-silos_location

Sources

  • trase-storage-raw.cnpj_2019_filtered
  • trase-storage-raw.2-silos_location
"""
Brazil - Silos Map

Create datasets focused on the Silos Map creation:
    (1) CNPJ_2019_filtered
        (1.1) Same used to insert CNPJ on SICARM dataset (2021)
        (1.2) Contains unique CNPJ_8, dates
    (2) AUX_SILO_CNPJ_2019_DATE.csv
        - Uses output of script 2-br_create_silos_map_spatial.R ("brazil/logistics/sicarm/silos_map/in/2-SILOS_LOCATION.csv")
        - Used as input on script 3-br_create_silos_map_consolidate.R

"""

import posixpath
from datetime import datetime
from pathlib import Path
from tempfile import gettempdir

from tqdm import tqdm

from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.pcs import *  # kept from the original; aws_session() below has no other import
import pandas as pd
from trase.tools.aws.aws_helpers import get_pandas_df
from trase.tools.aws.tracker import add_object_to_tracker

timeInit = datetime.now()


def download(key) -> str:
    """Downloads a file, but only once"""
    filename = posixpath.basename(key)
    destination = Path(gettempdir(), filename)
    client = aws_session().client("s3")
    if destination.is_file():
        print(f"{destination} exists: skipping download")
    else:
        print(f"Downloading s3://trase-storage/{key} to {destination}")
        client.download_file("trase-storage", key, str(destination))
    add_object_to_tracker(key, "trase-storage", client=client)
    return str(destination)
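
# Note on `download`: the helper is idempotent per temp directory. If the file is
# already present in gettempdir() the S3 download is skipped, but the object is still
# registered with add_object_to_tracker on every call, so provenance is recorded
# whether the file was fetched or reused.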


# primary_csv = download("brazil/logistics/cnpj/original/CNPJ_2019.csv")

# columns_to_keep = [
#     "cnpj",
#     "uf",
#     "geocodmun",
#     "municipio",
#     "razao_social",
#     "situacao_cadastral",
#     "data_situacao_cadastral",
#     "data_inicio_atividade",
#     "cnae_primary",
#     "logradouro",
#     "numero",
#     "complemento",
#     "bairro",
#     "cep",
# ]
# total_rows = 40_837_398
# chunksize = 10_000
#
# with open(primary_csv, "r", errors="replace") as file:
#     chunk_iterator = pd.read_csv(
#         file,
#         sep=";",
#         dtype=str,
#         keep_default_na=False,
#         iterator=True,
#         chunksize=chunksize,
#         usecols=[
#             *columns_to_keep,
#             "municipio",
#             "data_situacao_cadastral",
#             "situacao_cadastral",
#         ],
#     )
#
#     def filtered_chunk_iterator():
#         for d in tqdm(
#             chunk_iterator, desc="Loading CNPJ_2019", total=total_rows // chunksize
#         ):
#             # ignore rows that are outside of Brazil
#             d = d[d["municipio"] != "EXTERIOR"]
#
#             # ignore NA and corrupt tax codes
#             d = d[d["cnpj"].str.isdigit()]
#
#             if d.empty:
#                 continue
#
#             # cast CNAE and CNPJ to integers so we don't have leading-zero woes when
#             # using them as lookup/join keys later
#             # TODO is this quicker if we do it on the whole dataframe?
#             d["cnpj_length"] = d["cnpj"].str.len()  # TODO: numpy.int8
#             d["cnpj"] = d["cnpj"].to_numpy(dtype="int64")
#
#             # d.loc[~d["cnae_primary"].str.isdigit(), "cnae_primary"] = -1
#             # d["cnae_primary"] = d["cnae_primary"].astype(int)  # TODO: numpy.int32
#
#             # split the dataframe into those where the date is defined
#             # and those where it is not
#             has_date = d["data_situacao_cadastral"] != "NA"
#             e = d[has_date]
#             f = d[~has_date]
#
#             # if a date is defined, discard old CNPJs that no longer exist
#             is_old = e["data_situacao_cadastral"].str.slice(0, 4).astype(int) < 1960
#             is_class_08 = e["situacao_cadastral"] == "08"
#             is_probably_inactive = is_old & is_class_08
#             e = e[~is_probably_inactive]
#
#             yield e[[*columns_to_keep, "cnpj_length"]]
#             yield f[[*columns_to_keep, "cnpj_length"]]
#
#     # concatenate the filtered chunk dataframes. it is only at this point that the file
#     # is actually read from disk
#     df_company_lookup = pd.concat(filtered_chunk_iterator())
#
# # rename column (we will be using "type" later)
# df_company_lookup = df_company_lookup.rename(
#     columns={
#         "type": "original_type",
#         "geocodmun": "GEOCODE",
#         "razao_social": "COMPANY",
#         "municipio": "MUNICIPALITY",
#         "uf": "UF",
#         "cnpj": "CNPJ",
#         "situacao_cadastral": "SITUACAO_CADASTRAL",
#         "data_situacao_cadastral": "DATA_SITUACAO_CADASTRAL",
#         "data_inicio_atividade": "DATA_INICIO_ATIVIDADE",
#         "cep": "CEP",
#         "cnpj_length": "CNPJ_LENGTH",
#         "bairro": "BAIRRO",
#     }
# )
# df_company_lookup = df_company_lookup.astype({"CNPJ": str})
# df_company_lookup["CNPJ_8"] = df_company_lookup["CNPJ"].str[:8]
# df_company_lookup["ADDRESS"] = (
#     df_company_lookup["logradouro"]
#     + "-"
#     + df_company_lookup["numero"]
#     + "-"
#     + df_company_lookup["complemento"]
# )
#
#
# df_company_lookup["DUPLICATED"] = df_company_lookup.duplicated(
#     subset=["GEOCODE", "COMPANY", "CNPJ_8"]
# )
#
# print(df_company_lookup[df_company_lookup["DUPLICATED"] == True].shape[0])
# print(df_company_lookup[df_company_lookup["DUPLICATED"] == False].shape[0])
#
# df_company_lookup.loc[df_company_lookup["DUPLICATED"] == False, "CNPJ_8_UNIQUE"] = True
# df_company_lookup.loc[df_company_lookup["DUPLICATED"] == True, "CNPJ_8_UNIQUE"] = False
#
# path = os.path.join(gettempdir(), "CNPJ_2019_filtered.csv").replace("\\", "/")
# df_company_lookup[
#     [
#         "CNPJ",
#         "CNPJ_LENGTH",
#         "CNPJ_8",
#         "CNPJ_8_UNIQUE",
#         "SITUACAO_CADASTRAL",
#         "DATA_SITUACAO_CADASTRAL",
#         "DATA_INICIO_ATIVIDADE",
#         "COMPANY",
#         "GEOCODE",
#         "UF",
#         "MUNICIPALITY",
#         "ADDRESS",
#         "CEP",
#     ]
# ].to_csv(path, sep=";", encoding="utf-8", index=False)
#
# # Upload CSV to S3 Bucket
# print("Now run:")
# print(
#     f"aws s3 cp {path} s3://trase-storage/brazil/logistics/cnpj/CNPJ_2019_filtered.csv"
# )
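
# The commented-out block above only needs to be run when CNPJ_2019_filtered.csv has
# to be regenerated. Its core pattern: stream the ~40-million-row CSV in chunks,
# filter each chunk, and concatenate the survivors. The uncalled helper below is an
# illustrative distillation of that pattern (hypothetical name, same column names as
# the block above); it is not used by the rest of this script.
def _filter_cnpj_chunks(path: str, chunksize: int = 10_000) -> pd.DataFrame:
    """Stream a ';'-separated CNPJ dump and keep only plausible Brazilian records."""
    chunks = pd.read_csv(
        path, sep=";", dtype=str, keep_default_na=False, chunksize=chunksize
    )

    def kept():
        for chunk in chunks:
            # drop foreign registrations and rows whose tax code is not numeric
            chunk = chunk[chunk["municipio"] != "EXTERIOR"]
            chunk = chunk[chunk["cnpj"].str.isdigit()]
            if not chunk.empty:
                yield chunk

    return pd.concat(kept(), ignore_index=True)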


"""
Workaround for the script "3-br_create_silos_map_consolidate.R": builds the CNPJ/date auxiliary table used as its input.
"""

total_rows = 40_837_398
chunksize = 10_000


secondary_csv = download("brazil/logistics/cnpj/CNPJ_2019_filtered.csv")

with open(secondary_csv, "r", errors="replace") as file:
    chunk_iterator = pd.read_csv(
        file,
        sep=";",
        dtype=str,
        keep_default_na=False,
        iterator=True,
        chunksize=chunksize,
    )

    def filtered_chunk_iterator():
        for d in tqdm(
            chunk_iterator, desc="Loading CNPJ_2019", total=total_rows // chunksize
        ):
            yield d[["CNPJ", "DATA_INICIO_ATIVIDADE"]]

    # lookup of company registration start dates, keyed by CNPJ
    df_cnpj_dates = pd.concat(filtered_chunk_iterator())


# silo locations produced by 2-br_create_silos_map_spatial.R
df_silos = get_pandas_df(
    "brazil/logistics/sicarm/silos_map/in/2-SILOS_LOCATION.csv", sep=";", dtype=str
)

# attach the activity start date to each silo via its CNPJ, then de-duplicate
df_silos = df_silos.merge(df_cnpj_dates, how="left", on="CNPJ")
df_silos = df_silos.drop_duplicates()

write_csv_for_upload(
    df_silos, "brazil/logistics/sicarm/silos_map/in/AUX_SILO_CNPJ_2019_DATE.csv"
)
print("Everything took:", datetime.now() - timeInit)

dbt model file (aux_silo_cnpj_2019_date.py)

import pandas as pd


def model(dbt, cursor):
    # register the upstream sources so dbt records the lineage of this mock model
    dbt.source("trase-storage-raw", "cnpj_2019_filtered")
    dbt.source("trase-storage-raw", "2-silos_location")

    # mock model: the data is produced by the script above, not by dbt, so running
    # this model is intentionally an error; the return below is never reached
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})
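
For reference, a downstream dbt Python model would consume this table through dbt.ref rather than by re-reading the CSV from S3. The sketch below is minimal and hypothetical; it assumes dbt-duckdb's Python model interface, where the object returned by dbt.ref can be converted to a pandas DataFrame with .df().

def model(dbt, cursor):
    # pull the auxiliary silo/CNPJ date table documented on this page
    aux = dbt.ref("aux_silo_cnpj_2019_date").df()

    # downstream transformations would go here; for illustration, pass the frame through
    return aux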