Seipcs Brazil Beef 2015 Patched
s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/SEIPCS_BRAZIL_BEEF_2015.csv
Dbt path: trase_production.main_brazil.seipcs_brazil_beef_2015_patched
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/_schema_sei_pcs_v2_2_1.yml
Model file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/seipcs_brazil_beef_2015_patched.py
Calls script: trase/data/brazil/beef/sei_pcs/v2_2_1/SEIPCS_BRAZIL_BEEF_20XX.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: mock_model, beef, brazil, sei_pcs, v2.2.1, 2015
seipcs_brazil_beef_2015_patched
Description
No description
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
model.trase_duckdb.seipcs_brazil_beef_2015
"""
In the v2.2.1 model we modified the anonymisation step (PR: #4982) in two ways:
- The cutoff threshold was raised from 250kg to 500kg
- We additionally anonymised logistics hubs etc.
This new method was used for all newer years (2021+), but due to time pressure,
rather than re-running the model for the old years, we simply patch the old
files in this script.
"""
from pprint import pprint

from tqdm import tqdm

from trase.models.brazil.beef.definition import flows_export
from trase.models.brazil.beef.model import ANONYMISATION_VALUES, VOLUME_CUTOFF_KG
from trase.tools.aws import get_pandas_df
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.etl.utilities import consolidate

DO_NOT_ANONYMISE = [
    "port",
    "exporter_name",
    "exporter_state.state_name",
    "importer_name",
    "country",
    "cwe",
    "vol",
    "fob",
    "hs4",
    "hs6",
    "year",
    "exporter_cnpj",
    "branch",
]

S3_KEYS = [
    "brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_2010.csv",
    "brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_2011.csv",
    "brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_2012.csv",
    "brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_2013.csv",
    "brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_2014.csv",
    "brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_2015.csv",
    "brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_2016.csv",
    "brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_2017.csv",
    "brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_2018.csv",
    "brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_2019.csv",
    "brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_2020.csv",
]

NUMERIC_COLUMNS = ["FOB", "VOLUME_RAW", "VOLUME_PRODUCT"]


def main():
    anonymisation_values = {
        column.header: ANONYMISATION_VALUES[column.flow_attribute]
        for column in flows_export
        if column.flow_attribute not in DO_NOT_ANONYMISE
    }
    print("Using the following values for anonymisation:")
    pprint(anonymisation_values)

    for input_key in tqdm(S3_KEYS):
        output_key = input_key.replace("v2.2.0", "v2.2.1")
        assert input_key != output_key

        # read CSV file as all strings
        df = get_pandas_df(
            input_key,
            sep=";",
            dtype=str,
            na_filter=False,
        )
        # we do however need to cast the numeric columns
        df = df.astype({column: float for column in NUMERIC_COLUMNS})

        # perform anonymisation
        is_small = df["VOLUME_PRODUCT"] < VOLUME_CUTOFF_KG
        for col, val in anonymisation_values.items():
            df.loc[is_small, col] = val

        rows_before = len(df)
        df = consolidate(df, NUMERIC_COLUMNS)

        # print a report
        rows_after = len(df)
        p = (rows_before - rows_after) / rows_before
        tqdm.write(f"Rows reduced by {100 * p:.2f}%")

        # upload result
        write_csv_for_upload(df, output_key)


if __name__ == "__main__":
    main()
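The anonymise-then-consolidate steps in `main()` can be sketched on a toy DataFrame. Everything below is illustrative: the cutoff and placeholder values are made up, and `consolidate` is assumed to group on all non-numeric columns and sum the numeric ones (the real helper in `trase.tools.etl.utilities` may differ).

```python
import pandas as pd

# Hypothetical stand-ins for the constants imported by the real script.
VOLUME_CUTOFF_KG = 500.0
ANON_VALUES = {"MUNICIPALITY": "UNKNOWN", "LOGISTICS_HUB": "UNKNOWN"}
NUMERIC_COLUMNS = ["FOB", "VOLUME_PRODUCT"]


def anonymise_and_consolidate(df):
    """Mask identifying columns on sub-cutoff flows, then merge duplicates.

    The consolidation step is a guess at the real `consolidate` helper:
    group on all non-numeric columns and sum the numeric ones.
    """
    df = df.copy()
    is_small = df["VOLUME_PRODUCT"] < VOLUME_CUTOFF_KG
    for col, val in ANON_VALUES.items():
        df.loc[is_small, col] = val
    group_cols = [c for c in df.columns if c not in NUMERIC_COLUMNS]
    return df.groupby(group_cols, as_index=False)[NUMERIC_COLUMNS].sum()


df = pd.DataFrame(
    {
        "MUNICIPALITY": ["A", "B", "C"],
        "LOGISTICS_HUB": ["H1", "H2", "H3"],
        "FOB": [100.0, 200.0, 300.0],
        "VOLUME_PRODUCT": [100.0, 200.0, 9000.0],
    }
)
out = anonymise_and_consolidate(df)
# The two sub-cutoff rows become identical after masking and collapse
# into a single anonymised row; the large row survives untouched.
print(len(out))  # 2
```

Because anonymisation maps many small flows onto identical rows, the consolidation step is what produces the "rows reduced by X%" shrinkage the script reports.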
def model(dbt, cursor):
    # Mock model: register the upstream dependency so dbt records the
    # lineage, but never materialise the table here. The real v2.2.1
    # output is written to S3 by the patch script above.
    dbt.ref("seipcs_brazil_beef_2015")
    raise NotImplementedError()
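For reference, the contract this stub would eventually satisfy is the dbt Python-model one: `model(dbt, session)` returns a DataFrame that dbt materialises. The sketch below fakes the `dbt` runtime object so the shape of a non-mock implementation can be exercised standalone; the fake and its column are invented for illustration.

```python
from types import SimpleNamespace

import pandas as pd


def model(dbt, session):
    # A non-mock version would simply re-expose the upstream relation.
    return dbt.ref("seipcs_brazil_beef_2015")


# Fake the dbt runtime so the contract can be exercised without dbt.
fake_dbt = SimpleNamespace(ref=lambda name: pd.DataFrame({"year": [2015]}))
result = model(fake_dbt, session=None)
print(result["year"].iloc[0])  # 2015
```

In a real dbt-duckdb project, `dbt.ref` returns an adapter-specific relation object rather than a plain DataFrame, so a conversion step may be needed before returning.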