Cd Disaggregated Beef 2023 New

s3://trase-storage/brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_2023_NEW.parquet

Dbt path: trase_production.main_brazil.cd_disaggregated_beef_2023_new

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/beef/trade/cd/disaggregated/_schema_cd_disaggregated_beef.yml

Model file link: trase/data_pipeline/models/brazil/beef/trade/cd/disaggregated/cd_disaggregated_beef_2023_new.py

Calls script: trase/data/brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_201X_NEW.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: beef, brazil, cd, disaggregated, trade
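
For a quick look at the table outside dbt, the parquet can be read straight from the S3 path above. A minimal sketch using the repo's own read_s3_parquet helper (which, per the calls in the script below, resolves paths relative to the trase-storage bucket; assumes your AWS credentials grant access):

from trase.tools.aws.aws_helpers import read_s3_parquet

df = read_s3_parquet(
    "brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_2023_NEW.parquet"
)
print(df.head())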


cd_disaggregated_beef_2023_new

Description

No description


Details

Column               Type     Description
year                 BIGINT
hs6                  VARCHAR
hs4                  VARCHAR
exporter_name        VARCHAR
exporter_cnpj        VARCHAR
state_of_production  VARCHAR
exporter_geocode     VARCHAR
country              VARCHAR
importer_name        VARCHAR
port                 VARCHAR
cnpj8                VARCHAR
parent_cnpj8         VARCHAR
exporter_state       VARCHAR
cnpj_is_valid        VARCHAR
vol                  DOUBLE
fob                  DOUBLE
cwe                  DOUBLE
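
vol, fob and cwe are the only numeric measures; the remaining columns are grouping dimensions. A minimal sketch of the per-year summary the Metabase link provides, assuming a local DuckDB file holding the trase_production catalog (the file path is hypothetical):

import duckdb

con = duckdb.connect("trase_production.duckdb")  # hypothetical local copy
print(
    con.sql(
        """
        SELECT year, COUNT(*) AS n_rows, SUM(vol) AS vol, SUM(fob) AS fob, SUM(cwe) AS cwe
        FROM main_brazil.cd_disaggregated_beef_2023_new
        GROUP BY year
        ORDER BY year
        """
    )
)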

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.2020-12-19-beef_exporter_subsidiaries
  • model.trase_duckdb.cd_disaggregated_beef_2023
  • model.trase_duckdb.postgres_countries
  • model.trase_duckdb.postgres_commodity_equivalence_factors
  • model.trase_duckdb.uf_new

Sources

  • trase-storage-raw: 2020-12-19-beef_exporter_subsidiaries

Script source: trase/data/brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_201X_NEW.py

import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf

from trase.models.brazil.beef.constants import (
    UNKNOWN_STATE_NAME,
    UNKNOWN_MUNICIPALITY_GEOCODE,
)
from trase.tools.aws.aws_helpers import read_s3_parquet
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION
from trase.tools.etl.utilities import consolidate

YEARS = [2018, 2019, 2020]
BEEF_HS6 = [
    "010221",  # Cattle; live, pure-bred breeding animals
    "010229",  # Cattle; live, other than pure-bred breeding animals
    "010290",  # Bovine animals; live, other than cattle and buffalo
    "020120",  # Meat; of bovine animals, cuts with bone in, fresh or chilled
    "020130",  # Meat; of bovine animals, boneless cuts, fresh or chilled
    "020220",  # Meat; of bovine animals, cuts with bone in, frozen
    "020230",  # Meat; of bovine animals, boneless cuts, frozen
    "020610",  # Offal, edible; of bovine animals, fresh or chilled
    "020621",  # Offal, edible; of bovine animals, tongues, frozen
    "020622",  # Offal, edible; of bovine animals, livers, frozen
    "020629",  # Offal, edible; of bovine animals, (other than tongues & livers), frozen
    "021020",  # Meat; salted, in brine, dried or smoked, of bovine animals
    "160250",  # Meat preparations; of bovine animals, meat or offal, prepared/preserved
]
BEEF_HS4 = [
    "0102",  # Bovine animals; live
    "0201",  # Meat of bovine animals; fresh or chilled
    "0202",  # Meat of bovine animals; frozen
    "0206",  # Edible offal of bovine + other animals; fresh, chilled or frozen
    "0210",  # Meat and edible meat offal; salted/brine/etc. (does not exist in BoL)
    "0504",  # Guts, bladders and stomachs of animals (does not exist in BoL)
    "1602",  # Prepared or preserved meat, meat offal or blood
]
LIVE_BOVINE_ANIMALS_HS4 = "0102"
UNKNOWN_HS_CODE = "UNKNOWN"
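# Note: every BEEF_HS6 code starts with one of the BEEF_HS4 prefixes, but the
# HS4 list is deliberately broader (e.g. 0504 has no HS6 entry above), so rows
# where only four digits are known can still be matched.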


def read_customs_declarations(year):
    if year == 2018:
        df1 = get_pandas_df_once(
            f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}_01.csv",
            sep=";",
            dtype=str,
            keep_default_na=False,
        )
        df2 = get_pandas_df_once(
            f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}_02.csv",
            sep=";",
            dtype=str,
            keep_default_na=False,
        )
        return df1, df2
    else:
        df = get_pandas_df_once(
            f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}.csv",
            sep=";",
            dtype=str,
            keep_default_na=False,
        )
        return (df,)
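
# The 2018 declarations arrive split across two CSVs (_01/_02), hence the
# two-element tuple; every other year is a single file, returned as a 1-tuple
# so that callers can treat both cases uniformly.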


def process_customs_declarations(dfs, year):
    if year == 2018:
        df1, df2 = dfs
        df1 = df1.rename(columns={"index_cd": "index_cd_bol"}, errors="raise")
        df2 = df2.rename(columns={"index_cd": "index_cd_bol"}, errors="raise")
        df = pd.concat([df1, df2], sort=False)
    else:
        (df,) = dfs

    df = df.rename(
        columns={
            "exporter.label": "exporter_name",
            "exporter.cnpj": "exporter_cnpj",
            "exporter.municipality.trase_id": "exporter_municipality",
            "country_of_destination.name": "country",
            "importer.label": "importer_name",
            "port_of_export.name": "port",
        },
        errors="raise",
    )

    def assert_valid_hs_codes(series, digits: int):
        valid_code = series.str.isdigit() & (series.str.len() == digits)
        unknown = series == UNKNOWN_HS_CODE
        assert all(valid_code | unknown)

    if "hs8" in df:
        df["hs8"] = df["hs8"].replace("XXXXXXXX", UNKNOWN_HS_CODE)
        assert_valid_hs_codes(df["hs8"], 8)

    df["hs6"] = df["hs6"].replace("XXXXXX", UNKNOWN_HS_CODE)
    df["hs5"] = df["hs5"].replace("XXXXX", UNKNOWN_HS_CODE)
    assert_valid_hs_codes(df["hs6"], 6)
    assert_valid_hs_codes(df["hs5"], 5)
    assert_valid_hs_codes(df["hs4"], 4)

    return df


def process_relationships(df_relationship):
    df_relationship = df_relationship.rename(
        columns={
            "PARENT_CNPJ8": "parent_cnpj8",
            "SUBSID_CNPJ8": "subsid_cnpj8",
            "START_YEAR": "start_year",
            "END_YEAR": "end_year",
        },
        errors="raise",
    )
    df_relationship["start_year"] = np.where(
        df_relationship.start_year == "NA", min(YEARS), df_relationship.start_year
    )
    df_relationship["end_year"] = np.where(
        df_relationship.end_year == "NA", max(YEARS), df_relationship.end_year
    )
    df_relationship.loc[
        df_relationship["PARENT_CLEAN"] == "MARFRIG GLOBAL FOODS", "parent_cnpj8"
    ] = "03853896"
    df_relationship.loc[
        df_relationship["PARENT_CLEAN"]
        == "J.N.J. COMERCIAL IMPORTADORA E EXPORTADORA DE CARNES",
        "parent_cnpj8",
    ] = "07664941"

    assert all(df_relationship["parent_cnpj8"].str.len() == 8)
    assert all(df_relationship["subsid_cnpj8"].str.len() == 8)
    df_relationship = df_relationship.drop_duplicates()

    return df_relationship


def report(df, msg=""):
    """
    Simple report about row count, total vol and fob
    """
    print(f"Report" + (f": {msg}" if msg else ""))
    print(f"\t\t | Row Count: {len(df):,}")
    print(f"\t\t | Sum of vol: " f"" f"{df['vol'].sum():,.0f}")
    print(f"\t\t | Sum of fob: " f"" f"{df['fob'].sum():,.0f}")


def validate_cnpj_code(code):
    # first deduce what the type is given the length of string
    # that it appeared in the original data
    if code == "NA":
        code = "0"

    cnpj = code.rjust(14, "0")
    is_valid_cnpj = stdnum.br.cnpj.is_valid(cnpj)

    cpf = code.rjust(11, "0")
    is_valid_cpf = stdnum.br.cpf.is_valid(cpf)

    # we use the checksum method if it was unequivocal and fall back on the
    # string-length method otherwise
    if is_valid_cnpj and not is_valid_cpf:
        return "valid", cnpj
    elif is_valid_cpf and not is_valid_cnpj:
        return "valid", cpf
    else:
        # ambiguous (both or neither checksum passed): fall back on the length
        # of the original string to decide between the CNPJ and CPF padding
        return ("invalid", cnpj) if len(code) > 11 else ("invalid", cpf)


def remove_abnormal_value(df):
    df["vol"] = df["vol"].astype(float)
    df["fob"] = df["fob"].astype(float)
    df.loc[df["fob"] == np.inf, "fob"] = 99999999.0

    # Drop rows with vol == 0
    df = df.drop(df.loc[df["vol"] == 0].index)
    report(df, "Drop vol=0")

    # Validate cnpj and add CNPJ8 number
    df["cnpj_is_valid"], df["exporter_cnpj"] = zip(
        *df.exporter_cnpj.apply(validate_cnpj_code)
    )
    df["cnpj8"] = df["exporter_cnpj"].str[:8]

    return df


def clean_country_names(df, df_countries):
    """
    Standardises country names in a DataFrame by replacing synonyms with their canonical
    names.

    This function matches entries in the 'country' column of the input DataFrame `df` to
    known synonyms listed in `df_countries`, and replaces them with the corresponding
    canonical 'country_name'.

    Args:
        df (pd.DataFrame): The main DataFrame containing a 'country' column with country
            synonyms.
        df_countries (pd.DataFrame): A reference DataFrame with 'country_name' and a
            list of 'synonyms' for each country.

    Returns:
        pd.Series: the "country" column of the dataframe, but standardised to canonical
            'country_name' values.

    Raises:
        AssertionError: If any 'country' value in `df` does not match a synonym in
            `df_countries`.
    """

    # Expand the list of synonyms so that each synonym appears in its own row
    df_country_synonyms = df_countries.explode("synonyms")

    # Keep only relevant columns and remove any duplicate synonym-country pairs
    df_country_synonyms = df_country_synonyms[
        ["country_name", "synonyms"]
    ].drop_duplicates()

    # Rename the 'synonyms' column to 'country' to match the column in df for joining
    df_country_synonyms = df_country_synonyms.rename(columns={"synonyms": "country"})

    # Merge df with the synonym-to-canonical-country mapping on the 'country' column
    df = pd.merge(
        df,
        df_country_synonyms,
        on="country",
        how="left",  # Left join to keep all original df rows
        validate="many_to_one",  # Each country synonym maps to at most one country_name
        indicator=True,  # Add merge status column for validation
    )

    # Ensure all rows had a matching synonym in the reference dataframe
    assert all(df.pop("_merge") == "both"), "Some countries did not match any synonym."

    # Replace 'country' column values with their canonical names
    return df.pop("country_name")
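
# Sketch of the expected df_countries shape (hypothetical row): country_name
# "United States" alongside synonyms ["USA", "United States of America", ...].
# After explode + rename, each synonym maps to exactly one canonical name,
# which is what makes the many_to_one validation in the merge safe.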


def add_state(df, df_state, df_countries):
    # Add in the exporting CNPJ's state
    df.loc[df["exporter_geocode"] == "XXXXXXX", "exporter_geocode"] = df.loc[
        df["exporter_geocode"] == "XXXXXXX"
    ]["exporter_municipality"].str[-7:]
    df["uf_code"] = df["exporter_geocode"].str[:2]

    df_state["uf_code"] = df_state["uf_code"].astype(str)
    df = df.merge(df_state, on="uf_code", how="left", validate="many_to_one")
    report(df, "Merge with df_state")

    df.rename(columns={"state_name": "exporter_state"}, inplace=True)
    df.drop(columns=["uf_code", "uf_name", "trase_id"], inplace=True)
    df["exporter_state"] = np.where(
        df["exporter_state"].isna(), UNKNOWN_STATE_NAME, df["exporter_state"]
    )

    # Replace 0 geocodes with unknown geocode and state
    df["exporter_geocode"] = np.where(
        df["exporter_geocode"].isin(["0", "9999999", "NA"]),
        UNKNOWN_MUNICIPALITY_GEOCODE,
        df["exporter_geocode"],
    )
    df["state_of_production"] = np.where(
        df["state_of_production"] == "UNKNOWN",
        UNKNOWN_STATE_NAME,
        df["state_of_production"],
    )
    df["country"] = clean_country_names(df, df_countries)

    return df


def filter_to_rows_for_beef(df, year):
    # All HS6 code values should be a six-digit code or UNKNOWN
    hs6 = df["hs6"]
    hs6_is_unknown = hs6 == "UNKNOWN"
    hs6_is_six_digits = (hs6.str.len() == 6) & hs6.str.isdigit()
    assert all(hs6_is_six_digits | hs6_is_unknown)

    # All HS4 code values should be a four-digit code or UNKNOWN
    hs4 = df["hs4"]
    hs4_is_unknown = hs4 == "UNKNOWN"
    hs4_is_four_digits = (hs4.str.len() == 4) & hs4.str.isdigit()
    assert all(hs4_is_four_digits | hs4_is_unknown)

    # now we filter to beef based on HS6
    #
    # We have an issue with live cattle (0102) since the HS6 is unknown in the customs
    # data post-2018. To fix this, also include HS4 code 0102 when the HS6 is unknown.
    # This is a bit problematic since this HS4 code also includes buffalo (01023) and
    # bovine animals that are neither cattle nor buffalo (010290). Practically, if we
    # look at MDIC (Port) we can see that the only big issue is non-cattle exports in
    # 2018, since they make up 13% of live bovine animals:
    #
    #     year  |   hs5 | mdic-port vol |   %
    #     ------|-------|---------------|-----
    #     2018  | 01022 |   201,000,000 |  87
    #     2018  | 01029 |    30,400,000 |  13  <--- non-cattle
    #     ------|-------|---------------|-----
    #     2019  | 01022 |   156,000,000 | 100
    #     2019  | 01023 |         5,520 |   0  <--- buffalo, insignificant
    #     ------|-------|---------------|-----
    #     2020  | 01022 |   110,000,000 |  83
    #     2020  | 01023 |           133 |   0  <--- buffalo, insignificant
    #     2020  | 01029 |    22,800,000 |  17  <--- non-cattle
    #
    if year == 2018:
        accept = df["hs4"].isin(BEEF_HS4)
    else:
        accept = df["hs6"].isin(BEEF_HS6)

    if year >= 2018:
        hs4_is_live_cattle = df["hs4"] == LIVE_BOVINE_ANIMALS_HS4
        accept |= hs6_is_unknown & hs4_is_live_cattle

    df = df[accept]
    report(df, "Filter to beef")
    return df


def concat(dfs, *args, sort=False, ignore_index=True, **kwargs):
    """Some useful defaults for concatenating two dataframes together.

    In particular:

    - Do not sort the dataframes.
    - Ignore indexes: we assume the indexes carry not useful information.
    - Validate that all dataframes have the same columns. Because NaNs are really
        annoying in Pandas.
    """
    dfs = list(dfs)
    columns = set(tuple(sorted(df.columns)) for df in dfs)
    if len(columns) != 1:
        raise ValueError("Some dataframes have different columns to others")
    return pd.concat(dfs, *args, sort=sort, ignore_index=ignore_index, **kwargs)


def calculate_carcass_weight_equivalent_volume(df, df_eq):
    def lookup_factors(df, column):
        # get unique equivalence factors for this column
        df_eq_ = df_eq[[column, "eq_factor"]].drop_duplicates()
        duplicated = df_eq_.duplicated(column, keep=False)
        df_eq_ = df_eq_[~duplicated]

        if column == "hs4":
            # When there is only hs4 listed, we use the lower eq_factor for a more conservative estimation
            # For 1602, 2.484913035 is correct
            hs4_row = pd.DataFrame(
                {
                    "hs4": ["0201", "0202", "1602"],
                    "eq_factor": [1.064962726, 1.064962726, 2.484913035],
                }
            )
            df_eq_ = pd.concat([df_eq_, hs4_row], ignore_index=True)

        # merge in equivalence factors
        df = pd.merge(
            df,
            df_eq_,
            on=column,
            validate="many_to_one",
            how="left",
            indicator=True,
        )

        # split the dataframe into two based on whether or not we found an equivalence
        # factor
        success = df.pop("_merge") == "both"
        df_remaining = df[~success].drop(columns=["eq_factor"], errors="raise")
        return df[success], df_remaining

    # lookup equivalence factors. Factors are defined per HS6, but unfortunately we
    # don't always have HS6 at our disposal. In that case, we use HS5 or HS4, but only
    # for the codes where there is a single unique equivalence factor
    original_index = df.index
    original_volume = df["vol"].sum()
    df["i"] = original_index

    df6, df_remaining = lookup_factors(df, "hs6")
    df5, df_remaining = lookup_factors(df_remaining, "hs5")
    df4, df_remaining = lookup_factors(df_remaining, "hs4")
    assert df_remaining.empty

    # combine all dataframes and check that we didn't lose or duplicate any data
    df = concat([df6, df5, df4])
    assert sorted(df.pop("i")) == sorted(original_index)
    assert np.isclose(df["vol"].sum(), original_volume, atol=0.001)

    # Calculate carcass weight equivalent
    df["cwe"] = df["vol"] * df["eq_factor"]
    report(df, "Calculate carcass weight equivalent volume")
    return df
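
# Worked example with the factors above: 1,000 t of fresh or chilled beef under
# HS4 0201 gets cwe = 1,000 * 1.064962726, i.e. roughly 1,065 t carcass weight
# equivalent.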


def get_parental_cnpj(df, df_relationship, year):
    df_relationship = df_relationship[
        (df_relationship.start_year.astype(int) <= year)
        & (df_relationship.end_year.astype(int) >= year)
    ]

    # remove irrelevant CNPJ8s from the relationships file before merging
    # this allows us to use the "many_to_one" validation in the second merge
    cnpj8 = df["cnpj8"].drop_duplicates().rename("subsid_cnpj8")
    df_relationship = pd.merge(cnpj8, df_relationship)
    df_relationship = df_relationship[~df_relationship["subsid_cnpj8"].duplicated()]

    df = pd.merge(
        df,
        df_relationship,
        left_on="cnpj8",
        right_on="subsid_cnpj8",
        how="left",
        validate="many_to_one",
    )
    df["parent_cnpj8"] = df["parent_cnpj8"].mask(df["parent_cnpj8"].isna(), df["cnpj8"])
    return df
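
# Example: an exporter whose cnpj8 matches a subsid_cnpj8 active in the given
# year inherits that row's parent_cnpj8 (e.g. Marfrig's 03853896, hard-coded in
# process_relationships); exporters with no match keep their own cnpj8 as parent.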


def consolidate_df(df, numerical_columns):
    old_columns = [
        "year",
        "hs6",
        "hs4",
        "exporter_name",
        "exporter_cnpj",
        "exporter_municipality",
        "state_of_production",
        "exporter_geocode",
        "country",
        "vol",
        "fob",
        "importer_name",
        "port",
    ]
    extra_columns = [
        "cnpj8",
        "parent_cnpj8",
        "fob",
        "exporter_state",
        "cwe",
        "cnpj_is_valid",
    ]
    keep_columns = old_columns + extra_columns
    keep_columns.remove("exporter_municipality")
    categorical_columns = [c for c in keep_columns if c not in numerical_columns]
    df = consolidate(df, numerical_columns, categorical_columns)
    report(df, "Consolidate over " + "/".join(categorical_columns))
    return df
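
# consolidate (imported from trase.tools.etl.utilities) is assumed here to
# group by the categorical columns and sum the numerical ones, so rows sharing
# every dimension collapse into one row with aggregated vol/fob/cwe.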


def consolidate_and_assert(df):
    df = consolidate_df(df, ["vol", "fob", "cwe"])

    assert len(df[df["exporter_geocode"] == "NA"]) == 0
    assert len(df[df["state_of_production"] == "0"]) == 0
    assert len(df[df["exporter_state"] == "0"]) == 0
    return df


def process_beef_carcass_weight_equivalence_factors(df_eq):
    df_eq = df_eq[df_eq["commodity_equivalence_group_name"] == "BEEF"]
    df_eq = df_eq[df_eq["node_id"] == 27]  # Brazil
    df_eq = df_eq[df_eq["commodity_code"].str.len() == 6]
    df_eq["hs6"] = df_eq["commodity_code"]
    df_eq["hs5"] = df_eq["commodity_code"].str.slice(0, 5)
    df_eq["hs4"] = df_eq["commodity_code"].str.slice(0, 4)
    return df_eq[["hs4", "hs5", "hs6", "eq_factor"]].drop_duplicates()


def main():
    for year in YEARS:
        S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.clear()

        df_state = get_pandas_df_once(
            "brazil/metadata/UF_NEW.csv",
            sep=";",
            dtype=str,
            keep_default_na=False,
        )
        df_relationship = get_pandas_df_once(
            "brazil/logistics/company_relationships/2020-12-19-BEEF_EXPORTER_SUBSIDIARIES.csv",
            sep=",",
            dtype=str,
            keep_default_na=False,
        )
        df_eq = read_s3_parquet(
            "postgres_views/postgres_commodity_equivalence_factors.parquet"
        )
        dfs = read_customs_declarations(year)
        df_countries = read_s3_parquet("postgres_views/postgres_countries.parquet")

        df = process_data(dfs, df_countries, df_eq, df_relationship, df_state, year)

        write_csv_for_upload(
            df,
            f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}_NEW.csv",
            sep=";",
        )


def process_data(dfs, df_countries, df_eq, df_relationship, df_state, year):
    df_relationship = process_relationships(df_relationship)
    df_eq = process_beef_carcass_weight_equivalence_factors(df_eq)
    df = process_customs_declarations(dfs, year)
    df = remove_abnormal_value(df)
    df = filter_to_rows_for_beef(df, year)
    df = calculate_carcass_weight_equivalent_volume(df, df_eq)
    df = add_state(df, df_state, df_countries)
    df = get_parental_cnpj(df, df_relationship, year)
    df = consolidate_and_assert(df)
    return df


if __name__ == "__main__":
    main()

Model source: trase/data_pipeline/models/brazil/beef/trade/cd/disaggregated/cd_disaggregated_beef_2023_new.py

from trase.data.brazil.beef.trade.cd.disaggregated.CD_DISAGGREGATED_BEEF_201X_NEW import (
    process_data,
)

YEAR = 2023


def model(dbt, cursor):
    dbt.config(materialized="external")

    df = dbt.ref("cd_disaggregated_beef_2023").df()
    df_countries = dbt.ref("postgres_countries").df()
    df_eq = dbt.ref("postgres_commodity_equivalence_factors").df()
    df_relationship = dbt.source(
        "trase-storage-raw", "2020-12-19-beef_exporter_subsidiaries"
    ).df()
    df_state = dbt.ref("uf_new").df()

    return process_data([df], df_countries, df_eq, df_relationship, df_state, year=YEAR)
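
# To build this model with the standard dbt CLI (selector assumed to match the
# model name):
#
#     dbt run --select cd_disaggregated_beef_2023_new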