
DBT: Brazil Bol 2020

File location: s3://trase-storage/brazil/trade/bol/2020/BRAZIL_BOL_2020.csv

DBT model name: brazil_bol_2020

Explore on Metabase: Full table; summary statistics

DBT details


Description

Cleaned bill of lading (BOL) data for Brazil's 2020 exports. Note that it does not include FOB values. Approximately 150 MB and 637,945 records.
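
A minimal sketch of loading the table for a quick check (assuming pandas with s3fs installed, read access to the trase-storage bucket, and a comma-separated export):

import pandas as pd

df = pd.read_csv(
    "s3://trase-storage/brazil/trade/bol/2020/BRAZIL_BOL_2020.csv",
    dtype=str,  # every column in this model is typed VARCHAR
    keep_default_na=False,
)
assert len(df) == 637_945  # record count quoted above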


Details

Column Type Description
hs6 VARCHAR
country_of_origin.label VARCHAR
exporter.cnpj VARCHAR
exporter.label VARCHAR
port_of_export.label VARCHAR
port_of_import.label VARCHAR
country_of_destination.label VARCHAR
importer.city VARCHAR
importer.label VARCHAR
importer.country.label VARCHAR
importer.code VARCHAR
vol VARCHAR
hs4 VARCHAR
hs5 VARCHAR
year VARCHAR
month VARCHAR
day VARCHAR
exporter.state.name VARCHAR
exporter.state.trase_id VARCHAR
exporter.municipality.name VARCHAR
exporter.municipality.trase_id VARCHAR
country_of_destination.name VARCHAR
country_of_destination.trase_id VARCHAR
port_of_export.name VARCHAR
exporter.type VARCHAR

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.br_exp_2020_bol
  • model.trase_duckdb.hs2017

Sources

  • ['trase-storage-raw', 'br_exp_2020_bol']

Code

import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf
from psycopg2 import sql

from trase.tools import (
    find_label,
    get_country_id,
    get_label_trader_id,
    get_node_name,
    get_trader_group_id,
)
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.pandasdb.find import (
    find_default_name_by_node_id,
    find_traders_and_groups_by_label,
    find_traders_and_groups_by_trase_id,
)
from trase.tools.pcs.connect import uses_database
from trase.tools.utilities.helpers import clean_string

KNOWN_CPFS = ["00331627949", "00523682891", "00297458108", "00216760895"]
YEAR = 2020
MISSING_VALUES = ["NAN", "NONE", "NA"]
PLACES_NOT_IN_BRAZIL = [
    "AARHUS",
    "SANTA FE",
    "BUENOS AIRES",
    "AJMAN",
    "ALFRETON",
    "AMSTERDAM",
    "ANTWERP",
    "ANTWERPEN",
    "ARROYITO",  # argentina/uruguay/bolivia
    "ASUNCION",  # paraguay
    "AVELLANEDA",  # argentina
    "BALLAJURA",  # perth
    "BANJUL",  # gambia
    "BEJAIA",  # algeria
    "BERGEIJK",  # nertherlands
    "BERGHEIM",
    "BRUSSELS",
    "BRUXELLES",
    "BURLINGTON SQUARE",
    "BURZACO",  # argentina
    "C SPEGAZZINI",  # sounds italian..
    "CHESHIRE",
    "CIUDAD DEL ESTE",
    "CLICHY",
    "CONC DO MATO DENTRO",
    "CORDOBA",
    "COYOACAN",
    "CUNIA",
    "DIAS D AVILA",
    "DUBAI",
    "EMBU GUACU",
    "FUNCHAL",  # portugal
    "GENERAL PACHECO",
    "GENEVA",
    "GOTEBORG",
    "HAMRIYA",
    "HANOI",
    "HERNANDARIAS",
    "HOUSTON",
    "IGARAPE MIRI",
    "ISELIN",
    "ITAIPAVA",
    "IZEGEM",
    "JEDDAH",
    "JERSEY",
    "KEY BISCAYNE",
    "LA LIBERTAD",
    "LA REJA",
    "LAIBIN",
    "LAUSANNE",
    "LITHIA",
    "LJUBLJANA",
    "LONDON",
    "LUNEN",
    "BARCELONA",
    "LUQUE",
    "MAR DEL PLATA",
    "MELO",
    "MERLO",
    "MIAMI",
    "MINGA GUAZU",
    "MOERDIJK",
    "MONTEVIDEO",
    "MORGES",
    "MOSCOW",
    "NINGBO",
    "NUEVA ESPERANZA",
    "PARIS",
    "PRUDENTE",
    "PTO MADRYN",
    "QUEBEC",
    "RIBERALTA",  # bolivia
    "ROAD TOWN",
    "RODOVRE",
    "ROMFORD",
    "ROTTERDAM",
    "SAN IGNACIO DE VELASCO",
    "SAN RAMON DE LA NUEVA ORAN",
    "SAN SALVADOR",
    "SANTA TECLA",
    "SEREGNO",
    "SHARJAH",
    "SHARJAN",
    "SINDELFINGEN",
    "SINGAPORE",
    "SISLI",
    "TAIPEI",
    "TORONTO",
    "UNITED STATES",
    "VALERIA",
    "VILLETA",
    "WATERLOO",
    "ZHEJIANG",
    "ZOETERMEER",
]


def select_and_rename_columns(df_bol, include_ship):
    columns = {
        "Period/YYYYMMDD": "date",
        "Commodity_HS_Datamar/HS4 Code": "hs4_datamar",
        "Commodity_HS_Datamar/HS4 English": "hs4_datamar_description",
        "Commodity_HS_Datamar/HS6 Code": "hs6_datamar",
        "Commodity_HS_Datamar/HS6 English": "hs6_datamar_description",
        "Commodity_HS/HS6 Code": "hs6",
        "Commodity_HS/HS6 English": "hs6_description",
        "Place_and_Ports/POL_Country": "country_of_origin.label",
        # exporter
        "Company_Shipper/City": "exporter.municipality.label",  # seems to be municipality...
        "Company_Shipper/Registration Number": "exporter.cnpj",
        "Company_Shipper/Shipper Name": "exporter.label",
        "Company_Shipper/State Name": "exporter.state.label",
        # ports, country
        "Place_and_Ports/POL_Name": "port_of_export.label",
        "Place_and_Ports/POD_Name": "port_of_import.label",
        "Place_and_Ports/DEST_Country": "country_of_destination.label",
        # importer
        "Company_Consignee/City": "importer.city",
        "Company_Consignee/Consignee Name": "importer.label",
        "Company_Consignee/Country": "importer.country.label",
        "Company_Consignee/Registration Number": "importer.code",
        # volume
        "WTKG": "vol",
    }

    if include_ship:
        ship_columns = {"Vessel/Vessel Name": "vessel.label", "Vessel/IMO": "vessel.id"}
        columns.update(ship_columns)
    return df_bol[list(columns)].rename(columns=columns, errors="raise")


def clean_hs_codes(df):
    """
    The HS codes are a mess! Sometimes the HS6 code doesn't exist but the datamar one
    does, or vice versa, or HS4 is not the same as the first characters of HS6, etc...
    Here we do our best to recover as much information as possible while ensuring
    that all the HS codes actually exist, or are marked with 'X' to indicate that they
    don't.
    """
    # add hs4 and hs5
    df["hs4"] = df["hs6"].str.slice(0, 4)
    df["hs5"] = df["hs6"].str.slice(0, 5)
    df["hs5_datamar"] = df["hs6_datamar"].str.slice(0, 5)

    # download an authoritative list of HS codes
    df_hscodes = get_pandas_df_once(
        "world/metadata/codes/hs/HS2017.csv", sep=";", dtype=str, keep_default_na=False
    )
    hs4 = df_hscodes[df_hscodes["type"] == "hs4"]["code"].rename("hs4")
    hs6 = df_hscodes[df_hscodes["type"] == "hs6"]["code"].rename("hs6")
    hs5 = hs6.str.slice(0, 5).drop_duplicates()

    # parsing HS4 codes
    df_codes = df.copy().reset_index(drop=True)

    def add_codes(df, hs, left_on):
        return pd.merge(
            df,
            hs,
            left_on=left_on,
            right_on=hs.name,
            how="left",
            validate="many_to_one",
            indicator=True,
        )

    # fmt: off
    df_codes["hs4_exists"] = add_codes(df_codes, hs4, "hs4").pop("_merge") == "both"
    df_codes["hs5_exists"] = add_codes(df_codes, hs5, "hs5").pop("_merge") == "both"
    df_codes["hs6_exists"] = add_codes(df_codes, hs6, "hs6").pop("_merge") == "both"
    df_codes["hs4_datamar_exists"] = add_codes(df_codes, hs4, "hs4_datamar").pop("_merge") == "both"
    df_codes["hs5_datamar_exists"] = add_codes(df_codes, hs5, "hs5_datamar").pop("_merge") == "both"
    df_codes["hs6_datamar_exists"] = add_codes(df_codes, hs6, "hs6_datamar").pop("_merge") == "both"
    # fmt: on

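    # Decision table: prefer the primary hs4/hs5/hs6 when they exist in the
    # reference list, fall back to the datamar codes per level otherwise, and
    # pad any level that cannot be recovered with 'X' characters.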
    def choose_hs_codes(row):
        if (
            not row["hs4_exists"]
            and not row["hs5_exists"]
            and not row["hs6_exists"]
            and not row["hs4_datamar_exists"]
            and not row["hs5_datamar_exists"]
            and not row["hs6_datamar_exists"]
        ):
            return "XXXX", "XXXXX", "XXXXXX"
        if (
            not row["hs4_exists"]
            and not row["hs5_exists"]
            and not row["hs6_exists"]
            and row["hs4_datamar_exists"]
            and not row["hs5_datamar_exists"]
            and not row["hs6_datamar_exists"]
        ):
            hs4 = row["hs4_datamar"]
            return hs4, hs4 + "X", hs4 + "XX"
        if (
            not row["hs4_exists"]
            and not row["hs5_exists"]
            and not row["hs6_exists"]
            and row["hs4_datamar"] == row["hs6_datamar"][0:4]
            and row["hs4_datamar_exists"]
            and row["hs5_datamar_exists"]
            and not row["hs6_datamar_exists"]
        ):
            return row["hs4_datamar"], row["hs5_datamar"], row["hs5_datamar"] + "X"
        if (
            not row["hs4_exists"]
            and not row["hs5_exists"]
            and not row["hs6_exists"]
            and row["hs4_datamar"] == row["hs6_datamar"][0:4]
            and row["hs4_datamar_exists"]
            and row["hs5_datamar_exists"]
            and row["hs6_datamar_exists"]
        ):
            return row["hs4_datamar"], row["hs5_datamar"], row["hs6_datamar"]
        if (
            row["hs4_exists"]
            and not row["hs5_exists"]
            and not row["hs6_exists"]
            and not row["hs4_datamar_exists"]
            and not row["hs5_datamar_exists"]
            and not row["hs6_datamar_exists"]
        ):
            return row["hs4"], row["hs4"] + "X", row["hs4"] + "XX"
        if (
            row["hs4_exists"]
            and not row["hs5_exists"]
            and not row["hs6_exists"]
            and row["hs4"] != row["hs4_datamar"]
            and row["hs4_datamar_exists"]
            and not row["hs5_datamar_exists"]
            and not row["hs6_datamar_exists"]
        ):
            return row["hs4"], row["hs4"] + "X", row["hs4"] + "XX"
        if (
            row["hs4_exists"]
            and not row["hs5_exists"]
            and not row["hs6_exists"]
            and row["hs4"] == row["hs4_datamar"]
            and row["hs4_datamar"] == row["hs6_datamar"][0:4]
            and row["hs4_datamar_exists"]
            and row["hs5_datamar_exists"]
            and row["hs6_datamar_exists"]
        ):
            return row["hs4"], row["hs5_datamar"], row["hs6_datamar"]
        if (
            row["hs4_exists"]
            and not row["hs5_exists"]
            and not row["hs6_exists"]
            and row["hs4"] == row["hs4_datamar"]
            and row["hs4_datamar_exists"]
            and not row["hs5_datamar_exists"]
            and not row["hs6_datamar_exists"]
        ):
            return row["hs4"], row["hs4"] + "X", row["hs4"] + "XX"
        if (
            row["hs4_exists"]
            and row["hs5_exists"]
            and not row["hs6_exists"]
            and row["hs4"] == row["hs4_datamar"]
            and row["hs5"] == row["hs5_datamar"]
            and not row["hs6_datamar_exists"]
        ):
            return row["hs4"], row["hs5"], row["hs6"] + "X"
        if (
            row["hs4_exists"]
            and row["hs5_exists"]
            and row["hs6_exists"]
            and not row["hs4_datamar_exists"]
            and not row["hs5_datamar_exists"]
            and not row["hs6_datamar_exists"]
        ):
            return row["hs4"], row["hs5"], row["hs6"]
        if (
            row["hs4_exists"]
            and row["hs5_exists"]
            and row["hs6_exists"]
            and row["hs4"] != row["hs4_datamar"]
            and row["hs4_datamar_exists"]
            and not row["hs5_datamar_exists"]
            and not row["hs6_datamar_exists"]
        ):
            return row["hs4"], row["hs5"], row["hs6"]
        if (
            row["hs4_exists"]
            and row["hs5_exists"]
            and row["hs6_exists"]
            and row["hs4"] != row["hs4_datamar"]
            and row["hs5"] != row["hs5_datamar"]
            and row["hs6"] != row["hs6_datamar"]
            and row["hs4_datamar_exists"]
            and row["hs5_datamar_exists"]
            and row["hs6_datamar_exists"]
        ):
            return row["hs4"], row["hs5"], row["hs6"]
        if (
            row["hs4_exists"]
            and row["hs5_exists"]
            and row["hs6_exists"]
            and row["hs4"] == row["hs4_datamar"]
            and not row["hs5_datamar_exists"]
            and not row["hs6_datamar_exists"]
        ):
            return row["hs4"], row["hs5"], row["hs6"]
        if (
            row["hs4_exists"]
            and row["hs5_exists"]
            and row["hs6_exists"]
            and row["hs5"] != row["hs5_datamar"]
            and row["hs6"] != row["hs6_datamar"]
            and row["hs4_datamar_exists"]
            and row["hs5_datamar_exists"]
            and row["hs6_datamar_exists"]
        ):
            return row["hs4"], row["hs5"], row["hs6"]
        if (
            row["hs4_exists"]
            and row["hs5_exists"]
            and row["hs6_exists"]
            and row["hs4"] == row["hs4_datamar"]
            and row["hs5"] == row["hs5_datamar"]
            and row["hs6"] == row["hs6_datamar"]
        ):
            return row["hs4"], row["hs5"], row["hs6"]

        # unhandled case
        raise ValueError

    # run the above function on the relevant columns
    df_codes = df_codes[
        [
            "hs4",
            "hs5",
            "hs6",
            "hs4_exists",
            "hs5_exists",
            "hs6_exists",
            "hs4_datamar",
            "hs5_datamar",
            "hs6_datamar",
            "hs4_datamar_exists",
            "hs5_datamar_exists",
            "hs6_datamar_exists",
        ]
    ].drop_duplicates()
    df_codes["new_hs4"], df_codes["new_hs5"], df_codes["new_hs6"] = zip(
        *df_codes.apply(choose_hs_codes, axis=1)
    )
    df_codes = df_codes[
        [
            "new_hs4",
            "new_hs5",
            "new_hs6",
            "hs4",
            "hs5",
            "hs6",
            "hs4_datamar",
            "hs5_datamar",
            "hs6_datamar",
        ]
    ].drop_duplicates()

    # merge the results in
    df = pd.merge(
        df,
        df_codes,
        on=["hs4", "hs5", "hs6", "hs4_datamar", "hs5_datamar", "hs6_datamar"],
        how="left",
        validate="many_to_one",
        indicator=True,
    )
    assert all(df.pop("_merge") == "both")
    df["hs4"] = df.pop("new_hs4")
    df["hs5"] = df.pop("new_hs5")
    df["hs6"] = df.pop("new_hs6")
    df = df.drop(
        columns=[
            "hs6_description",
            "hs4_datamar",
            "hs5_datamar",
            "hs6_datamar",
            "hs4_datamar_description",
            "hs6_datamar_description",
        ],
        errors="raise",
    )

    return df


def clean_string_columns(df, column_list):
    # clean the string columns
    for column in column_list:
        df[column] = df[column].apply(clean_string)

    # replace missing-value placeholders with UNKNOWN
    for column in df.columns:
        df.loc[df[column].isin(MISSING_VALUES), column] = "UNKNOWN"


@uses_database
def get_country_labels(cnx=None):
    """Retrieve country name, label, and trase_id"""
    return pd.read_sql(
        """
        select distinct 
            name as "country_of_destination.name",
            unnest(synonyms) as "country_of_destination.label",
            coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
        from views.regions where level = 1 and length(trase_id) = 2
        """,
        cnx.cnx,
    )


def assert_none_missing(df, column):
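    """Assert every row matched in a prior merge; pops '_merge' (mutates df)."""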
    missing = df[df.pop("_merge") != "both"][column].drop_duplicates()
    assert missing.empty, f"Not all {column} found:\n{missing}"


def clean_countries(df):
    """Introduce country name and trase id to the dataframe"""
    df = pd.merge(
        df,
        get_country_labels(),
        on="country_of_destination.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )
    assert_none_missing(df, "country_of_destination.label")
    return df


@uses_database
def get_port_labels(cnx=None):
    return pd.read_sql(
        """
        select distinct 
            name as "port_of_export.name", 
            unnest(synonyms) as "port_of_export.label"
        from views.regions where region_type = 'PORT' and country = 'BRAZIL'
        """,
        cnx.cnx,
    )


def clean_ports(df):
    """Introduce port name and trase id to the dataframe"""

    # the port BIJUPIRA SALEMA FIELD is an oil field 250km off the coast
    # just set this to unknown port...
    df = df.copy()
    m = df["port_of_export.label"] == "BIJUPIRA SALEMA FIELD"
    df.loc[m, "port_of_export.label"] = "UNKNOWN PORT"

    df = pd.merge(
        df,
        get_port_labels(),
        on="port_of_export.label",
        validate="many_to_one",
        how="left",
        indicator=True,
    )
    assert_none_missing(df, "port_of_export.label")
    return df


def clean_cnpjs(df):
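    """
    Classify each exporter identifier as a CNPJ (14-digit company number) or
    a CPF (11-digit personal number): left-pad with zeros, then validate with
    python-stdnum. KNOWN_CPFS lists identifiers that would otherwise validate
    as both and must be treated as CPFs.
    """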
    df = df.copy()

    cnpj = df["exporter.cnpj"].str.rjust(14, "0")
    cnpj_valid = cnpj.apply(stdnum.br.cnpj.is_valid)

    cpf = df["exporter.cnpj"].str.rjust(11, "0")
    cpf_valid = cpf.apply(stdnum.br.cpf.is_valid)
    cnpj_valid[cpf.isin(KNOWN_CPFS)] = False

    assert not any(cnpj_valid & cpf_valid)

    df["exporter.type"] = "unknown"
    df.loc[cnpj_valid, "exporter.type"] = "cnpj"
    df.loc[cpf_valid, "exporter.type"] = "cpf"

    df["exporter.cnpj"] = np.where(cnpj_valid, cnpj, df["exporter.cnpj"])
    df["exporter.cnpj"] = np.where(cpf_valid, cpf, df["exporter.cnpj"])

    return df


def exclude_corrupt_rows(df):
    is_duplicated_header_row = df["Period/YYYYMMDD"] == "Period/YYYYMMDD"
    is_completely_na = df["Period/YYYYMMDD"] == "NA"
    return df[~(is_duplicated_header_row | is_completely_na)].copy()


def parse_dates(df):
    df = df.assign(year=df["date"].str.slice(0, 4).astype(int))
    df = df.assign(month=df["date"].str.slice(4, 6).astype(int))
    df = df.assign(day=df["date"].str.slice(6, 8).astype(int))
    assert all(df["year"] == 2020)
    assert all(0 < df["month"]) and all(df["month"] <= 12)
    assert all(0 < df["day"]) and all(df["day"] <= 31)
    return df.drop(columns="date")


@uses_database
def clean_states(df, cnx=None):
    df_states = pd.read_sql(
        """
        select distinct 
            name as "exporter.state.name",
            unnest(synonyms) as "exporter.state.label",   -- TODO .label
            trase_id as "exporter.state.trase_id"
        from views.regions where country = 'BRAZIL' and region_type = 'STATE'
        """,
        cnx.cnx,
    )

    # Some "states" are outside Brazil
    df_states = pd.concat(
        [df_states, pd.DataFrame([{
            "exporter.state.name": "NONE",
            "exporter.state.label": "NONE",
            "exporter.state.trase_id": "",
        }])],
        ignore_index=True,
    )
    df = df.replace(
        {
            "exporter.state.label": {
                c: "NONE"
                for c in [
                    "BUENOS AIRES",
                    "CORDOBA",
                    "FLORIDA",
                    "NEW JERSEY",
                    "TEXAS",
                    "VIRGINIA",
                    "SANTA FE",
                ]
            }
        }
    )

    # merge in trase ids
    df = pd.merge(
        df,
        df_states,
        on="exporter.state.label",
        how="left",
        indicator=True,
        validate="many_to_one",
    )

    # assert everything matched
    assert all(df.pop("_merge") == "both")

    return df.drop(columns=["exporter.state.label"], errors="raise")


@uses_database
def clean_municipalities(df, cnx=None):
    df_backup = df.copy()
    df_municipalities = pd.read_sql(
        f"""
        select distinct
            name as "exporter.municipality.name",
            unnest(synonyms) as "exporter.municipality.label",
            trase_id as "exporter.municipality.trase_id",
            substr(trase_id, 0, 6) as "exporter.state.trase_id"
        from views.regions
        where country = 'BRAZIL' and region_type = 'MUNICIPALITY'
        """,
        cnx.cnx,
    )
    states = df_municipalities["exporter.state.trase_id"].unique()

    # replace some specific values
    df = df.replace(
        {
            "exporter.municipality.label": {
                # city -> municipality
                "TATUQUARA": "CURITIBA",
                # some unknowns
                "ZONA RURAL": "UNKNOWN",
                "ESPIRITO SANTOS": "UNKNOWN",
                "RIO GRANDE DO SUL": "UNKNOWN",
                "MATO GROSSO": "UNKNOWN",
                # places not in brazil
                **{p: "NONE" for p in PLACES_NOT_IN_BRAZIL},
                # labels that are missing from the database
                # TODO add these to the database
                "ALTA FLORESTA D OESTE": "ALTA FLORESTA D'OESTE",
                "BELA VISTA DO PARAIS": "BELA VISTA DO PARAISO",
                "CAPAO GRANDE": "VARZEA GRANDE",
                "ENCOIS PAULISTA": "LENCOIS PAULISTA",
                "NOVA BRASILANDIA D OESTE": "NOVA BRASILANDIA D'OESTE",
                "FAZ SAO JOAQUIM": "SAO JOAQUIM",
                "GOVERNADOR DIX SEPT ROSADO": "GOVERNADOR DIX-SEPT ROSADO",
                "SAPUCAI MIRIM": "SAPUCAI-MIRIM",
                "HERVAL D OESTE": "HERVAL D'OESTE",
                "MACHADINHO D OESTE": "MACHADINHO D'OESTE",
                "ESPIGAO D OESTE": "ESPIGAO D'OESTE",
            },
        }
    )

    # add some missing municipalities
    # TODO add these to the database
    df_municipalities = pd.concat(
        [df_municipalities, pd.DataFrame([
            {
                "exporter.municipality.name": "POMBAL",
                "exporter.municipality.label": "POMBAL",
                "exporter.municipality.trase_id": "BR-2512101",
                "exporter.state.trase_id": "BR-XX",
            },
            {
                "exporter.municipality.name": "SIDROLANDIA",
                "exporter.municipality.label": "IDROLANDIA",
                "exporter.municipality.trase_id": "BR-5007901",
                "exporter.state.trase_id": "BR-50",
            },
            {
                "exporter.municipality.name": "ALTA FLORESTA",
                "exporter.municipality.label": "FLORESTA",
                "exporter.municipality.trase_id": "BR-5100250",
                "exporter.state.trase_id": "BR-51",
            },
            {
                "exporter.municipality.name": "NONE",
                "exporter.municipality.label": "NA",
                "exporter.municipality.trase_id": "",
                "exporter.state.trase_id": "",
            },
            {
                "exporter.municipality.name": "NONE",
                "exporter.municipality.label": "UNKNOWN",
                "exporter.municipality.trase_id": "",
                "exporter.state.trase_id": "",
            },
        ])],
        ignore_index=True,
    )

    # add municipalities for NONE
    df_municipalities = pd.concat(
        [df_municipalities, pd.DataFrame([
            {
                "exporter.municipality.name": "NONE",
                "exporter.municipality.label": "NONE",
                "exporter.municipality.trase_id": "",
                "exporter.state.trase_id": state,
            }
            for state in [*states, ""]
        ])],
        ignore_index=True,
    )

    # add municipalities for NA
    df_municipalities = pd.concat(
        [df_municipalities, pd.DataFrame([
            {
                "exporter.municipality.name": "UNKNOWN",
                "exporter.municipality.label": "NA",
                "exporter.municipality.trase_id": state + "XXXXX",
                "exporter.state.trase_id": state,
            }
            for state in states
            if state != "BR-XX"
        ])],
        ignore_index=True,
    )
    df_municipalities = pd.concat(
        [df_municipalities, pd.DataFrame([
            {
                "exporter.municipality.name": "UNKNOWN",
                "exporter.municipality.label": label,
                "exporter.municipality.trase_id": state + "XXXXX",
                "exporter.state.trase_id": state,
            }
            for label, state in [
                ("SAO JOAQUIM", "BR-35"),
                ("GUARIBA", "BR-51"),
                ("JAGUARE", "BR-35"),
                ("CENTRAL", "BR-XX"),
                ("ACARAPE", "BR-42"),
                ("GRACA", "BR-35"),
                ("CACHOEIRINHA", "BR-35"),
                ("PAULISTA", "BR-35"),
                ("ESTRELA D OESTE", "BR-33"),
                ("SAO MARCOS", "BR-35"),
                ("HORIZONTE", "BR-31"),
            ]
        ])],
        ignore_index=True,
    )

    df = pd.merge(
        df.assign(i=range(len(df))),
        df_municipalities,
        on=["exporter.municipality.label", "exporter.state.trase_id"],
        how="left",
        indicator=True,
        validate="many_to_one",
    )
    assert not any(df.pop("i").duplicated())
    assert all(df.pop("_merge") == "both")

    return df.drop(columns=["exporter.municipality.label"], errors="raise")


@uses_database
def clean_exporters_and_add_group(df, cur=None, cnx=None):
    """
    This function adds two columns:

        exporter.name - the default name of the exporter from the database
        exporter.group - the group name from the database

    It does this using the following algorithm:

     1. Construct a Trase ID from exporter.cnpj and use this to perform a lookup in the
        database
     2. If a unique name + group cannot be found through that method, use exporter.label
        to perform a lookup among trader labels in the database

    TODO: try to do this more concisely / in fewer lines of code
    """
    trase_ids = "BR-TRADER-" + df["exporter.cnpj"].str.slice(0, 8)
    trase_ids = trase_ids.replace({"BR-TRADER-00000000": None})
    df = df.assign(**{"exporter.trase_id": trase_ids})
    df_exporters = df[["exporter.label", "exporter.trase_id"]].drop_duplicates()

    # clean exporter names using trase id
    df_exporters[["exporter.trader_id", "exporter.group", "count"]] = (
        find_traders_and_groups_by_trase_id(
            df_exporters.rename(columns={"exporter.trase_id": "trase_id"})[
                ["trase_id"]
            ],
            returning=["trader_id", "group_name", "count"],
            year=sql.Literal(YEAR),
            cur=cur,
            cnx=cnx,
        )
    )
    counts = df_exporters.pop("count")
    assert all(counts.isin([0, 1]))
    not_found_by_trase_id = counts == 0
    print(
        f"{sum(~not_found_by_trase_id)} exporters were found by Trase ID and "
        f"{sum(not_found_by_trase_id)} were not"
    )
    df_found_by_trase_id = df_exporters[~not_found_by_trase_id]
    df_missing = df_exporters[not_found_by_trase_id].copy()

    # if not found by Trase ID, then look by name
    labels = df_missing["exporter.label"].drop_duplicates()
    df_labels = pd.DataFrame(labels)
    df_labels[["exporter.trader_id", "exporter.group", "count"]] = (
        find_traders_and_groups_by_label(
            df_labels.rename(columns={"exporter.label": "trader_label"}),
            returning=["trader_id", "group_name", "count"],
            year=sql.Literal(YEAR),
        )
    )

    # special case for UNKNOWN CUSTOMER
    is_unknown = (df_labels["count"] != 1) & (
        df_labels["exporter.label"] == "UNKNOWN CUSTOMER"
    )
    if any(is_unknown):
        brazil_id = get_country_id("BRAZIL", cur=cur)
        label_id = find_label("UNKNOWN CUSTOMER", cur=cur)
        trader_id = get_label_trader_id(label_id, brazil_id)
        group_id = get_trader_group_id(trader_id, cur=cur)
        group_name = get_node_name(group_id, cur=cur)
        df_labels.loc[is_unknown, "exporter.trader_id"] = trader_id
        df_labels.loc[is_unknown, "exporter.group"] = group_name
        df_labels.loc[is_unknown, "count"] = 1

    # we should have found exactly one node for every exporter
    bad = df_labels.pop("count") != 1
    if any(bad):
        df_labels[bad].to_csv("/tmp/bad.csv")  # dump the failures for debugging
        raise ValueError(f"Missing some exporters:\n{df_labels[bad]}")

    # merge exporters found by trase id back into results
    right = df_found_by_trase_id[
        ["exporter.trase_id", "exporter.trader_id", "exporter.group"]
    ].drop_duplicates()
    df1 = pd.merge(
        df,
        right,
        on=["exporter.trase_id"],
        how="left",
        validate="many_to_one",
        indicator=True,
    )
    merge = df1.pop("_merge")
    df_solved1 = df1[merge == "both"]

    # merge exporters found by label back into results
    df_unsolved = df1[merge != "both"]
    df_unsolved = df_unsolved.drop(
        columns=["exporter.trader_id", "exporter.group"], errors="raise"
    )

    right = df_labels[
        ["exporter.label", "exporter.trader_id", "exporter.group"]
    ].drop_duplicates()
    df_solved2 = pd.merge(
        df_unsolved,
        right,
        on=["exporter.label"],
        how="left",
        validate="many_to_one",
        indicator=True,
    )
    merge = df_solved2.pop("_merge")
    assert all(merge == "both")

    # combine the two
    expected_columns = list(set(df.columns) | {"exporter.trader_id", "exporter.group"})
    assert sorted(df_solved2.columns) == sorted(expected_columns)
    assert sorted(df_solved1.columns) == sorted(expected_columns)
    df_final = pd.concat([df_solved1, df_solved2]).reset_index(drop=True)

    # guarantee that we didn't change the original data
    a = df.sort_values(list(df.columns)).reset_index(drop=True)
    b = df_final[df.columns].sort_values(list(df.columns)).reset_index(drop=True)
    # align the columns' name attribute, which assert_frame_equal also compares
    b.columns.name = a.columns.name
    pd.testing.assert_frame_equal(a, b)

    # add exporter names
    df_final = df_final.astype({"exporter.trader_id": int})
    df_final[["exporter.name"]] = find_default_name_by_node_id(
        df_final[["exporter.trader_id"]].rename(
            columns={"exporter.trader_id": "node_id"}
        ),
        returning=["name"],
        cnx=cnx,
        cur=cur,
    )

    return df_final


def main(include_ship):
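    """Load the raw BOL extract from S3, clean it, and write the cleaned CSV for upload."""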
    if include_ship:
        key = "brazil/trade/bol/2020/originals/BR_EXP_2020_soy_ship_names.csv"
        encoding = "windows-1252"
    else:
        key = "brazil/trade/bol/2020/originals/BR_EXP_2020_BOL.csv"
        encoding = "utf8"
    df = get_pandas_df_once(
        key,
        encoding=encoding,
        sep=";",
        dtype=str,
        keep_default_na=False,
    )
    df = exclude_corrupt_rows(df)
    df = select_and_rename_columns(df, include_ship)
    df = clean_hs_codes(df)
    df = df.astype({"vol": float})
    df = parse_dates(df)
    clean_string_columns(
        df,
        [
            "country_of_origin.label",
            "exporter.municipality.label",
            "exporter.label",
            "exporter.state.label",
            "port_of_export.label",
            "port_of_import.label",
            "country_of_destination.label",
            "importer.city",
            "importer.label",
            "importer.country.label",
            *(["vessel.label"] if include_ship else []),
        ],
    )
    df = clean_states(df)
    df = clean_municipalities(df)
    df = clean_countries(df)
    df = clean_ports(df)
    df = clean_cnpjs(df)
    df = clean_exporters_and_add_group(df)

    if include_ship:
        write_csv_for_upload(
            df, "brazil/trade/bol/2020/BRAZIL_BOL_2020_SOY_SHIP_NAMES.csv"
        )
    else:
        write_csv_for_upload(df, "brazil/trade/bol/2020/BRAZIL_BOL_2020.csv")


def assert_country_of_origin(df):
    not_brazil = df[df["country_of_origin.label"] != "BRAZIL"].drop_duplicates()
    assert not_brazil.empty, f"Found rows whose country of origin is not BRAZIL:\n{not_brazil}"


if __name__ == "__main__":
    main(include_ship=True)
    main(include_ship=False)
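
# dbt Python model definition (dbt injects the `dbt` object and a database cursor)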
import pandas as pd


def model(dbt, cursor):
    # declare lineage: the raw BOL source table and the hs2017 model
    dbt.source("trase-storage-raw", "br_exp_2020_bol")
    dbt.ref("hs2017")

    # placeholder body: this model only declares its dependencies;
    # executing it raises NotImplementedError
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})  # unreachable