Cd Disaggregated Beef 2022
s3://trase-storage/brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_2022.parquet
Dbt path: trase_production.main_brazil.cd_disaggregated_beef_2022
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/brazil/beef/trade/cd/disaggregated/_schema_cd_disaggregated_beef.yml
Model file link: trase/data_pipeline/models/brazil/beef/trade/cd/disaggregated/cd_disaggregated_beef_2022.py
Calls script: trase/data/brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_201X.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: beef, brazil, cd, disaggregated, trade
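
For a quick look at the data outside dbt or Metabase, the table can be read straight from the S3 path above. A minimal sketch, assuming read access to the trase-storage bucket and an s3fs/fsspec-enabled pandas install:

```python
import pandas as pd

# read the published parquet straight from S3 (requires s3fs and bucket credentials)
df = pd.read_parquet(
    "s3://trase-storage/brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_2022.parquet"
)
# most columns are stored as VARCHAR, so cast volumes before aggregating
print(df.assign(vol=df["vol"].astype(float)).groupby("matching_stage")["vol"].sum())
```
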
cd_disaggregated_beef_2022
Description
No description
Details
| Column | Type | Description |
|---|---|---|
| index_mdic | VARCHAR | |
| index_bol | VARCHAR | |
| vol | VARCHAR | |
| vol_mdic | VARCHAR | |
| matching_stage | VARCHAR | |
| state.trase_id | VARCHAR | |
| message | VARCHAR | |
| via | VARCHAR | |
| hs4 | VARCHAR | |
| hs5 | VARCHAR | |
| hs6 | VARCHAR | |
| hs8 | VARCHAR | |
| exporter.cnpj | VARCHAR | |
| exporter.label | VARCHAR | |
| port_of_export.name | VARCHAR | |
| port_of_export.group | VARCHAR | |
| exporter.type | VARCHAR | |
| exporter.municipality.trase_id | VARCHAR | |
| importer.label | VARCHAR | |
| country_of_destination.name | VARCHAR | |
| country_of_destination.trase_id | VARCHAR | |
| country_of_destination.group | VARCHAR | |
| matched | VARCHAR | |
| fob | VARCHAR | |
| year | BIGINT | |
| exporter_geocode | VARCHAR | |
| state_of_production | VARCHAR | |
Models / Seeds
- model.trase_duckdb.brazil_bol_2022
- model.trase_duckdb.brazil_mdic_disaggregated_2022_beef
- model.trase_duckdb.brazil_mdic_port_2022
- model.trase_duckdb.postgres_regions_without_geometry
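
The same table can also be queried through the DuckDB build of the dbt project using the relation name given above. A minimal sketch, assuming a local copy of the production database (the trase_production.duckdb filename is illustrative). The source of the calls script and the dbt model file are reproduced after this.

```python
import duckdb

# attach the local dbt DuckDB database (filename is an assumption for illustration)
con = duckdb.connect("trase_production.duckdb", read_only=True)
# summarise flows and volume per matching stage; vol is stored as VARCHAR, so cast it
df = con.sql(
    """
    select matching_stage, count(*) as flows, sum(cast(vol as double)) as vol
    from main_brazil.cd_disaggregated_beef_2022
    group by 1
    order by vol desc
    """
).df()
print(df)
```
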
import numpy as np
import pandas as pd
from trase.tools.aws.aws_helpers import read_s3_parquet
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION
from trase.tools.utilities.helpers import clean_string
from trase.tools.matching.fair_allocation import direct_sourcing
from trase.tools.sei_pcs.pandas_utilities import (
Compare,
compare_dataframes_single,
rename,
)
PORT_GROUPS = {
# for the matching, we consider the following ports to be equivalent
"BELEM": "BARCARENA",
"PECEM": "FORTALEZA", # Pecem is a terminal 60km from Fortaleza
}
COUNTRY_GROUPS = {
# for the matching, we consider the following countries to be equivalent
"CHINA (HONG KONG)": "CHINA",
"CHINA (MAINLAND)": "CHINA",
"CONGO DEMOCRATIC REPUBLIC OF THE": "CONGO",
}
UNKNOWNS = { # the first item is considered to be the "canonical" unknown value
"cnpj": ["0"],
"country_of_destination.label": ["UNKNOWN COUNTRY"],
"country_of_destination.name": ["UNKNOWN COUNTRY"],
"country_of_destination.group": ["UNKNOWN COUNTRY GROUP"],
"country_of_destination.trase_id": ["XX"],
"exporter.cnpj": ["0"],
"exporter.label": ["UNKNOWN COMPANY"],
"exporter.municipality.trase_id": ["BR-XXXXXXX", ""],
"exporter.type": ["UNKNOWN"],
"hs4": ["XXXX"],
"hs5": ["XXXXX"],
"hs6": ["XXXXXX"],
"hs8": ["XXXXXXXX"],
"importer.label": ["UNKNOWN COMPANY"],
"month": [-1],
"port_of_export.name": ["UNKNOWN PORT BRAZIL", "UNKNOWN PORT"],
"port_of_export.group": ["UNKNOWN PORT GROUP BRAZIL"],
"state.trase_id": ["BR-XX"],
"matching_stage": ["UNMATCHED"],
"via": ["XX"],
"message": [""],
"success": ["N/A"],
}
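# Unknown placeholders are swapped to NaN before matching (replace_unknowns_with_nan) so that
# unknown values never block records together, and swapped back to the canonical unknown value
# for the final output (replace_nan_with_unknown_value).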
BEEF_HS4 = [
"0102", # Bovine animals; live
"0201", # Meat of bovine animals; fresh or chilled
"0202", # Meat of bovine animals; frozen
"0206", # Edible offal of bovine + other animals; fresh, chilled or frozen
"0210", # Meat and edible meat offal; salted/brine/etc. (does not exist in BoL)
"0504", # Guts, bladders and stomachs of animals (does not exist in BoL)
"1602", # Prepared or preserved meat, meat offal or blood
]
COLUMNS_TO_KEEP_FROM_BOL = [
"via",
"hs4",
"hs5",
"hs6",
"hs8",
"exporter.cnpj",
"exporter.label",
"port_of_export.name",
"port_of_export.group",
"exporter.type",
"exporter.municipality.trase_id",
"importer.label",
"country_of_destination.name",
"country_of_destination.trase_id",
"country_of_destination.group",
]
COLUMNS_TO_KEEP_FROM_MDIC = [
"state.trase_id",
"success",
"message",
]
def add_interpolated_fob_values(df, df_final, year):
# get price (=FOB / volume) from the original MDIC (Port) file
df = df[["hs4", "hs6", "fob", "vol"]]
    # abnormally small volumes can result in extremely high prices
df = df[(df["vol"].astype(float) > 1) & (df["fob"].astype(float) > 0)].copy()
df["price"] = df["fob"].astype(np.int64) / df["vol"].astype(np.int64)
df["hs5"] = df["hs6"].str.slice(0, 5)
# get average prices per HS6/5/4
df_hs6 = df.groupby("hs6")["price"].mean().rename("price_hs6").reset_index()
df_hs5 = df.groupby("hs5")["price"].mean().rename("price_hs5").reset_index()
df_hs4 = df.groupby("hs4")["price"].mean().rename("price_hs4").reset_index()
# merge in to df_final
df_final = pd.merge(df_final, df_hs6, on="hs6", how="left")
df_final = pd.merge(df_final, df_hs5, on="hs5", how="left")
df_final = pd.merge(df_final, df_hs4, on="hs4", how="left")
# take the price for HS6 if it exists, else for HS5, else for HS4
price = df_final.pop("price_hs5").combine_first(df_final.pop("price_hs4"))
price = df_final.pop("price_hs6").combine_first(price)
assert not any(price.isna())
# reconstruct FOB as volume * price
return df_final.assign(fob=df_final["vol"] * price)
def replace_nan_with_unknown_value(df_final):
df_final = df_final.fillna(
{column: unknowns[0] for column, unknowns in UNKNOWNS.items()}
)
assert not df_final.isna().any().any()
return df_final
def concat(*dfs):
assert len(set(tuple(sorted(df.columns)) for df in dfs)) == 1
return pd.concat(dfs, sort=False, ignore_index=True)
def pad_with_remaining_bol(df_solved, df_bol_remaining, has_hs8):
columns_to_keep_from_bol = [*COLUMNS_TO_KEEP_FROM_BOL]
if not has_hs8:
columns_to_keep_from_bol.remove("hs8")
df_padding = df_bol_remaining[[*columns_to_keep_from_bol, "vol"]]
df_padding = df_padding.rename(columns={"vol": "vol_bol"})
df_padding = df_padding.assign(
**{column: UNKNOWNS[column][0] for column in COLUMNS_TO_KEEP_FROM_MDIC}
)
df_padding = df_padding.assign(
matched=False,
matching_stage="N/A",
index_mdic=-1,
index_bol=df_padding.index,
vol_mdic=0,
)
df_final = concat(df_solved.assign(matched=True), df_padding)
df_final = replace_nan_with_unknown_value(df_final)
df_final = df_final.rename(columns={"vol_bol": "vol"}, errors="raise")
return df_final
def pad_to_mdic_volumes(df_final, df_bol_remaining_, df_mdic_remaining_):
"""
    At this point our dataframe (df_final) is equal to the BoL dataset. In practice its
    total volume is usually less than MDIC's. Since MDIC represents official
government statistics, we would like to "pad" our dataframe with rows to bring up
the volume.
These padding rows will of course contain largely unknowns. We try our best to at
least preserve the HS6 column. There is one edge case where we can use the unmatched
rows from MDIC directly.
For simplicity we use the remaining BoL and MDIC under the following assumptions:
df_final = bol matched + bol remaining
df_mdic = mdic matched + mdic remaining
bol matched = mdic matched
And therefore that
df_mdic - df_final = mdic remaining - bol remaining
This assumption doesn't quite hold since (a) volumes in MDIC and BoL are different
and (b) at the time of writing, this function ran after discard_tiny_flows. However
it's close enough!
"""
print("Padding to MDIC volume")
# unknowns assumed to be filled
assert not df_final.isna().any().any()
assert not df_bol_remaining_.isna().any().any()
assert not df_mdic_remaining_.isna().any().any()
# gather all HS codes
df_hs_ = pd.concat(
[
df_final[["hs4", "hs6"]],
df_bol_remaining_[["hs4", "hs6"]],
df_mdic_remaining_[["hs4", "hs6"]],
]
).drop_duplicates()
unknown_hs6 = UNKNOWNS["hs6"][0]
unknown_hs4 = UNKNOWNS["hs4"][0]
padding_dataframes = []
for hs4, df_hs in df_hs_.groupby("hs4"):
if hs4 == unknown_hs4:
continue
df = df_bol_remaining_[df_bol_remaining_["hs4"] == hs4]
# look to see if there is significant volume in an unknown HS6 code. If this
# is the case then padding per HS6 is not really possible to do. For example,
# suppose we have the following:
#
# df_bol_remaining: df_mdic_remaining:
#
# | hs6 | vol | | hs6 | vol |
# |--------|-----| |--------|-----|
# | 0102XX | 50 | | 010200 | 50 |
# | 010299 | 50 |
#
# We know that df_final needs 50 volume, but we don't know which HS6 codes are
# already "part" of the unknown 0102XX code. The best we can do is to add 50
# tons to 0102XX, i.e. pad to an HS4 level
volume_in_unknown_hs6 = df[df["hs6"] == unknown_hs6]["vol"].sum()
total_volume = df["vol"].sum()
p = volume_in_unknown_hs6 / total_volume
if p > 0.1: # i.e. over 10% of total volume
print(f"\tNot padding {hs4} as unknown HS6 is {100 * p:.0f}% of volume")
# TODO consider padding to HS4 level
continue
for hs6 in df_hs["hs6"]:
if hs6 == unknown_hs6:
continue
df_bol_remaining = df_bol_remaining_[df_bol_remaining_["hs6"] == hs6]
df_mdic_remaining = df_mdic_remaining_[df_mdic_remaining_["hs6"] == hs6]
mdic_volume = df_mdic_remaining["vol"].sum()
bol_volume = df_bol_remaining["vol"].sum()
if bol_volume > mdic_volume:
with np.errstate(divide="ignore"):
p = 100 * np.divide(bol_volume - mdic_volume, mdic_volume)
print(f"\tWarning: BoL volume for {hs6} exceeds MDIC by {p:,.1f}%")
continue
# if this HS6 code is entirely missing from df_bol_remaining then we can
# simply pad using the MDIC data
if df_bol_remaining.empty:
df_padding = df_mdic_remaining.assign(
index_bol=-1,
index_mdic=df_mdic_remaining.index,
matched=False,
matching_stage="Padding",
vol_mdic=df_mdic_remaining["vol"],
)
# otherwise we can only pad with unknowns, since we do not know which rows
# from the unmatched MDIC are present in the unmatched BoL
else:
missing_volume = mdic_volume - bol_volume
df_padding = pd.DataFrame(
[
dict(
hs4=hs6[0:4],
hs5=hs6[0:5],
hs6=hs6,
index_bol=-1,
index_mdic=-1,
matched=False,
matching_stage="Padding",
vol=missing_volume,
vol_mdic=missing_volume,
)
]
)
# fill any missing columns with unknowns
missing_columns = set(df_final.columns) - set(df_padding.columns)
df_padding = df_padding.assign(
**{column: UNKNOWNS[column][0] for column in missing_columns}
)
# discard any extra columns
df_padding = df_padding[df_final.columns]
# add to list of padding dataframes
padding_dataframes.append(df_padding)
return pd.concat([df_final, *padding_dataframes], sort=False)
def discard_tiny_flows(df_final):
"""
Discard flows smaller than 150 kg
"""
# threshold_options = [1] + list(range(10, 251, 10))
# flows_discarded = []
# vol_discarded = []
# for threshold in threshold_options:
# print(f"When Threshold is {threshold}")
# # discard flows which are less than 10 kg
# is_tiny = df_final["vol"] < threshold
# df_tiny = df_final[is_tiny]
# flows_discarded.append(100 * sum(is_tiny) / len(is_tiny))
# vol_discarded.append(100 * df_tiny["vol"].sum() / df_final["vol"].sum())
# print(
# f"Discarding {100 * sum(is_tiny) / len(is_tiny):.1f}% of rows: these were "
# f"with less than {threshold} kg. This represents "
# f"{100 * df_tiny['vol'].sum() / df_final['vol'].sum():.1f}% of volume)"
# )
#
# import matplotlib.pyplot as plt
# import seaborn as sns
#
# sns.set()
# # plt.title("Selecting threshold: Proportion of rows discarded")
# plt.plot(threshold_options, flows_discarded)
# # plt.xlabel("Threshold")
# # plt.ylabel("% Rows Discarded")
# plt.show()
#
# sns.set()
# # plt.title("Selecting threshold: Proportion of volume discarded")
# plt.plot(threshold_options, vol_discarded)
# # plt.xlabel("Threshold")
# # plt.ylabel("% Volume Discarded")
# plt.show()
is_tiny = df_final["vol"] < 150
df_tiny = df_final[is_tiny]
    print(
        f"Discarding {100 * sum(is_tiny) / len(is_tiny):.1f}% of rows with volume "
        f"below 150 kg; this represents "
        f"{100 * df_tiny['vol'].sum() / df_final['vol'].sum():.1f}% of total volume."
    )
return df_final[~is_tiny]
def replace_unknowns_with_nan(df, has_hs8):
if has_hs8:
df["hs8"] = df["hs8"].mask(df["hs8"].str.endswith("X"), "X" * 8)
df["hs6"] = df["hs6"].mask(df["hs6"].str.endswith("X"), "X" * 6)
df["hs5"] = df["hs5"].mask(df["hs5"].str.endswith("X"), "X" * 5)
if has_hs8:
assert all((df["hs8"] == UNKNOWNS["hs8"][0]) | df["hs8"].str.isdigit())
assert all((df["hs6"] == UNKNOWNS["hs6"][0]) | df["hs6"].str.isdigit())
assert all((df["hs5"] == UNKNOWNS["hs5"][0]) | df["hs5"].str.isdigit())
assert all(df["hs4"].str.isdigit())
return df.replace(
{column: {value: None for value in UNKNOWNS[column]} for column in UNKNOWNS}
)
def set_index(df, prefix):
df.index = pd.Index((f"{prefix}{i}" for i in range(len(df))), name="index")
def clean_bol(df_secomex, df_bol, year, has_municipality, has_hs8):
# From mid 2021, SECOMEX is not available
if year <= 2020:
df_secomex = df_secomex[["cnpj", "municipality.trase_id"]]
df_secomex = df_secomex.rename(
columns={
"municipality.trase_id": "exporter.municipality.trase_id",
"cnpj": "exporter.cnpj",
},
errors="raise",
)
df_secomex = df_secomex.drop_duplicates()
# load BoL
# Rename columns for 2023 onwards to match expected names
if year >= 2023:
df_bol = df_bol.rename(
columns={
"exporter_cnpj": "exporter.cnpj",
"exporter_label": "exporter.label",
"exporter_municipality_trase_id": "exporter.municipality.trase_id",
"exporter_type": "exporter.type",
"port_of_export_name": "port_of_export.name",
"importer_label": "importer.label",
"country_of_destination_label": "country_of_destination.label",
"country_of_destination_name": "country_of_destination.name",
"country_of_destination_trase_id": "country_of_destination.trase_id",
"net_weight_kg": "vol",
},
errors="raise",
)
df_bol = df_bol[
[
"hs4",
"hs6",
"hs5",
*(["hs8"] if has_hs8 else []),
"month",
"exporter.cnpj",
"exporter.label",
*(["exporter.municipality.trase_id"] if has_municipality else []),
"exporter.type",
"port_of_export.name",
"importer.label",
"country_of_destination.label",
"country_of_destination.name",
"country_of_destination.trase_id",
"vol",
]
]
df_bol = df_bol[df_bol["hs4"].isin(BEEF_HS4)]
df_bol = df_bol.astype({"vol": float, "month": int})
if year == 2018:
month = df_bol["month"].astype(int)
assert sorted(month.unique()) == list(range(1, 13))
df_bol = df_bol[month >= 5]
# Get the municipality from SECOMEX until 2020
if year <= 2020:
df_bol = pd.merge(
df_bol,
df_secomex,
on="exporter.cnpj",
validate="many_to_one",
how="left",
indicator=True,
suffixes=("", "_secomex"),
)
matched = df_bol.pop("_merge") == "both"
print(
f"Matched BoL with SECOMEX for {100 * sum(matched) / len(matched):.1f}% of rows"
f" / {100 * df_bol[matched]['vol'].sum() / df_bol['vol'].sum():.1f}% of volume"
)
if has_municipality:
# take municipality from SECOMEX where it was found, otherwise fall back on
# the BOL.
df_bol["exporter.municipality.trase_id"] = np.where(
matched,
df_bol.pop("exporter.municipality.trase_id_secomex"),
df_bol.pop("exporter.municipality.trase_id"),
)
# Raise an error if year >= 2021 and municipality is not available.
    # As of the 2023 update there are no such cases; if there are in the future,
    # we could take the municipality from the CNPJ database.
if year >= 2021 and not has_municipality:
raise ValueError(
"Municipality is not available for year >= 2021 in the BOL or SECOMEX. "
"Adjust the logic to take this from RFB's CNPJ reference data."
)
# filling missing municipality with unknown
df_bol = df_bol.fillna({"exporter.municipality.trase_id": "BR-XXXXXXX"})
df_bol["port_of_export.group"] = df_bol["port_of_export.name"].apply(
lambda port: PORT_GROUPS.get(port, port)
)
df_bol["country_of_destination.group"] = df_bol[
"country_of_destination.name"
].apply(lambda country: COUNTRY_GROUPS.get(country, country))
# consider BR-51XXXXX etc. to be unknown
df_bol["exporter.municipality.trase_id"] = df_bol[
"exporter.municipality.trase_id"
].mask(df_bol["exporter.municipality.trase_id"].str.contains("X"), "BR-XXXXXXX")
# fill unknowns with nan - important to exclude from matching
df_bol = replace_unknowns_with_nan(df_bol, has_hs8)
# all trade is maritime
df_bol["via"] = "01"
# we set a custom and unique index on each dataset: this means that they won't
# overlap with MDIC and so bugs will be more obvious
    set_index(df_bol, "bol-")
return df_bol
def clean_mdic(df_mdic, year, has_hs8):
if year == 2018:
month = df_mdic["month"].astype(int)
assert sorted(month.unique()) == list(range(5, 13))
df_mdic = df_mdic[month >= 5]
df_mdic["hs5"] = df_mdic["hs6"].str.slice(0, 5)
    # keep only the beef HS4 codes so that we're working with a smaller amount of data
df_mdic = df_mdic[df_mdic["hs4"].isin(BEEF_HS4)]
df_mdic = rename(df_mdic, {"port.name": "port_of_export.name"})
df_mdic = df_mdic.astype({"vol": "float", "month": "int"})
df_mdic = df_mdic[df_mdic["vol"] > 1]
# align some port names
df_mdic["port_of_export.group"] = df_mdic["port_of_export.name"].apply(
        lambda port: PORT_GROUPS.get(port, port)
)
df_mdic["country_of_destination.group"] = df_mdic[
"country_of_destination.name"
].apply(lambda country: COUNTRY_GROUPS.get(country, country))
# fill unknowns with nan - important to exclude from matching
df_mdic = replace_unknowns_with_nan(df_mdic, has_hs8)
# we set a custom and unique index on each dataset: this means that they won't
# overlap with BoL and so bugs will be more obvious
    set_index(df_mdic, "mdic-")
return df_mdic
def full_merge(*args, **kwargs):
df = pd.merge(*args, indicator=True, **kwargs)
assert all(df.pop("_merge") == "both")
return df
def add_bol_and_mdic_columns(df_solved, df_bol, df_mdic, has_hs8):
"""
At this point, `df_solved` looks like this:
```nohighlight
| index_mdic | index_bol | vol_bol | vol_mdic |
|------------|-----------|---------|----------|
| mdic-7 | bol-10512 | 11557 | 11557 |
| mdic-132 | bol-1831 | 25074 | 25074 |
| mdic-177 | bol-2281 | 2846 | 2846 |
| ... | ... | ... | ... |
```
We merge back in the original columns from MDIC and BoL using the indexes.
"""
def left_merge(df_left, df_right, left_on):
return full_merge(
df_left,
df_right,
how="left",
left_on=left_on,
right_index=True,
validate="many_to_one",
)
# bring in the state of origin from MDIC
df_solved = left_merge(df_solved, df_mdic[COLUMNS_TO_KEEP_FROM_MDIC], "index_mdic")
# bring in all other columns from the BoL
columns_to_keep_from_bol = [*COLUMNS_TO_KEEP_FROM_BOL]
if not has_hs8:
columns_to_keep_from_bol.remove("hs8")
df_solved = left_merge(
df_solved,
df_bol[columns_to_keep_from_bol],
"index_bol",
)
return df_solved
def subtract_maximum_matched_volume(df_matches, df_bol, df_mdic):
def subtract(df, index, volume):
df = pd.merge(
df,
df_matches.groupby(index)[volume].sum(),
left_index=True,
right_index=True,
how="left",
validate="one_to_one",
)
df["vol"] -= df.pop(volume).fillna(0)
return df[df["vol"] > 0]
df_bol_remaining = subtract(df_bol, "index_bol", "vol_bol")
df_mdic_remaining = subtract(df_mdic, "index_mdic", "vol_mdic")
return df_matches, df_bol_remaining, df_mdic_remaining
def run_matching(indexer, df_bol, df_mdic):
# construct a list of matching pairs
pairs = indexer.index(df_mdic, df_bol)
if pairs.get_level_values(0).size > 0:
# fairly distribute volume which can be assigned among these pairs,
allocation_mdic, allocation_bol = direct_sourcing(
pairs,
df_mdic.loc[pairs.get_level_values(0).drop_duplicates()]["vol"],
df_bol.loc[pairs.get_level_values(1).drop_duplicates()]["vol"],
relative_tolerance=0.1, # allow for 10% reduction in BoL volumes
)
# subtract the solved volume from the original data
df_matches = pd.DataFrame(
{
"index_mdic": allocation_mdic.index.get_level_values(0),
"index_bol": allocation_bol.index.get_level_values(1),
"vol_bol": allocation_bol,
"vol_mdic": allocation_mdic,
}
)
(
df_matches,
df_bol_remaining,
df_mdic_remaining,
) = subtract_maximum_matched_volume(df_matches, df_bol, df_mdic)
else:
df_matches = pd.DataFrame(
{
"index_mdic": pd.Series(dtype="str"),
"index_bol": pd.Series(dtype="int"),
"vol_bol": pd.Series(dtype="float"),
"vol_mdic": pd.Series(dtype="float"),
}
)
df_bol_remaining = df_bol
df_mdic_remaining = df_mdic
return df_matches, df_bol_remaining, df_mdic_remaining
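# The three matching steps below each block on a different combination of columns; process()
# runs them in sequence, feeding each step only the BoL and MDIC volume that the previous step
# left unmatched.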
def step1(df_bol, df_mdic, has_hs8):
import recordlinkage as rl
indexer = rl.Index()
indexer.block(
[
"country_of_destination.name",
"exporter.municipality.trase_id",
"hs4",
# note: HS5 and HS6 not included here because they can be unknown in the
# sense that they are not official UN codes; yet HS8 (a Brazilian government
# construct) may match
*(["hs8"] if has_hs8 else []),
"month",
"port_of_export.group",
"via",
]
)
return run_matching(indexer, df_bol, df_mdic)
def step2(df_bol, df_mdic):
import recordlinkage as rl
indexer = rl.Index()
indexer.sortedneighbourhood(
"month",
window=3,
block_on=[
"country_of_destination.group",
"exporter.municipality.trase_id",
"hs4",
"hs6",
"port_of_export.group",
"via",
],
)
return run_matching(indexer, df_bol, df_mdic)
def step3(df_bol, df_mdic):
import recordlinkage as rl
indexer = rl.Index()
indexer.block(
[
"country_of_destination.group",
"exporter.municipality.trase_id",
"hs4",
"port_of_export.group",
"via",
],
)
return run_matching(indexer, df_bol, df_mdic)
def assert_equality_with_bol(df_final, df_bol, has_hs8):
columns_to_keep_from_bol = [*COLUMNS_TO_KEEP_FROM_BOL]
if not has_hs8:
columns_to_keep_from_bol.remove("hs8")
# quick check that volume was conserved
assert np.isclose(df_final["vol"].sum(), df_bol["vol"].sum(), rtol=0.01)
# check no difference with all original BOL columns
difference = compare_dataframes_single(
df_final,
df_bol,
"vol",
columns_to_keep_from_bol,
Compare.signed_symmetric_relative_error,
)
assert all(np.isclose(0, difference["comparison"], atol=0.01))
def clean_string_columns(df, column_list):
"""
Clean the string columns by replacing the missing values and adjusting formats
:param df: dataframe
:param column_list: list of column names, list(str)
    :return: cleaned dataframe
"""
# clean the string columns
for column in column_list:
df[column] = df[column].apply(clean_string)
missing_value_list = ["NAN", "NONE", "NA", "", "NOT DECLARED"]
    # replace missing values with UNKNOWN
for column in df.columns:
df.loc[df[column].isin(missing_value_list), column] = "UNKNOWN"
return df
def get_state_name(df_state, df):
df_state = df_state[df_state["region_type"] == "STATE"]
df_state = df_state[df_state["trase_id"].str.startswith("BR")]
df_state = df_state.rename(
columns={
"name": "state_of_production",
"trase_id": "state.trase_id",
},
errors="raise",
)
df_state = df_state[["state_of_production", "state.trase_id"]]
df_state = df_state.drop_duplicates()
df = full_merge(
df,
df_state,
how="left",
on="state.trase_id",
validate="many_to_one",
)
return df
# print("Running for 2019\n" + ("=" * 80))
# run(year=2019, has_hs8=True, has_municipality=False)
# print("Running for 2020\n" + ("=" * 80))
# run(year=2020, has_hs8=False, has_municipality=True)
def process(
df_bol,
df_mdic,
df_mdic_port,
df_state,
year,
has_municipality,
has_hs8,
df_secomex=None,
):
df_bol = clean_bol(df_secomex, df_bol, year, has_municipality, has_hs8)
df_mdic = clean_mdic(df_mdic, year, has_hs8)
vol_bol = df_bol["vol"].sum()
if year == 2018:
vol_mdic = pd.merge(df_mdic, df_bol["hs4"].drop_duplicates(), on="hs4")[
"vol"
].sum()
else:
vol_mdic = pd.merge(df_mdic, df_bol["hs6"].drop_duplicates(), on="hs6")[
"vol"
].sum()
def report(prefix, df):
print(
f"{prefix}: solved {100 * df['vol_bol'].sum() / vol_bol:.1f}% of BoL volume; "
f"{100 * df['vol_mdic'].sum() / vol_mdic:.1f}% of MDIC volume"
)
dfs_solved = []
# matching step 1
# ----------------------------------------------------------------------------------
df, df_bol_remaining, df_mdic_remaining = step1(df_bol, df_mdic, has_hs8)
dfs_solved.append(
df.assign(
matching_stage=f"1 - country, exporter municipality, {'hs8, ' if has_hs8 else ''}port, month"
)
)
report("Step 1", df)
# matching step 2
# ----------------------------------------------------------------------------------
df, df_bol_remaining, df_mdic_remaining = step2(df_bol_remaining, df_mdic_remaining)
dfs_solved.append(
df.assign(
matching_stage="2 - country, exporter municipality, hs6, port, within three months"
)
)
report("Step 2", df)
# matching step 3
# ----------------------------------------------------------------------------------
df, df_bol_remaining, df_mdic_remaining = step3(df_bol_remaining, df_mdic_remaining)
dfs_solved.append(
df.assign(matching_stage="3 - country, exporter municipality, hs4, port")
)
report("Step 3", df)
# bring together all matching steps
# ----------------------------------------------------------------------------------
print("Combining steps")
df_solved = concat(*dfs_solved)
df_solved = add_bol_and_mdic_columns(df_solved, df_bol, df_mdic, has_hs8)
df_final = pad_with_remaining_bol(df_solved, df_bol_remaining, has_hs8)
df_bol = replace_nan_with_unknown_value(df_bol)
assert_equality_with_bol(df_final, df_bol, has_hs8)
df_bol_remaining = replace_nan_with_unknown_value(df_bol_remaining)
df_mdic_remaining = replace_nan_with_unknown_value(df_mdic_remaining)
df_final = pad_to_mdic_volumes(df_final, df_bol_remaining, df_mdic_remaining)
df_final = discard_tiny_flows(df_final)
df_final = add_interpolated_fob_values(df_mdic_port, df_final, year)
# print increase in rows
columns_to_keep_from_bol = [*COLUMNS_TO_KEEP_FROM_BOL]
if not has_hs8:
columns_to_keep_from_bol.remove("hs8")
a = df_final.groupby([*columns_to_keep_from_bol, *COLUMNS_TO_KEEP_FROM_MDIC])
b = df_bol.groupby(columns_to_keep_from_bol)
print(f"BOL has {(len(a) / len(b)):.1f}x as many rows as before")
# clean strings and state names
# ----------------------------------------------------------------------------------
df_final = df_final.astype(str)
string_columns = [
c for c in list(df_final.columns) if c not in ["vol", "vol_mdic", "fob"]
]
df_final = clean_string_columns(df_final, string_columns)
df_final["year"] = year
df_final["exporter_geocode"] = df_final["exporter.municipality.trase_id"].str[-7:]
df_final = get_state_name(df_state, df_final)
# drop redundant column
return df_final.drop("success", axis=1)
def run(year, has_hs8, has_municipality):
S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.clear()
df_state = read_s3_parquet("postgres_views/postgres_regions.parquet")
    # SECOMEX is only available until the first months of 2021
if year <= 2020:
df_secomex = get_pandas_df_once(
f"brazil/auxiliary/secex/cleaned/EMPRESAS_CADASTRO_{year}.csv",
sep=";",
encoding="utf-8",
keep_default_na=False,
dtype=str,
)
else:
df_secomex = None
df_bol = get_pandas_df_once(
f"brazil/trade/bol/{year}/BRAZIL_BOL_{year}.csv",
sep=";",
encoding="utf-8",
keep_default_na=False,
dtype=str,
)
if year == 2018:
df_mdic = get_pandas_df_once(
f"brazil/trade/mdic/disaggregated/brazil_mdic_disaggregated_{year}_beef_02.csv",
sep=";",
encoding="utf-8",
keep_default_na=False,
dtype="str",
)
else:
df_mdic = get_pandas_df_once(
f"brazil/trade/mdic/disaggregated/brazil_mdic_disaggregated_{year}_beef.csv",
sep=";",
encoding="utf-8",
keep_default_na=False,
dtype="str",
)
df_mdic_port = get_pandas_df_once(
f"brazil/trade/mdic/port/brazil_mdic_port_{year}.csv",
sep=";",
encoding="utf-8",
keep_default_na=False,
dtype="str",
)
df_final = process(
df_bol,
df_mdic,
df_mdic_port,
df_state,
year,
has_municipality,
has_hs8,
df_secomex,
)
# write to disk
write_csv_for_upload(
df_final,
f"brazil/beef/trade/cd/disaggregated/CD_DISAGGREGATED_BEEF_{year}_02.csv",
)
def main():
print("Running for 2018\n" + ("=" * 80))
run(year=2018, has_hs8=True, has_municipality=False)
if __name__ == "__main__":
main()
from trase.data.brazil.beef.trade.cd.disaggregated.CD_DISAGGREGATED_BEEF_201X import (
process,
)
YEAR = 2022
def model(dbt, cursor):
dbt.config(materialized="external")
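    # "external" is a dbt-duckdb materialization that writes the model to an external
    # file rather than a DuckDB table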
df_bol = dbt.ref("brazil_bol_2022").df()
df_mdic = dbt.ref("brazil_mdic_disaggregated_2022_beef").df()
df_mdic_port = dbt.ref("brazil_mdic_port_2022").df()
df_state = dbt.ref("postgres_regions_without_geometry").df()
return process(
df_bol,
df_mdic,
df_mdic_port,
df_state,
year=YEAR,
has_municipality=True,
has_hs8=True,
)
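
Within dbt, the model (and its upstream refs) would normally be rebuilt with something like `dbt run --select +cd_disaggregated_beef_2022` from the data_pipeline project; the `+` prefix is standard dbt selection syntax for including upstream dependencies.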