DBT: Brazil Bol 2020
File location: s3://trase-storage/brazil/trade/bol/2020/BRAZIL_BOL_2020.csv
DBT model name: brazil_bol_2020
Explore on Metabase: Full table; summary statistics
DBT details
- Lineage
- Dbt path: trase_production.main_brazil.brazil_bol_2020
- Containing yaml link: trase/data_pipeline/models/brazil/trade/bol/2020/_schema.yml
- Model file: trase/data_pipeline/models/brazil/trade/bol/2020/brazil_bol_2020.py
- Calls script: trase/data/brazil/trade/bol/2020/BRAZIL_BOL_2020.py
- Tags: mock_model, 2020, bol, brazil, trade
Description
Cleaned Brazilian bill of lading (BOL) data for 2020. Note that it does not include FOB values. Approximately 150 MB and 637,945 records.
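For a quick look at the published file, a minimal sketch (assuming `s3fs` is installed and AWS credentials grant read access to the trase-storage bucket; every column is exported as a string, as in the table below):

```python
import pandas as pd

# Hypothetical quick inspection of the published CSV (path from "File location" above).
df = pd.read_csv(
    "s3://trase-storage/brazil/trade/bol/2020/BRAZIL_BOL_2020.csv",
    dtype=str,              # every column in the table below is VARCHAR
    keep_default_na=False,  # missing values are stored as literal strings, not NaN
)

print(len(df))                          # expected to be roughly 637,945 records
print(df["vol"].astype(float).sum())    # total exported weight in kg (source column WTKG)
```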
Details
| Column | Type | Description |
|---|---|---|
| hs6 | VARCHAR | |
| country_of_origin.label | VARCHAR | |
| exporter.cnpj | VARCHAR | |
| exporter.label | VARCHAR | |
| port_of_export.label | VARCHAR | |
| port_of_import.label | VARCHAR | |
| country_of_destination.label | VARCHAR | |
| importer.city | VARCHAR | |
| importer.label | VARCHAR | |
| importer.country.label | VARCHAR | |
| importer.code | VARCHAR | |
| vol | VARCHAR | |
| hs4 | VARCHAR | |
| hs5 | VARCHAR | |
| year | VARCHAR | |
| month | VARCHAR | |
| day | VARCHAR | |
| exporter.state.name | VARCHAR | |
| exporter.state.trase_id | VARCHAR | |
| exporter.municipality.name | VARCHAR | |
| exporter.municipality.trase_id | VARCHAR | |
| country_of_destination.name | VARCHAR | |
| country_of_destination.trase_id | VARCHAR | |
| port_of_export.name | VARCHAR | |
| exporter.type | VARCHAR | |
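Since every column is stored as VARCHAR, numeric fields need explicit casts before aggregation. A minimal sketch (continuing from the hypothetical `df` loaded above) of export volume by destination country:

```python
# Cast the columns that are numeric in practice.
df["vol"] = df["vol"].astype(float)
df["year"] = df["year"].astype(int)

# Hypothetical summary: total 2020 export volume (kg) per destination country.
summary = (
    df.groupby("country_of_destination.name", as_index=False)["vol"]
    .sum()
    .sort_values("vol", ascending=False)
)
print(summary.head(10))
```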
Models / Seeds
- source.trase_duckdb.trase-storage-raw.br_exp_2020_bol
- model.trase_duckdb.hs2017
Sources
['trase-storage-raw', 'br_exp_2020_bol']
import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf
from psycopg2 import sql
from trase.tools import (
find_label,
get_country_id,
get_label_trader_id,
get_node_name,
get_trader_group_id,
)
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.pandasdb.find import (
find_default_name_by_node_id,
find_traders_and_groups_by_label,
find_traders_and_groups_by_trase_id,
)
from trase.tools.pcs.connect import uses_database
from trase.tools.utilities.helpers import clean_string
KNOWN_CPFS = ["00331627949", "00523682891", "00297458108", "00216760895"]
YEAR = 2020
MISSING_VALUES = ["NAN", "NONE", "NA"]
PLACES_NOT_IN_BRAZIL = [
"AARHUS",
"SANTA FE",
"BUENOS AIRES",
"AJMAN",
"ALFRETON",
"AMSTERDAM",
"ANTWERP",
"ANTWERPEN",
"ARROYITO", # argentina/uruguay/bolivia
"ASUNCION", # paraguay
"AVELLANEDA", # argentina
"BALLAJURA", # perth
"BANJUL", # gambia
"BEJAIA", # algeria
"BERGEIJK", # nertherlands
"BERGHEIM",
"BRUSSELS",
"BRUXELLES",
"BURLINGTON SQUARE",
"BURZACO", # argentina
"C SPEGAZZINI", # sounds italian..
"CHESHIRE",
"CIUDAD DEL ESTE",
"CLICHY",
"CONC DO MATO DENTRO",
"CORDOBA",
"COYOACAN",
"CUNIA",
"DIAS D AVILA",
"DUBAI",
"EMBU GUACU",
"FUNCHAL", # portugal
"GENERAL PACHECO",
"GENEVA",
"GOTEBORG",
"HAMRIYA",
"HANOI",
"HERNANDARIAS",
"HOUSTON",
"IGARAPE MIRI",
"ISELIN",
"ITAIPAVA",
"IZEGEM",
"JEDDAH",
"JERSEY",
"KEY BISCAYNE",
"LA LIBERTAD",
"LA REJA",
"LAIBIN",
"LAUSANNE",
"LITHIA",
"LJUBLJANA",
"LONDON",
"LUNEN",
"BARCELONA",
"LUQUE",
"MAR DEL PLATA",
"MELO",
"MERLO",
"MIAMI",
"MINGA GUAZU",
"MOERDIJK",
"MONTEVIDEO",
"MORGES",
"MOSCOW",
"NINGBO",
"NUEVA ESPERANZA",
"PARIS",
"PRUDENTE",
"PTO MADRYN",
"QUEBEC",
"RIBERALTA", # bolivia
"ROAD TOWN",
"RODOVRE",
"ROMFORD",
"ROTTERDAM",
"SAN IGNACIO DE VELASCO",
"SAN RAMON DE LA NUEVA ORAN",
"SAN SALVADOR",
"SANTA TECLA",
"SEREGNO",
"SHARJAH",
"SHARJAN",
"SINDELFINGEN",
"SINGAPORE",
"SISLI",
"TAIPEI",
"TORONTO",
"UNITED STATES",
"VALERIA",
"VILLETA",
"WATERLOO",
"ZHEJIANG",
"ZOETERMEER",
]
def select_and_rename_columns(df_bol, include_ship):
columns = {
"Period/YYYYMMDD": "date",
"Commodity_HS_Datamar/HS4 Code": "hs4_datamar",
"Commodity_HS_Datamar/HS4 English": "hs4_datamar_description",
"Commodity_HS_Datamar/HS6 Code": "hs6_datamar",
"Commodity_HS_Datamar/HS6 English": "hs6_datamar_description",
"Commodity_HS/HS6 Code": "hs6",
"Commodity_HS/HS6 English": "hs6_description",
"Place_and_Ports/POL_Country": "country_of_origin.label",
# exporter
"Company_Shipper/City": "exporter.municipality.label", # seems to be municipality...
"Company_Shipper/Registration Number": "exporter.cnpj",
"Company_Shipper/Shipper Name": "exporter.label",
"Company_Shipper/State Name": "exporter.state.label",
# ports, country
"Place_and_Ports/POL_Name": "port_of_export.label",
"Place_and_Ports/POD_Name": "port_of_import.label",
"Place_and_Ports/DEST_Country": "country_of_destination.label",
# importer
"Company_Consignee/City": "importer.city",
"Company_Consignee/Consignee Name": "importer.label",
"Company_Consignee/Country": "importer.country.label",
"Company_Consignee/Registration Number": "importer.code",
# volume
"WTKG": "vol",
}
if include_ship:
ship_columns = {"Vessel/Vessel Name": "vessel.label", "Vessel/IMO": "vessel.id"}
columns.update(ship_columns)
return df_bol[columns].rename(columns=columns, errors="raise")
def clean_hs_codes(df):
"""
The HS codes are a mess! Sometimes the HS6 code doesn't exist but the datamar one
does, or vice versa, or HS4 is not the same as the first characters of HS6, etc...
Here we do our best to recover as much information as possible while ensuring
that all the HS codes actually exist, or are marked with 'X' to indicate that they
don't.
"""
# add hs4 and hs5
df["hs4"] = df["hs6"].str.slice(0, 4)
df["hs5"] = df["hs6"].str.slice(0, 5)
df["hs5_datamar"] = df["hs6_datamar"].str.slice(0, 5)
# download an authoritative list of HS codes
df_hscodes = get_pandas_df_once(
"world/metadata/codes/hs/HS2017.csv", sep=";", dtype=str, keep_default_na=False
)
hs4 = df_hscodes[df_hscodes["type"] == "hs4"]["code"].rename("hs4")
hs6 = df_hscodes[df_hscodes["type"] == "hs6"]["code"].rename("hs6")
hs5 = hs6.str.slice(0, 5).drop_duplicates()
# parsing HS4 codes
df_codes = df.copy().reset_index(drop=True)
def add_codes(df, hs, left_on):
return pd.merge(
df,
hs,
left_on=left_on,
right_on=hs.name,
how="left",
validate="many_to_one",
indicator=True,
)
# fmt: off
df_codes["hs4_exists"] = add_codes(df_codes, hs4, "hs4").pop("_merge") == "both"
df_codes["hs5_exists"] = add_codes(df_codes, hs5, "hs5").pop("_merge") == "both"
df_codes["hs6_exists"] = add_codes(df_codes, hs6, "hs6").pop("_merge") == "both"
df_codes["hs4_datamar_exists"] = add_codes(df_codes, hs4, "hs4_datamar").pop("_merge") == "both"
df_codes["hs5_datamar_exists"] = add_codes(df_codes, hs5, "hs5_datamar").pop("_merge") == "both"
df_codes["hs6_datamar_exists"] = add_codes(df_codes, hs6, "hs6_datamar").pop("_merge") == "both"
# fmt: on
def choose_hs_codes(row):
if (
not row["hs4_exists"]
and not row["hs5_exists"]
and not row["hs6_exists"]
and not row["hs4_datamar_exists"]
and not row["hs5_datamar_exists"]
and not row["hs6_datamar_exists"]
):
return "XXXX", "XXXXX", "XXXXXX"
if (
not row["hs4_exists"]
and not row["hs5_exists"]
and not row["hs6_exists"]
and row["hs4_datamar_exists"]
and not row["hs5_datamar_exists"]
and not row["hs6_datamar_exists"]
):
hs4 = row["hs4_datamar"]
return hs4, hs4 + "X", hs4 + "XX"
if (
not row["hs4_exists"]
and not row["hs5_exists"]
and not row["hs6_exists"]
and row["hs4_datamar"] == row["hs6_datamar"][0:4]
and row["hs4_datamar_exists"]
and row["hs5_datamar_exists"]
and not row["hs6_datamar_exists"]
):
return row["hs4_datamar"], row["hs5_datamar"], row["hs5_datamar"] + "X"
if (
not row["hs4_exists"]
and not row["hs5_exists"]
and not row["hs6_exists"]
and row["hs4_datamar"] == row["hs6_datamar"][0:4]
and row["hs4_datamar_exists"]
and row["hs5_datamar_exists"]
and row["hs6_datamar_exists"]
):
return row["hs4_datamar"], row["hs5_datamar"], row["hs6_datamar"]
if (
row["hs4_exists"]
and not row["hs5_exists"]
and not row["hs6_exists"]
and not row["hs4_datamar_exists"]
and not row["hs5_datamar_exists"]
and not row["hs6_datamar_exists"]
):
return row["hs4"], row["hs4"] + "X", row["hs4"] + "XX"
if (
row["hs4_exists"]
and not row["hs5_exists"]
and not row["hs6_exists"]
and row["hs4"] != row["hs4_datamar"]
and row["hs4_datamar_exists"]
and not row["hs5_datamar_exists"]
and not row["hs6_datamar_exists"]
):
return row["hs4"], row["hs4"] + "X", row["hs4"] + "XX"
if (
row["hs4_exists"]
and not row["hs5_exists"]
and not row["hs6_exists"]
and row["hs4"] == row["hs4_datamar"]
and row["hs4_datamar"] == row["hs6_datamar"][0:4]
and row["hs4_datamar_exists"]
and row["hs5_datamar_exists"]
and row["hs6_datamar_exists"]
):
return row["hs4"], row["hs5_datamar"], row["hs6_datamar"]
if (
row["hs4_exists"]
and not row["hs5_exists"]
and not row["hs6_exists"]
and row["hs4"] == row["hs4_datamar"]
and row["hs4_datamar_exists"]
and not row["hs5_datamar_exists"]
and not row["hs6_datamar_exists"]
):
return row["hs4"], row["hs4"] + "X", row["hs4"] + "XX"
if (
row["hs4_exists"]
and row["hs5_exists"]
and not row["hs6_exists"]
and row["hs4"] == row["hs4_datamar"]
and row["hs5"] == row["hs5_datamar"]
and not row["hs6_datamar_exists"]
):
return row["hs4"], row["hs5"], row["hs6"] + "X"
if (
row["hs4_exists"]
and row["hs5_exists"]
and row["hs6_exists"]
and not row["hs4_datamar_exists"]
and not row["hs5_datamar_exists"]
and not row["hs6_datamar_exists"]
):
return row["hs4"], row["hs5"], row["hs6"]
if (
row["hs4_exists"]
and row["hs5_exists"]
and row["hs6_exists"]
and row["hs4"] != row["hs4_datamar"]
and row["hs4_datamar_exists"]
and not row["hs5_datamar_exists"]
and not row["hs6_datamar_exists"]
):
return row["hs4"], row["hs5"], row["hs6"]
if (
row["hs4_exists"]
and row["hs5_exists"]
and row["hs6_exists"]
and row["hs4"] != row["hs4_datamar"]
and row["hs5"] != row["hs5_datamar"]
and row["hs6"] != row["hs6_datamar"]
and row["hs4_datamar_exists"]
and row["hs5_datamar_exists"]
and row["hs6_datamar_exists"]
):
return row["hs4"], row["hs5"], row["hs6"]
if (
row["hs4_exists"]
and row["hs5_exists"]
and row["hs6_exists"]
and row["hs4"] == row["hs4_datamar"]
and not row["hs5_datamar_exists"]
and not row["hs6_datamar_exists"]
):
return row["hs4"], row["hs5"], row["hs6"]
if (
row["hs4_exists"]
and row["hs5_exists"]
and row["hs6_exists"]
and row["hs5"] != row["hs5_datamar"]
and row["hs6"] != row["hs6_datamar"]
and row["hs4_datamar_exists"]
and row["hs5_datamar_exists"]
and row["hs6_datamar_exists"]
):
return row["hs4"], row["hs5"], row["hs6"]
if (
row["hs4_exists"]
and row["hs5_exists"]
and row["hs6_exists"]
and row["hs4"] == row["hs4_datamar"]
and row["hs5"] == row["hs5_datamar"]
and row["hs6"] == row["hs6_datamar"]
):
return row["hs4"], row["hs5"], row["hs6"]
# unhandled case
raise ValueError
# run the above function on the relevant columns
df_codes = df_codes[
[
"hs4",
"hs5",
"hs6",
"hs4_exists",
"hs5_exists",
"hs6_exists",
"hs4_datamar",
"hs5_datamar",
"hs6_datamar",
"hs4_datamar_exists",
"hs5_datamar_exists",
"hs6_datamar_exists",
]
].drop_duplicates()
df_codes["new_hs4"], df_codes["new_hs5"], df_codes["new_hs6"] = zip(
*df_codes.apply(choose_hs_codes, axis=1)
)
df_codes = df_codes[
[
"new_hs4",
"new_hs5",
"new_hs6",
"hs4",
"hs5",
"hs6",
"hs4_datamar",
"hs5_datamar",
"hs6_datamar",
]
].drop_duplicates()
# merge the results in
df = pd.merge(
df,
df_codes,
on=["hs4", "hs5", "hs6", "hs4_datamar", "hs5_datamar", "hs6_datamar"],
how="left",
validate="many_to_one",
indicator=True,
)
assert all(df.pop("_merge") == "both")
df["hs4"] = df.pop("new_hs4")
df["hs5"] = df.pop("new_hs5")
df["hs6"] = df.pop("new_hs6")
df = df.drop(
columns=[
"hs6_description",
"hs4_datamar",
"hs5_datamar",
"hs6_datamar",
"hs4_datamar_description",
"hs6_datamar_description",
],
errors="raise",
)
return df
def clean_string_columns(df, column_list):
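"""Apply clean_string to the given columns, then replace MISSING_VALUES markers
with 'UNKNOWN' in every column. Operates on df in place and returns None."""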
# clean the string columns
for column in column_list:
df[column] = df[column].apply(clean_string)
# replace null values to UNKNOWN
for column in df.columns:
df.loc[df[column].isin(MISSING_VALUES), column] = "UNKNOWN"
@uses_database
def get_country_labels(cnx=None):
"""Retrieve country name, label, and trase_id"""
return pd.read_sql(
"""
select distinct
name as "country_of_destination.name",
unnest(synonyms) as "country_of_destination.label",
coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
from views.regions where level = 1 and length(trase_id) = 2
""",
cnx.cnx,
)
def assert_none_missing(df, column):
missing = df[df.pop("_merge") != "both"][column].drop_duplicates()
assert missing.empty, f"Not all {column} found:\n{missing}"
def clean_countries(df):
"""Introduce country name and trase id to the dataframe"""
df = pd.merge(
df,
get_country_labels(),
on="country_of_destination.label",
validate="many_to_one",
how="left",
indicator=True,
)
assert_none_missing(df, "country_of_destination.label")
return df
@uses_database
def get_port_labels(cnx=None):
return pd.read_sql(
"""
select distinct
name as "port_of_export.name",
unnest(synonyms) as "port_of_export.label"
from views.regions where region_type = 'PORT' and country = 'BRAZIL'
""",
cnx.cnx,
)
def clean_ports(df):
"""Introduce port name and trase id to the dataframe"""
# the port BIJUPIRA SALEMA FIELD is an oil field 250km off the coast
# just set this to unknown port...
df = df.copy()
m = df["port_of_export.label"] == "BIJUPIRA SALEMA FIELD"
df.loc[m, "port_of_export.label"] = "UNKNOWN PORT"
df = pd.merge(
df,
get_port_labels(),
on="port_of_export.label",
validate="many_to_one",
how="left",
indicator=True,
)
assert_none_missing(df, "port_of_export.label")
return df
def clean_cnpjs(df):
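"""Zero-pad exporter.cnpj, classify each exporter as 'cnpj', 'cpf', or 'unknown'
using python-stdnum checksum validation, and keep the padded number when valid."""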
df = df.copy()
cnpj = df["exporter.cnpj"].str.rjust(14, "0")
cnpj_valid = cnpj.apply(stdnum.br.cnpj.is_valid)
cpf = df["exporter.cnpj"].str.rjust(11, "0")
cpf_valid = cpf.apply(stdnum.br.cpf.is_valid)
cnpj_valid[cpf.isin(KNOWN_CPFS)] = False
assert not any(cnpj_valid & cpf_valid)
df["exporter.type"] = "unknown"
df.loc[cnpj_valid, "exporter.type"] = "cnpj"
df.loc[cpf_valid, "exporter.type"] = "cpf"
df["exporter.cnpj"] = np.where(cnpj_valid, cnpj, df["exporter.cnpj"])
df["exporter.cnpj"] = np.where(cpf_valid, cpf, df["exporter.cnpj"])
return df
def exclude_corrupt_rows(df):
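"""Drop duplicated header rows and rows whose date field is the literal string 'NA'."""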
is_duplicated_header_row = df["Period/YYYYMMDD"] == "Period/YYYYMMDD"
is_completely_na = df["Period/YYYYMMDD"] == "NA"
return df[~(is_duplicated_header_row | is_completely_na)].copy()
def parse_dates(df):
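"""Split the YYYYMMDD date string into integer year/month/day columns and
sanity-check their ranges before dropping the original date column."""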
df = df.assign(year=df["date"].str.slice(0, 4).astype(int))
df = df.assign(month=df["date"].str.slice(4, 6).astype(int))
df = df.assign(day=df["date"].str.slice(6, 8).astype(int))
assert all(df["year"] == 2020)
assert all(0 < df["month"]) and all(df["month"] <= 12)
assert all(0 < df["day"]) and all(df["day"] <= 31)
return df.drop(columns="date")
@uses_database
def clean_states(df, cnx=None):
df_states = pd.read_sql(
"""
select distinct
name as "exporter.state.name",
unnest(synonyms) as "exporter.state.label", -- TODO .label
trase_id as "exporter.state.trase_id"
from views.regions where country = 'BRAZIL' and region_type = 'STATE'
""",
cnx.cnx,
)
# Some "states" are outside Brazil
df_states = df_states.append(
{
"exporter.state.name": "NONE",
"exporter.state.label": "NONE",
"exporter.state.trase_id": "",
},
ignore_index=True,
)
df = df.replace(
{
"exporter.state.label": {
c: "NONE"
for c in [
"BUENOS AIRES",
"CORDOBA",
"FLORIDA",
"NEW JERSEY",
"TEXAS",
"VIRGINIA",
"SANTA FE",
]
}
}
)
# merge in trase ids
df = pd.merge(
df,
df_states,
on="exporter.state.label",
how="left",
indicator=True,
validate="many_to_one",
)
# assert everything matched
assert all(df.pop("_merge") == "both")
return df.drop(columns=["exporter.state.label"], errors="raise")
@uses_database
def clean_municipalities(df, cnx=None):
df_backup = df.copy()
df_municipalities = pd.read_sql(
f"""
select distinct
name as "exporter.municipality.name",
unnest(synonyms) as "exporter.municipality.label",
trase_id as "exporter.municipality.trase_id",
substr(trase_id, 0, 6) as "exporter.state.trase_id"
from views.regions
where country = 'BRAZIL' and region_type = 'MUNICIPALITY'
""",
cnx.cnx,
)
states = df_municipalities["exporter.state.trase_id"].unique()
# replace some specific values
df = df.replace(
{
"exporter.municipality.label": {
# city -> municipality
"TATUQUARA": "CURITIBA",
# some unknowns
"ZONA RURAL": "UNKNOWN",
"ESPIRITO SANTOS": "UNKNOWN",
"RIO GRANDE DO SUL": "UNKNOWN",
"MATO GROSSO": "UNKNOWN",
# places not in brazil
**{p: "NONE" for p in PLACES_NOT_IN_BRAZIL},
# labels that are missing from the database
# TODO add these to the database
"ALTA FLORESTA D OESTE": "ALTA FLORESTA D'OESTE",
"BELA VISTA DO PARAIS": "BELA VISTA DO PARAISO",
"CAPAO GRANDE": "VARZEA GRANDE",
"ENCOIS PAULISTA": "LENCOIS PAULISTA",
"NOVA BRASILANDIA D OESTE": "NOVA BRASILANDIA D'OESTE",
"FAZ SAO JOAQUIM": "SAO JOAQUIM",
"GOVERNADOR DIX SEPT ROSADO": "GOVERNADOR DIX-SEPT ROSADO",
"SAPUCAI MIRIM": "SAPUCAI-MIRIM",
"HERVAL D OESTE": "HERVAL D'OESTE",
"MACHADINHO D OESTE": "MACHADINHO D'OESTE",
"ESPIGAO D OESTE": "ESPIGAO D'OESTE",
},
}
)
# add some missing municipalities
# TODO add these to the database
df_municipalities = df_municipalities.append(
[
{
"exporter.municipality.name": "POMBAL",
"exporter.municipality.label": "POMBAL",
"exporter.municipality.trase_id": "BR-2512101",
"exporter.state.trase_id": "BR-XX",
},
{
"exporter.municipality.name": "SIDROLANDIA",
"exporter.municipality.label": "IDROLANDIA",
"exporter.municipality.trase_id": "BR-5007901",
"exporter.state.trase_id": "BR-50",
},
{
"exporter.municipality.name": "ALTA FLORESTA",
"exporter.municipality.label": "FLORESTA",
"exporter.municipality.trase_id": "BR-5100250",
"exporter.state.trase_id": "BR-51",
},
{
"exporter.municipality.name": "NONE",
"exporter.municipality.label": "NA",
"exporter.municipality.trase_id": "",
"exporter.state.trase_id": "",
},
{
"exporter.municipality.name": "NONE",
"exporter.municipality.label": "UNKNOWN",
"exporter.municipality.trase_id": "",
"exporter.state.trase_id": "",
},
],
ignore_index=True,
)
# add municipalities for NONE
df_municipalities = df_municipalities.append(
[
{
"exporter.municipality.name": "NONE",
"exporter.municipality.label": "NONE",
"exporter.municipality.trase_id": "",
"exporter.state.trase_id": state,
}
for state in [*states, ""]
],
ignore_index=True,
)
# add municipalities for NA
df_municipalities = df_municipalities.append(
[
{
"exporter.municipality.name": "UNKNOWN",
"exporter.municipality.label": "NA",
"exporter.municipality.trase_id": state + "XXXXX",
"exporter.state.trase_id": state,
}
for state in states
if state != "BR-XX"
],
ignore_index=True,
)
df_municipalities = df_municipalities.append(
[
{
"exporter.municipality.name": "UNKNOWN",
"exporter.municipality.label": label,
"exporter.municipality.trase_id": state + "XXXXX",
"exporter.state.trase_id": state,
}
for label, state in [
("SAO JOAQUIM", "BR-35"),
("GUARIBA", "BR-51"),
("JAGUARE", "BR-35"),
("CENTRAL", "BR-XX"),
("ACARAPE", "BR-42"),
("GRACA", "BR-35"),
("CACHOEIRINHA", "BR-35"),
("PAULISTA", "BR-35"),
("ESTRELA D OESTE", "BR-33"),
("SAO MARCOS", "BR-35"),
("HORIZONTE", "BR-31"),
]
]
)
df = pd.merge(
df.assign(i=range(len(df))),
df_municipalities,
on=["exporter.municipality.label", "exporter.state.trase_id"],
how="left",
indicator=True,
validate="many_to_one",
)
assert not any(df.pop("i").duplicated())
assert all(df.pop("_merge") == "both")
return df.drop(columns=["exporter.municipality.label"], errors="raise")
@uses_database
def clean_exporters_and_add_group(df, cur=None, cnx=None):
"""
This function adds two columns:
exporter.name - the default name of the exporter from the database
exporter.group - the group name from the database
It does this using the following algorithm:
1. Construct a Trase ID from exporter.cnpj and use this to perform a lookup in the
database
2. If a unique name + group cannot be found through that method, use exporter.label
to perform a lookup among trader labels in the database
TODO: try to do this more concisely / in fewer lines of code
"""
trase_ids = "BR-TRADER-" + df["exporter.cnpj"].str.slice(0, 8)
trase_ids = trase_ids.replace({"BR-TRADER-00000000": None})
df = df.assign(**{"exporter.trase_id": trase_ids})
df_exporters = df[["exporter.label", "exporter.trase_id"]].drop_duplicates()
# clean exporter names using trase id
df_exporters[["exporter.trader_id", "exporter.group", "count"]] = (
find_traders_and_groups_by_trase_id(
df_exporters.rename(columns={"exporter.trase_id": "trase_id"})[
["trase_id"]
],
returning=["trader_id", "group_name", "count"],
year=sql.Literal(YEAR),
cur=cur,
cnx=cnx,
)
)
counts = df_exporters.pop("count")
assert all(counts.isin([0, 1]))
not_found_by_trase_id = counts == 0
print(
f"{sum(~not_found_by_trase_id)} exporters were found by Trase ID and "
f"{sum(not_found_by_trase_id)} were not"
)
df_found_by_trase_id = df_exporters[~not_found_by_trase_id]
df_missing = df_exporters[not_found_by_trase_id].copy()
# if not found by Trase ID, then look by name
labels = df_missing["exporter.label"].drop_duplicates()
df_labels = pd.DataFrame(labels)
df_labels[["exporter.trader_id", "exporter.group", "count"]] = (
find_traders_and_groups_by_label(
df_labels.rename(columns={"exporter.label": "trader_label"}),
returning=["trader_id", "group_name", "count"],
year=sql.Literal(YEAR),
)
)
# special case for UNKNOWN CUSTOMER
is_unknown = (df_labels["count"] != 1) & (
df_labels["exporter.label"] == "UNKNOWN CUSTOMER"
)
if any(is_unknown):
brazil_id = get_country_id("BRAZIL", cur=cur)
label_id = find_label("UNKNOWN CUSTOMER", cur=cur)
trader_id = get_label_trader_id(label_id, brazil_id)
group_id = get_trader_group_id(trader_id, cur=cur)
group_name = get_node_name(group_id, cur=cur)
df_labels.loc[is_unknown, "exporter.trader_id"] = trader_id
df_labels.loc[is_unknown, "exporter.group"] = group_name
df_labels.loc[is_unknown, "count"] = 1
# we should have found one unique node for every exporter
bad = df_labels.pop("count") != 1
df_labels[bad].to_csv("/tmp/bad.csv")
if any(bad):
raise ValueError(f"Missing some exporters:\n{df_labels[bad]}")
# merge exporters found by trase id back into results
right = df_found_by_trase_id[
["exporter.trase_id", "exporter.trader_id", "exporter.group"]
].drop_duplicates()
df1 = pd.merge(
df,
right,
on=["exporter.trase_id"],
how="left",
validate="many_to_one",
indicator=True,
)
merge = df1.pop("_merge")
df_solved1 = df1[merge == "both"]
# merge exporters found by label back into results
df_unsolved = df1[merge != "both"]
df_unsolved = df_unsolved.drop(
columns=["exporter.trader_id", "exporter.group"], errors="raise"
)
right = df_labels[
["exporter.label", "exporter.trader_id", "exporter.group"]
].drop_duplicates()
df_solved2 = pd.merge(
df_unsolved,
right,
on=["exporter.label"],
how="left",
validate="many_to_one",
indicator=True,
)
merge = df_solved2.pop("_merge")
assert all(merge == "both")
# combine the two
expected_columns = list(set(df.columns) | {"exporter.trader_id", "exporter.group"})
assert sorted(df_solved2.columns) == sorted(expected_columns)
assert sorted(df_solved1.columns) == sorted(expected_columns)
df_final = pd.concat([df_solved1, df_solved2]).reset_index(drop=True)
# guarantee that we didn't change the original data
a = df.sort_values(list(df.columns)).reset_index(drop=True)
b = df_final[df.columns].sort_values(list(df.columns)).reset_index(drop=True)
b.columns.name = a.columns.name # align the columns Index name so assert_frame_equal does not flag it
pd.testing.assert_frame_equal(a, b)
# add exporter names
df_final = df_final.astype({"exporter.trader_id": int})
df_final[["exporter.name"]] = find_default_name_by_node_id(
df_final[["exporter.trader_id"]].rename(
columns={"exporter.trader_id": "node_id"}
),
returning=["name"],
cnx=cnx,
cur=cur,
)
return df_final
def main(include_ship):
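"""Read the raw 2020 BOL extract from S3 (with or without vessel columns),
run the cleaning steps, and write the cleaned CSV back for upload."""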
if include_ship:
key = "brazil/trade/bol/2020/originals/BR_EXP_2020_soy_ship_names.csv"
encoding = "windows-1252"
else:
key = "brazil/trade/bol/2020/originals/BR_EXP_2020_BOL.csv"
encoding = "utf8"
df = get_pandas_df_once(
key,
encoding=encoding,
sep=";",
dtype=str,
keep_default_na=False,
)
df = exclude_corrupt_rows(df)
df = select_and_rename_columns(df, include_ship)
df = clean_hs_codes(df)
df = df.astype({"vol": float})
df = parse_dates(df)
clean_string_columns(
df,
[
"country_of_origin.label",
"exporter.municipality.label",
"exporter.label",
"exporter.state.label",
"port_of_export.label",
"port_of_import.label",
"country_of_destination.label",
"importer.city",
"importer.label",
"importer.country.label",
*(["vessel.label"] if include_ship else []),
],
)
df = clean_states(df)
df = clean_municipalities(df)
df = clean_countries(df)
df = clean_ports(df)
df = clean_cnpjs(df)
df = clean_exporters_and_add_group(df)
if include_ship:
write_csv_for_upload(
df, "brazil/trade/bol/2020/BRAZIL_BOL_2020_SOY_SHIP_NAMES.csv"
)
else:
write_csv_for_upload(df, "brazil/trade/bol/2020/BRAZIL_BOL_2020.csv")
def assert_country_of_origin(df):
not_brazil = df[df["country_of_origin.label"] != "BRAZIL"].drop_duplicates()
assert not_brazil.empty, f"Not all countries of origin are Brazil, found:\n{not_brazil}"
if __name__ == "__main__":
main(include_ship=True)
main(include_ship=False)
import pandas as pd
def model(dbt, cursor):
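# Placeholder body (this model is tagged mock_model): it declares the source and
# ref dependencies for lineage, but the table itself is produced by the
# BRAZIL_BOL_2020.py script above (see "Calls script") and uploaded to S3.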
dbt.source("trase-storage-raw", "br_exp_2020_bol")
dbt.ref("hs2017")
raise NotImplementedError()
return pd.DataFrame({"hello": ["world"]})