
SEI-PCS Brazil Beef 2021

s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/SEIPCS_BRAZIL_BEEF_2021.csv

Dbt path: trase_production.main_brazil.seipcs_brazil_beef_2021

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/_schema_sei_pcs_v2_2_1.yml

Model file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/seipcs_brazil_beef_2021.py

Calls script: trase/models/brazil/beef/main.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: beef, brazil, sei_pcs, v2.2.1, 2021


seipcs_brazil_beef_2021

Description

No description


Details

Column Type Description
STATE_OF_PRODUCTION VARCHAR
MUNICIPALITY VARCHAR
LOGISTICS_HUB VARCHAR
LOGISTICS_HUB_TRASE_ID VARCHAR
PORT_OF_EXPORT VARCHAR
EXPORTER VARCHAR
STATE_OF_EXPORTER VARCHAR
IMPORTER VARCHAR
COUNTRY VARCHAR
VOLUME_RAW DOUBLE
VOLUME_PRODUCT DOUBLE
FOB DOUBLE
HS4 VARCHAR
HS6 VARCHAR
YEAR BIGINT
EXPORTER_CNPJ VARCHAR
BRANCH VARCHAR
GEOCODE_SOURCE VARCHAR
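
Example query

A minimal sketch of exploring the exported CSV directly with DuckDB. It assumes the trase-storage bucket is readable with S3 credentials configured for DuckDB's httpfs extension; the aggregation shown is purely illustrative.

import duckdb

con = duckdb.connect()
con.execute("INSTALL httpfs")
con.execute("LOAD httpfs")  # required for s3:// paths

# Total exported volume by state of production.
df = con.execute(
    """
    SELECT STATE_OF_PRODUCTION, SUM(VOLUME_RAW) AS total_volume_raw
    FROM read_csv_auto('s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/SEIPCS_BRAZIL_BEEF_2021.csv')
    GROUP BY STATE_OF_PRODUCTION
    ORDER BY total_volume_raw DESC
    """
).df()
print(df.head())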

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.id_mun_master_a_new
  • source.trase_duckdb.trase-storage-raw.brazil_beef_carcass_weight_equivalence_factors
  • source.trase_duckdb.trase-storage-raw.br_cost_matrix_osrm_2019
  • model.trase_duckdb.uf_new
  • model.trase_duckdb.cd_disaggregated_beef_2021_new
  • model.trase_duckdb.brazil_sif_inspected_beef_establishments_2020
  • model.trase_duckdb.br_bovine_slaughter_2015_2020_new
  • model.trase_duckdb.beef_auxiliary_cnpj_2021_new
  • model.trase_duckdb.cnpj8_supply_sheds_2020

Sources

  • trase-storage-raw.id_mun_master_a_new
  • trase-storage-raw.brazil_beef_carcass_weight_equivalence_factors
  • trase-storage-raw.br_cost_matrix_osrm_2019
"""
Needs ~40GB memory
"""

from time import time
import sys

from trase.tools import sps
from trase.models.brazil.beef.quality_assurance import quality_assurance


def main(year):
    start = time()

    supplychain = sps.SupplyChain("brazil/beef", year=year)
    supplychain.preparation()

    print(f"Preparation {time() - start}")

    start = time()
    supplychain.load()
    print(f"Load {time() - start}")

    start = time()
    supplychain.run()
    print(f"Run {time() - start}")

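    # Post-run reporting and publication: volume breakdowns by branch and by geocode
    # source, export of results (without flows), quality assurance, then upload.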
    supplychain.flow_report_by_attribute("vol", ["branch"])
    supplychain.flow_report_by_attribute("vol", ["geocode_source"])
    supplychain.export_results(flows=False)
    quality_assurance(year)
    supplychain.upload_results()


if __name__ == "__main__":
    year = int(sys.argv[1])
    main(year)
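
The script takes the trade year as its single positional argument, so a run for this dataset would look something like python main.py 2021 (the exact module path and environment are left to the reader).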

Model file: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/seipcs_brazil_beef_2021.py

from unittest.mock import patch

from trase.tools.etl_internal.processors import S3Mixin
from trase.tools.sei_pcs.supply_chain import SupplyChain

YEAR = 2021


def model(dbt, cursor):
    dbt.config(materialized="external")
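    # "external" materialization (dbt-duckdb) writes the model result to a file rather
    # than a warehouse table, presumably the CSV listed at the top of this page.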
    # Note that the column renames are handled in preparation.py, and that .df() has to
    # be called before selecting columns, because of how dbt parses Python models.
    municipality_df = dbt.source("trase-storage-raw", "id_mun_master_a_new").df()
    municipality_df = municipality_df[["geocode", "name", "state_code", "trase_id"]]
    state_df = dbt.ref("uf_new").df()
    state_df = state_df[["uf_code", "state_name", "uf_name", "trase_id"]].astype(str)
    equivalence_factors_df = dbt.source(
        "trase-storage-raw", "brazil_beef_carcass_weight_equivalence_factors"
    ).df()
    equivalence_factors_df = equivalence_factors_df[["hs_code", "hs_type", "eq_factor"]]
    cost_df = dbt.source("trase-storage-raw", "br_cost_matrix_osrm_2019").df()
    cost_df = cost_df[["Origin", "Destination", "Seconds"]]

    flows_df = dbt.ref("cd_disaggregated_beef_2021_new").df()
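    # Every column is cast to string first, then year/vol/fob/cwe are restored to
    # their numeric types below.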
    flows_df = (
        flows_df[
            [
                "year",
                "hs6",
                "hs4",
                "exporter_name",
                "exporter_cnpj",
                "state_of_production",
                "exporter_geocode",
                "country",
                "vol",
                "fob",
                "importer_name",
                "port",
                "cnpj8",
                "parent_cnpj8",
                "cnpj_is_valid",
                "cwe",
                "exporter_state",
            ]
        ]
        .astype(str)
        .assign(
            year=lambda df: df["year"].astype(int),
            vol=lambda df: df["vol"].astype(float),
            fob=lambda df: df["fob"].astype(float),
            cwe=lambda df: df["cwe"].astype(float),
        )
    )

    sif_df = dbt.ref("brazil_sif_inspected_beef_establishments_2020").df()
    sif_df = sif_df[
        [
            "name",
            "sif",
            "cnpj",
            "level",
            "country",
            "commodity",
            "municipality",
            "cnpj8",
            "parent_cnpj8",
        ]
    ].astype(
        str
    )  # TODO: update
    sigsif_df = dbt.ref("br_bovine_slaughter_2015_2020_new").df()
    sigsif_df = sigsif_df[["state", "geocode", "prop_flows"]].astype(
        str
    )  # TODO: update? what to do about note "we do not use data later than 2020 because it does not differentiate between state of slaughter and state of origin."?

    cnpj_df = dbt.ref("beef_auxiliary_cnpj_2021_new").df()
    cnpj_df = cnpj_df[["cnpj", "cnpj8", "tax_municipality"]]

    gta_df = dbt.ref("cnpj8_supply_sheds_2020").df()
    gta_df = gta_df[
        [
            "cnpj8",
            "logistics_hub_geocode",
            "shed_municipality_trase_id",
            "prop_flows",
            "biome",
        ]
    ].astype(str)

    data = {
        # static data (doesn't update every year)
        "municipality": municipality_df,
        "state": state_df,
        "equivalence_factors": equivalence_factors_df,
        "cost": cost_df,
        # trade data
        "flows": flows_df,
        # SIF and SIGSIF data
        "sif": sif_df,
        "sigsif": sigsif_df,
        # RFB CNPJ data
        "cnpj": cnpj_df,
        # GTAs - pinned to 2020
        "gta": gta_df,
    }

    # -------------------------------------------------------------------------------- #
    # run the model
    # -------------------------------------------------------------------------------- #
    supply_chain = SupplyChain("brazil/beef", year=YEAR, pre_extracted_data=data)

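    # All inputs were passed in via pre_extracted_data, so preparation should not need
    # S3; the patch below turns any accidental S3 read into a hard error.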
    with patch.object(
        S3Mixin,
        "extract",
        side_effect=RuntimeError("Preparation class should not be reading from S3"),
    ):
        supply_chain.preparation()

    supply_chain.load()
    supply_chain.run()

    return supply_chain.df_export()
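
Whatever df_export() returns is what dbt materializes for this model; with the external materialization configured above, the published output is presumably the CSV referenced at the top of this page.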