Skip to content

Seipcs Brazil Beef 2014 Patched

s3://trase-storage/brazil/beef/sei_pcs/v2.2.1/SEIPCS_BRAZIL_BEEF_2014.csv

Dbt path: trase_production.main_brazil.seipcs_brazil_beef_2014_patched

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/_schema_sei_pcs_v2_2_1.yml

Model file link: trase/data_pipeline/models/brazil/beef/sei_pcs/v2_2_1/seipcs_brazil_beef_2014_patched.py

Calls script: trase/data/brazil/beef/sei_pcs/v2_2_1/SEIPCS_BRAZIL_BEEF_20XX.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: mock_model, beef, brazil, sei_pcs, v2.2.1, 2014


seipcs_brazil_beef_2014_patched

Description

No description


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.seipcs_brazil_beef_2014

Sources

  • ['trase-storage-raw', 'seipcs_brazil_beef_2014']
"""
In the v2.2.1 model we modified the anonymisation step (PR: #4982) in two ways:

    - The cutoff threshold was raised from 250kg to 500kg
    - We additionally anonymised logistics hubs etc.

This new method was used in all of the newer years (2021+). But due to time pressure,
rather than re-running the model for the old years we just patch the files in this
script.
"""

from pprint import pprint

from tqdm import tqdm

from trase.models.brazil.beef.definition import flows_export
from trase.models.brazil.beef.model import ANONYMISATION_VALUES, VOLUME_CUTOFF_KG
from trase.tools.aws import get_pandas_df
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.etl.utilities import consolidate

# Flow attributes that are left untouched on small flows: main() only builds
# an anonymisation value for export columns whose `flow_attribute` is NOT
# listed here.
DO_NOT_ANONYMISE = [
    "port",
    "exporter_name",
    "exporter_state.state_name",
    "importer_name",
    "country",
    "cwe",
    "vol",
    "fob",
    "hs4",
    "hs6",
    "year",
    "exporter_cnpj",
    "branch",
]

# Input objects to patch: the v2.2.0 results, one CSV per year for 2010-2020
# inclusive (range() excludes its upper bound, hence 2021).
S3_KEYS = [
    f"brazil/beef/sei_pcs/v2.2.0/SEIPCS_BRAZIL_BEEF_{year}.csv"
    for year in range(2010, 2021)
]

# Columns that are read as strings but then cast to float; the same list is
# handed to `consolidate` in main().
NUMERIC_COLUMNS = ["FOB", "VOLUME_RAW", "VOLUME_PRODUCT"]


def main():
    """Patch every v2.2.0 result file in ``S3_KEYS`` into a v2.2.1 file.

    For each input key the CSV is loaded, the anonymisable columns are
    overwritten on every row whose ``VOLUME_PRODUCT`` is below
    ``VOLUME_CUTOFF_KG``, the frame is passed through ``consolidate`` and the
    result is written under the same key with ``v2.2.0`` replaced by
    ``v2.2.1``. The relative row reduction is printed per file.

    Raises:
        ValueError: if an input key does not contain ``"v2.2.0"``, because
            the output key would then equal the input key.
    """
    # Map each output column header to its anonymised replacement value,
    # skipping the attributes that must keep their original values.
    anonymisation_values = {
        column.header: ANONYMISATION_VALUES[column.flow_attribute]
        for column in flows_export
        if column.flow_attribute not in DO_NOT_ANONYMISE
    }
    print("Using the following values for anonymisation:")
    pprint(anonymisation_values)

    for input_key in tqdm(S3_KEYS):
        output_key = input_key.replace("v2.2.0", "v2.2.1")
        # An explicit raise (rather than `assert`) so the guard survives
        # `python -O`; without it the patched file could be written back
        # onto the input key.
        if input_key == output_key:
            raise ValueError(
                f"Expected 'v2.2.0' in S3 key, refusing to overwrite: {input_key}"
            )

        # read CSV file as all strings
        df = get_pandas_df(
            input_key,
            sep=";",
            dtype=str,
            na_filter=False,
        )

        # we do however need to cast the numeric columns
        df = df.astype({column: float for column in NUMERIC_COLUMNS})

        # perform anonymisation
        is_small = df["VOLUME_PRODUCT"] < VOLUME_CUTOFF_KG
        for col, val in anonymisation_values.items():
            df.loc[is_small, col] = val
        rows_before = len(df)
        # NOTE(review): presumably merges rows that became identical after
        # anonymisation, aggregating NUMERIC_COLUMNS -- confirm against
        # trase.tools.etl.utilities.consolidate.
        df = consolidate(df, NUMERIC_COLUMNS)

        # print a report; an input with no rows has nothing to reduce, and
        # must not trigger a division by zero
        rows_after = len(df)
        p = (rows_before - rows_after) / rows_before if rows_before else 0.0
        tqdm.write(f"Rows reduced by {100 * p:.2f}%")

        # upload result
        write_csv_for_upload(df, output_key)


# Run the patch only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    """Placeholder dbt Python model that is not meant to be executed.

    References the ``trase-storage-raw`` / ``seipcs_brazil_beef_2014`` source
    through ``dbt.source`` and then raises. ``cursor`` is unused.

    Raises:
        NotImplementedError: always, once the source has been referenced.
    """
    # The return value is discarded; only the call itself matters here.
    # NOTE(review): presumably this exists to register the source as an
    # upstream dependency for lineage -- confirm.
    dbt.source("trase-storage-raw", "seipcs_brazil_beef_2014")

    raise NotImplementedError()
    # NOTE(review): unreachable. Presumably kept because dbt's static check of
    # Python models expects the function to end in a single-DataFrame return
    # -- confirm before removing.
    return pd.DataFrame({"hello": ["world"]})