Cd Disaggregated Beef 2023 New
s3://trase-storage/brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_2023_NEW.parquet
Dbt path: trase_production.main_brazil.cd_disaggregated_beef_2023_new
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/beef/trade/cd/disaggregated/_schema_cd_disaggregated_beef.yml
Model file link: trase/data_pipeline/models/brazil/beef/trade/cd/disaggregated/cd_disaggregated_beef_2023_new.py
Calls script: trase/data/brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_201X_NEW.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: beef, brazil, cd, disaggregated, trade
cd_disaggregated_beef_2023_new
Description
No description
Details
| Column | Type | Description |
|---|---|---|
| year | BIGINT | |
| hs6 | VARCHAR | |
| hs4 | VARCHAR | |
| exporter_name | VARCHAR | |
| exporter_cnpj | VARCHAR | |
| state_of_production | VARCHAR | |
| exporter_geocode | VARCHAR | |
| country | VARCHAR | |
| importer_name | VARCHAR | |
| port | VARCHAR | |
| cnpj8 | VARCHAR | |
| parent_cnpj8 | VARCHAR | |
| exporter_state | VARCHAR | |
| cnpj_is_valid | VARCHAR | |
| vol | DOUBLE | |
| fob | DOUBLE | |
| cwe | DOUBLE | |
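The published parquet can be inspected directly against this schema. A minimal sketch, assuming the read_s3_parquet helper used in the script below accepts keys relative to the trase-storage bucket:
from trase.tools.aws.aws_helpers import read_s3_parquet
# Load the 2023 disaggregated beef table from trase-storage
df = read_s3_parquet(
    "brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_2023_NEW.parquet"
)
# Example: total carcass weight equivalent volume per destination country
print(df.groupby("country")["cwe"].sum().sort_values(ascending=False).head(10))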
Models / Seeds
source.trase_duckdb.trase-storage-raw.2020-12-19-beef_exporter_subsidiaries
model.trase_duckdb.cd_disaggregated_beef_2023
model.trase_duckdb.postgres_countries
model.trase_duckdb.postgres_commodity_equivalence_factors
model.trase_duckdb.uf_new
Sources
['trase-storage-raw', '2020-12-19-beef_exporter_subsidiaries']
import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf
from trase.models.brazil.beef.constants import (
UNKNOWN_STATE_NAME,
UNKNOWN_MUNICIPALITY_GEOCODE,
)
from trase.tools.aws.aws_helpers import read_s3_parquet
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION
from trase.tools.etl.utilities import consolidate
YEARS = [2018, 2019, 2020]
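# NOTE: these are the years processed when this script is run standalone via main();
# the dbt model at the end of this page calls process_data() directly for 2023.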
BEEF_HS6 = [
"010221", # Cattle; live, pure-bred breeding animals
"010229", # Cattle; live, other than pure-bred breeding animals
"010290", # Bovine animals; live, other than cattle and buffalo
"020120", # Meat; of bovine animals, cuts with bone in, fresh or chilled
"020130", # Meat; of bovine animals, boneless cuts, fresh or chilled
"020220", # Meat; of bovine animals, cuts with bone in, frozen
"020230", # Meat; of bovine animals, boneless cuts, frozen
"020610", # Offal, edible; of bovine animals, fresh or chilled
"020621", # Offal, edible; of bovine animals, tongues, frozen
"020622", # Offal, edible; of bovine animals, livers, frozen
"020629", # Offal, edible; of bovine animals, (other than tongues & livers), frozen
"021020", # Meat; salted, in brine, dried or smoked, of bovine animals
"160250", # Meat preparations; of bovine animals, meat or offal, prepared/preserved
]
BEEF_HS4 = [
"0102", # Bovine animals; live
"0201", # Meat of bovine animals; fresh or chilled
"0202", # Meat of bovine animals; frozen
"0206", # Edible offal of bovine + other animals; fresh, chilled or frozen
"0210", # Meat and edible meat offal; salted/brine/etc. (does not exist in BoL)
"0504", # Guts, bladders and stomachs of animals (does not exist in BoL)
"1602", # Prepared or preserved meat, meat offal or blood
]
LIVE_BOVINE_ANIMALS_HS4 = "0102"
UNKNOWN_HS_CODE = "UNKNOWN"
def read_customs_declarations(year):
if year == 2018:
df1 = get_pandas_df_once(
f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}_01.csv",
sep=";",
dtype=str,
keep_default_na=False,
)
df2 = get_pandas_df_once(
f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}_02.csv",
sep=";",
dtype=str,
keep_default_na=False,
)
return df1, df2
else:
df = get_pandas_df_once(
f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}.csv",
sep=";",
dtype=str,
keep_default_na=False,
)
return (df,)
def process_customs_declarations(dfs, year):
if year == 2018:
df1, df2 = dfs
df1 = df1.rename(columns={"index_cd": "index_cd_bol"}, errors="raise")
df2 = df2.rename(columns={"index_cd": "index_cd_bol"}, errors="raise")
df = pd.concat([df1, df2], sort=False)
else:
(df,) = dfs
df = df.rename(
columns={
"exporter.label": "exporter_name",
"exporter.cnpj": "exporter_cnpj",
"exporter.municipality.trase_id": "exporter_municipality",
"country_of_destination.name": "country",
"importer.label": "importer_name",
"port_of_export.name": "port",
},
errors="raise",
)
def assert_valid_hs_codes(series, digits: int):
valid_code = series.str.isdigit() & (series.str.len() == digits)
unknown = series == UNKNOWN_HS_CODE
assert all(valid_code | unknown)
if "hs8" in df:
df["hs8"] = df["hs8"].replace("XXXXXXXX", UNKNOWN_HS_CODE)
assert_valid_hs_codes(df["hs8"], 8)
df["hs6"] = df["hs6"].replace("XXXXXX", UNKNOWN_HS_CODE)
df["hs5"] = df["hs5"].replace("XXXXX", UNKNOWN_HS_CODE)
assert_valid_hs_codes(df["hs6"], 6)
assert_valid_hs_codes(df["hs5"], 5)
assert_valid_hs_codes(df["hs4"], 4)
return df
def process_relationships(df_relationship):
df_relationship = df_relationship.rename(
columns={
"PARENT_CNPJ8": "parent_cnpj8",
"SUBSID_CNPJ8": "subsid_cnpj8",
"START_YEAR": "start_year",
"END_YEAR": "end_year",
},
errors="raise",
)
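    # Relationships with an 'NA' start or end year are treated as spanning the full
    # range of processed years.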
df_relationship["start_year"] = np.where(
df_relationship.start_year == "NA", min(YEARS), df_relationship.start_year
)
df_relationship["end_year"] = np.where(
df_relationship.end_year == "NA", max(YEARS), df_relationship.end_year
)
df_relationship.loc[
df_relationship["PARENT_CLEAN"] == "MARFRIG GLOBAL FOODS", "parent_cnpj8"
] = "03853896"
df_relationship.loc[
df_relationship["PARENT_CLEAN"]
== "J.N.J. COMERCIAL IMPORTADORA E EXPORTADORA DE CARNES",
"parent_cnpj8",
] = "07664941"
assert all(df_relationship["parent_cnpj8"].str.len() == 8)
assert all(df_relationship["subsid_cnpj8"].str.len() == 8)
df_relationship = df_relationship.drop_duplicates()
return df_relationship
def report(df, msg=""):
"""
Simple report about row count, total vol and fob
"""
print(f"Report" + (f": {msg}" if msg else ""))
print(f"\t\t | Row Count: {len(df):,}")
print(f"\t\t | Sum of vol: " f"" f"{df['vol'].sum():,.0f}")
print(f"\t\t | Sum of fob: " f"" f"{df['fob'].sum():,.0f}")
def validate_cnpj_code(code):
# first deduce what the type is given the length of string
# that it appeared in the original data
if code == "NA":
code = "0"
cnpj = code.rjust(14, "0")
is_valid_cnpj = stdnum.br.cnpj.is_valid(cnpj)
cpf = code.rjust(11, "0")
is_valid_cpf = stdnum.br.cpf.is_valid(cpf)
# we use the checksum method if it was unequivocal and fall back on the
# string-length method otherwise
if is_valid_cnpj and not is_valid_cpf:
return "valid", cnpj
elif is_valid_cpf and not is_valid_cnpj:
return "valid", cpf
else:
return "invalid", cnpj if len(code) > 11 else "invalid", cpf
def remove_abnormal_value(df):
df["vol"] = df["vol"].astype(float)
df["fob"] = df["fob"].astype(float)
df.loc[df["fob"] == np.inf, "fob"] = 99999999.0
# Drop rows with vol == 0
df = df.drop(df.loc[df["vol"] == 0].index)
report(df, "Drop vol=0")
# Validate cnpj and add CNPJ8 number
df["cnpj_is_valid"], df["exporter_cnpj"] = zip(
*df.exporter_cnpj.apply(validate_cnpj_code)
)
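    # The first eight digits of a CNPJ identify the company itself (its registration
    # root), so branches of the same company share one CNPJ8.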
df["cnpj8"] = df["exporter_cnpj"].str[:8]
return df
def clean_country_names(df, df_countries):
"""
Standardises country names in a DataFrame by replacing synonyms with their canonical
names.
This function matches entries in the 'country' column of the input DataFrame `df` to
known synonyms listed in `df_countries`, and replaces them with the corresponding
canonical 'country_name'.
Args:
df (pd.DataFrame): The main DataFrame containing a 'country' column with country
synonyms.
df_countries (pd.DataFrame): A reference DataFrame with 'country_name' and a
list of 'synonyms' for each country.
Returns:
pd.Series: the "country" column of the dataframe, but standardised to canonical
'country_name' values.
Raises:
AssertionError: If any 'country' value in `df` does not match a synonym in
`df_countries`.
"""
# Expand the list of synonyms so that each synonym appears in its own row
df_country_synonyms = df_countries.explode("synonyms")
# Keep only relevant columns and remove any duplicate synonym-country pairs
df_country_synonyms = df_country_synonyms[
["country_name", "synonyms"]
].drop_duplicates()
# Rename the 'synonyms' column to 'country' to match the column in df for joining
df_country_synonyms = df_country_synonyms.rename(columns={"synonyms": "country"})
# Merge df with the synonym-to-canonical-country mapping on the 'country' column
df = pd.merge(
df,
df_country_synonyms,
on="country",
how="left", # Left join to keep all original df rows
validate="many_to_one", # Each country synonym maps to at most one country_name
indicator=True, # Add merge status column for validation
)
# Ensure all rows had a matching synonym in the reference dataframe
assert all(df.pop("_merge") == "both"), "Some countries did not match any synonym."
# Replace 'country' column values with their canonical names
return df.pop("country_name")
def add_state(df, df_state, df_countries):
# Add in the exporting CNPJ's state
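    # Placeholder geocodes ('XXXXXXX') are recovered from the last seven digits of
    # the exporter municipality trase_id.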
df.loc[df["exporter_geocode"] == "XXXXXXX", "exporter_geocode"] = df.loc[
df["exporter_geocode"] == "XXXXXXX"
]["exporter_municipality"].str[-7:]
df["uf_code"] = df["exporter_geocode"].str[:2]
df_state["uf_code"] = df_state["uf_code"].astype(str)
df = df.merge(df_state, on="uf_code", how="left", validate="many_to_one")
report(df, "Merge with df_state")
df.rename(columns={"state_name": "exporter_state"}, inplace=True)
    df.drop(columns=["uf_code", "uf_name", "trase_id"], inplace=True)
df["exporter_state"] = np.where(
df["exporter_state"].isna(), UNKNOWN_STATE_NAME, df["exporter_state"]
)
# Replace 0 geocodes with unknown geocode and state
df["exporter_geocode"] = np.where(
df["exporter_geocode"].isin(["0", "9999999", "NA"]),
UNKNOWN_MUNICIPALITY_GEOCODE,
df["exporter_geocode"],
)
df["state_of_production"] = np.where(
df["state_of_production"] == "UNKNOWN",
UNKNOWN_STATE_NAME,
df["state_of_production"],
)
df["country"] = clean_country_names(df, df_countries)
return df
def filter_to_rows_for_beef(df, year):
    # All HS6 code values should be a six-digit code or UNKNOWN
hs6 = df["hs6"]
hs6_is_unknown = hs6 == "UNKNOWN"
hs6_is_six_digits = (hs6.str.len() == 6) & hs6.str.isdigit()
assert all(hs6_is_six_digits | hs6_is_unknown)
    # All HS4 code values should be a four-digit code or UNKNOWN
hs4 = df["hs4"]
hs4_is_unknown = hs4 == "UNKNOWN"
hs4_is_four_digits = (hs4.str.len() == 4) & hs4.str.isdigit()
assert all(hs4_is_four_digits | hs4_is_unknown)
# now we filter to beef based on HS6
#
# We have an issue with live cattle (0102) since the HS6 is unknown in the customs
# data post-2018. To fix this, also include HS4 code 0102 when the HS6 is unknown.
# This is a bit problematic since this HS4 code also includes buffalo (01023) and
    # bovine animals that are neither cattle nor buffalo (010290). Practically, if we
# look at MDIC (Port) we can see that the only big issue are non-cattle exports in
# 2018, since they make up 13% of live bovine animals:
#
# year | hs5 | mdic-port vol | %
# ------|-------|---------------|-----
# 2018 | 01022 | 201,000,000 | 87
# 2018 | 01029 | 30,400,000 | 13 <--- non-cattle
# ------|-------|---------------|-----
# 2019 | 01022 | 156,000,000 | 100
# 2019 | 01023 | 5,520 | 0 <--- buffalo, insignificant
# ------|-------|---------------|-----
# 2019 | 01029 | 22,800,000 | 17
# 2020 | 01022 | 110,000,000 | 83
# 2020 | 01023 | 133 | 0 <--- buffalo, insignificant
#
if year == 2018:
accept = df["hs4"].isin(BEEF_HS4)
else:
accept = df["hs6"].isin(BEEF_HS6)
if year >= 2018:
hs4_is_live_cattle = df["hs4"] == LIVE_BOVINE_ANIMALS_HS4
accept |= hs6_is_unknown & hs4_is_live_cattle
df = df[accept]
report(df, "Filter to beef")
return df
def concat(dfs, *args, sort=False, ignore_index=True, **kwargs):
"""Some useful defaults for concatenating two dataframes together.
In particular:
- Do not sort the dataframes.
    - Ignore indexes: we assume the indexes carry no useful information.
- Validate that all dataframes have the same columns. Because NaNs are really
annoying in Pandas.
"""
dfs = list(dfs)
columns = set(tuple(sorted(df.columns)) for df in dfs)
if len(columns) != 1:
raise ValueError("Some dataframes have different columns to others")
return pd.concat(dfs, *args, sort=sort, ignore_index=ignore_index, **kwargs)
def calculate_carcass_weight_equivalent_volume(df, df_eq):
def lookup_factors(df, column):
# get unique equivalence factors for this column
df_eq_ = df_eq[[column, "eq_factor"]].drop_duplicates()
duplicated = df_eq_.duplicated(column, keep=False)
df_eq_ = df_eq_[~duplicated]
if column == "hs4":
# When there is only hs4 listed, we use the lower eq_factor for a more conservative estimation
# For 1602, 2.484913035 is correct
hs4_row = pd.DataFrame(
{
"hs4": ["0201", "0202", "1602"],
"eq_factor": [1.064962726, 1.064962726, 2.484913035],
}
)
            df_eq_ = pd.concat([df_eq_, hs4_row], ignore_index=True)
# merge in equivalence factors
df = pd.merge(
df,
df_eq_,
on=column,
validate="many_to_one",
how="left",
indicator=True,
)
# split the dataframe into two based on whether or not we found an equivalence
# factor
success = df.pop("_merge") == "both"
df_remaining = df[~success].drop(columns=["eq_factor"], errors="raise")
return df[success], df_remaining
# lookup equivalence factors. Factors are defined per HS6, but unfortunately we
# don't always have HS6 at our disposal. In that case, we use HS5 or HS4, but only
# for the codes where there is a single unique equivalence factor
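    # Keep the original index and total volume so the merges can be checked below
    # for dropped or duplicated rows.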
original_index = df.index
original_volume = df["vol"].sum()
df["i"] = original_index
df6, df_remaining = lookup_factors(df, "hs6")
df5, df_remaining = lookup_factors(df_remaining, "hs5")
df4, df_remaining = lookup_factors(df_remaining, "hs4")
assert df_remaining.empty
# combine all dataframes and check that we didn't lose or duplicate any data
df = concat([df6, df5, df4])
assert sorted(df.pop("i")) == sorted(original_index)
assert np.isclose(df["vol"].sum(), original_volume, atol=0.001)
# Calculate carcass weight equivalent
df["cwe"] = df["vol"] * df["eq_factor"]
report(df, "Calculate carcass weight equivalent volume")
return df
def get_parental_cnpj(df, df_relationship, year):
df_relationship = df_relationship[
(df_relationship.start_year.astype(int) <= year)
& (df_relationship.end_year.astype(int) >= year)
]
# remove irrelevant CNPJ8s from the relationships file before merging
# this allows us to use the "many_to_one" validation in the second merge
cnpj8 = df["cnpj8"].drop_duplicates().rename("subsid_cnpj8")
df_relationship = pd.merge(cnpj8, df_relationship)
df_relationship = df_relationship[~df_relationship["subsid_cnpj8"].duplicated()]
df = pd.merge(
df,
df_relationship,
left_on="cnpj8",
right_on="subsid_cnpj8",
how="left",
validate="many_to_one",
)
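    # Exporters without a listed parent are treated as their own parent company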
df["parent_cnpj8"] = df["parent_cnpj8"].mask(df["parent_cnpj8"].isna(), df["cnpj8"])
return df
def consolidate_df(df, numerical_columns):
old_columns = [
"year",
"hs6",
"hs4",
"exporter_name",
"exporter_cnpj",
"exporter_municipality",
"state_of_production",
"exporter_geocode",
"country",
"vol",
"fob",
"importer_name",
"port",
]
extra_columns = [
"cnpj8",
"parent_cnpj8",
"fob",
"exporter_state",
"cwe",
"cnpj_is_valid",
]
keep_columns = old_columns + extra_columns
keep_columns.remove("exporter_municipality")
categorical_columns = [c for c in keep_columns if c not in numerical_columns]
df = consolidate(df, numerical_columns, categorical_columns)
report(df, "Consolidate over " + "/".join(categorical_columns))
return df
def consolidate_and_assert(df):
df = consolidate_df(df, ["vol", "fob", "cwe"])
assert len(df[df["exporter_geocode"] == "NA"]) == 0
assert len(df[df["state_of_production"] == "0"]) == 0
assert len(df[df["exporter_state"] == "0"]) == 0
return df
def process_beef_carcass_weight_equivalence_factors(df_eq):
df_eq = df_eq[df_eq["commodity_equivalence_group_name"] == "BEEF"]
df_eq = df_eq[df_eq["node_id"] == 27] # Brazil
df_eq = df_eq[df_eq["commodity_code"].str.len() == 6]
df_eq["hs6"] = df_eq["commodity_code"]
df_eq["hs5"] = df_eq["commodity_code"].str.slice(0, 5)
df_eq["hs4"] = df_eq["commodity_code"].str.slice(0, 4)
return df_eq[["hs4", "hs5", "hs6", "eq_factor"]].drop_duplicates()
def main():
for year in YEARS:
S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.clear()
df_state = get_pandas_df_once(
"brazil/metadata/UF_NEW.csv",
sep=";",
dtype=str,
keep_default_na=False,
)
df_relationship = get_pandas_df_once(
"brazil/logistics/company_relationships/2020-12-19-BEEF_EXPORTER_SUBSIDIARIES.csv",
sep=",",
dtype=str,
keep_default_na=False,
)
df_eq = read_s3_parquet(
"postgres_views/postgres_commodity_equivalence_factors.parquet"
)
dfs = read_customs_declarations(year)
df_countries = read_s3_parquet("postgres_views/postgres_countries.parquet")
df = process_data(dfs, df_countries, df_eq, df_relationship, df_state, year)
write_csv_for_upload(
df,
f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}_NEW.csv",
sep=";",
)
def process_data(dfs, df_countries, df_eq, df_relationship, df_state, year):
df_relationship = process_relationships(df_relationship)
df_eq = process_beef_carcass_weight_equivalence_factors(df_eq)
df = process_customs_declarations(dfs, year)
df = remove_abnormal_value(df)
df = filter_to_rows_for_beef(df, year)
df = calculate_carcass_weight_equivalent_volume(df, df_eq)
df = add_state(df, df_state, df_countries)
df = get_parental_cnpj(df, df_relationship, year)
df = consolidate_and_assert(df)
return df
if __name__ == "__main__":
main()
from trase.data.brazil.beef.trade.cd.disaggregated.CD_DISAGGREGATED_BEEF_201X_NEW import (
process_data,
)
YEAR = 2023
def model(dbt, cursor):
dbt.config(materialized="external")
df = dbt.ref("cd_disaggregated_beef_2023").df()
df_countries = dbt.ref("postgres_countries").df()
df_eq = dbt.ref("postgres_commodity_equivalence_factors").df()
df_relationship = dbt.source(
"trase-storage-raw", "2020-12-19-beef_exporter_subsidiaries"
).df()
df_state = dbt.ref("uf_new").df()
return process_data([df], df_countries, df_eq, df_relationship, df_state, year=YEAR)