Seipcs Brazil Beef 2022
s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/SEIPCS_BRAZIL_BEEF_2022.csv
Dbt path: trase_production.main_brazil.seipcs_brazil_beef_2022
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/_schema_sei_pcs_v2_2_1.yml
Model file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/seipcs_brazil_beef_2022.py
Calls script: trase/models/brazil/beef/main.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: beef, brazil, sei_pcs, v2.2.1, 2022
seipcs_brazil_beef_2022
Description
No description
Details
| Column | Type | Description |
|---|---|---|
| STATE_OF_PRODUCTION | VARCHAR | |
| MUNICIPALITY | VARCHAR | |
| LOGISTICS_HUB | VARCHAR | |
| LOGISTICS_HUB_TRASE_ID | VARCHAR | |
| PORT_OF_EXPORT | VARCHAR | |
| EXPORTER | VARCHAR | |
| STATE_OF_EXPORTER | VARCHAR | |
| IMPORTER | VARCHAR | |
| COUNTRY | VARCHAR | |
| VOLUME_RAW | DOUBLE | |
| VOLUME_PRODUCT | DOUBLE | |
| FOB | DOUBLE | |
| HS4 | VARCHAR | |
| HS6 | VARCHAR | |
| YEAR | BIGINT | |
| EXPORTER_CNPJ | VARCHAR | |
| BRANCH | VARCHAR | |
| GEOCODE_SOURCE | VARCHAR | |
Models / Seeds
- source.trase_duckdb.trase-storage-raw.id_mun_master_a_new
- source.trase_duckdb.trase-storage-raw.brazil_beef_carcass_weight_equivalence_factors
- source.trase_duckdb.trase-storage-raw.br_cost_matrix_osrm_2019
- model.trase_duckdb.uf_new
- model.trase_duckdb.cd_disaggregated_beef_2022_new
- model.trase_duckdb.brazil_sif_inspected_beef_establishments_2022
- model.trase_duckdb.br_bovine_slaughter_2015_2020_new
- model.trase_duckdb.beef_auxiliary_cnpj_2022_new
- model.trase_duckdb.cnpj8_supply_sheds_2020
Sources
- ['trase-storage-raw', 'id_mun_master_a_new']
- ['trase-storage-raw', 'brazil_beef_carcass_weight_equivalence_factors']
- ['trase-storage-raw', 'br_cost_matrix_osrm_2019']
"""
Needs ~40GB memory
"""
from time import time
import sys
from trase.tools import sps
from trase.models.brazil.beef.quality_assurance import quality_assurance
def main(year):
    """Run the Brazil beef supply-chain model for ``year`` from start to finish.

    The stages execute in order: preparation, load and run (each timed, with
    the elapsed seconds printed), then two "vol" flow reports, a results
    export, quality assurance and, last, the upload of the results.
    """
    # The first timer also covers constructing the SupplyChain object.
    start = time()
    chain = sps.SupplyChain("brazil/beef", year=year)
    chain.preparation()
    print(f"Preparation {time() - start}")

    start = time()
    chain.load()
    print(f"Load {time() - start}")

    start = time()
    chain.run()
    print(f"Run {time() - start}")

    # Report "vol" broken down by each attribute in turn.
    for attribute in ("branch", "geocode_source"):
        chain.flow_report_by_attribute("vol", [attribute])

    chain.export_results(flows=False)
    # Quality assurance runs after the export and before the upload.
    quality_assurance(year)
    chain.upload_results()
if __name__ == "__main__":
    # The year to process is the first command-line argument.
    main(int(sys.argv[1]))
from unittest.mock import patch
from trase.tools.etl_internal.processors import S3Mixin
from trase.tools.sei_pcs.supply_chain import SupplyChain
# Year passed to SupplyChain when the dbt model below is run.
YEAR = 2022
def model(dbt, cursor):
    """dbt Python model that runs the "brazil/beef" supply chain for ``YEAR``.

    Every input table is read through ``dbt.source`` / ``dbt.ref``, trimmed to
    the columns listed below, and handed to ``SupplyChain`` as pre-extracted
    data. While the preparation step runs, ``S3Mixin.extract`` is patched to
    raise a ``RuntimeError``, so an attempt to read from S3 fails loudly.

    Args:
        dbt: dbt handle providing ``config``, ``source`` and ``ref``.
        cursor: not used inside this function.

    Returns:
        The result of ``supply_chain.df_export()``.
    """
    dbt.config(materialized="external")

    # Column renames are handled in preparation.py. Each table is first loaded
    # with .df() and only then narrowed to its columns, because of how dbt
    # parses python models.

    # --- static data (doesn't update every year) ---
    municipality_columns = ["geocode", "name", "state_code", "trase_id"]
    municipalities = dbt.source("trase-storage-raw", "id_mun_master_a_new").df()
    municipalities = municipalities[municipality_columns]

    state_columns = ["uf_code", "state_name", "uf_name", "trase_id"]
    states = dbt.ref("uf_new").df()
    states = states[state_columns].astype(str)

    factor_columns = ["hs_code", "hs_type", "eq_factor"]
    factors = dbt.source(
        "trase-storage-raw", "brazil_beef_carcass_weight_equivalence_factors"
    ).df()
    factors = factors[factor_columns]

    cost_columns = ["Origin", "Destination", "Seconds"]
    costs = dbt.source("trase-storage-raw", "br_cost_matrix_osrm_2019").df()
    costs = costs[cost_columns]

    # --- trade data ---
    trade_columns = [
        "year",
        "hs6",
        "hs4",
        "exporter_name",
        "exporter_cnpj",
        "state_of_production",
        "exporter_geocode",
        "country",
        "vol",
        "fob",
        "importer_name",
        "port",
        "cnpj8",
        "parent_cnpj8",
        "cnpj_is_valid",
        "cwe",
        "exporter_state",
    ]
    trade = dbt.ref("cd_disaggregated_beef_2022_new").df()
    # Everything becomes a string first; the numeric columns are then cast
    # back from their string form.
    trade = trade[trade_columns].astype(str)
    trade = trade.assign(
        year=trade["year"].astype(int),
        vol=trade["vol"].astype(float),
        fob=trade["fob"].astype(float),
        cwe=trade["cwe"].astype(float),
    )

    # --- SIF and SIGSIF data ---
    sif_columns = [
        "name",
        "sif",
        "cnpj",
        "level",
        "country",
        "commodity",
        "municipality",
        "cnpj8",
        "parent_cnpj8",
    ]
    sif = dbt.ref("brazil_sif_inspected_beef_establishments_2022").df()
    sif = sif[sif_columns].astype(str)

    # TODO: update? what to do about note "we do not use data later than 2020
    # because it does not differentiate between state of slaughter and state
    # of origin."?
    sigsif_columns = ["state", "geocode", "prop_flows"]
    sigsif = dbt.ref("br_bovine_slaughter_2015_2020_new").df()
    sigsif = sigsif[sigsif_columns].astype(str)

    # --- RFB CNPJ data ---
    cnpj_columns = ["cnpj", "cnpj8", "tax_municipality"]
    cnpj = dbt.ref("beef_auxiliary_cnpj_2022_new").df()
    cnpj = cnpj[cnpj_columns]

    # --- GTAs - pinned to 2020 ---
    gta_columns = [
        "cnpj8",
        "logistics_hub_geocode",
        "shed_municipality_trase_id",
        "prop_flows",
        "biome",
    ]
    gta = dbt.ref("cnpj8_supply_sheds_2020").df()
    gta = gta[gta_columns].astype(str)

    pre_extracted = {
        "municipality": municipalities,
        "state": states,
        "equivalence_factors": factors,
        "cost": costs,
        "flows": trade,
        "sif": sif,
        "sigsif": sigsif,
        "cnpj": cnpj,
        "gta": gta,
    }

    # ---------------------------------------------------------------------------- #
    # run the model
    # ---------------------------------------------------------------------------- #
    supply_chain = SupplyChain(
        "brazil/beef", year=YEAR, pre_extracted_data=pre_extracted
    )

    # NOTE(review): the indentation of this section was lost in the source this
    # was restored from. Only preparation() is placed under the patch, going by
    # the error message - confirm that load() and run() belong outside it.
    s3_guard = patch.object(
        S3Mixin,
        "extract",
        side_effect=RuntimeError("Preparation class should not be reading from S3"),
    )
    with s3_guard:
        supply_chain.preparation()

    supply_chain.load()
    supply_chain.run()
    return supply_chain.df_export()