DBT: Brazil Mdic Municipality 1997

File location: s3://trase-storage/brazil/trade/mdic/municipality/brazil_mdic_municipality_1997.csv

DBT model name: brazil_mdic_municipality_1997

Explore on Metabase: Full table; summary statistics

DBT details


Description

Base de Dados de Exportação do Brasil (Brazilian Export Database) - MDIC/SECEX

The Brazil export dataset is published by Brazil's Ministry of Development, Industry, and Trade (MDIC) through its Foreign Trade Secretariat (SECEX). It contains detailed records of Brazil's exports, including product classifications, destinations, values, and volumes.

What the dataset is

The dataset contains trade data aggregated to a monthly level and to several spatial levels (national, state, municipality, port).

There are two different aggregations of the same underlying data. Both provide net weight in kilograms and FOB value in US dollars:

Port-level export data

The "port-level" data, called Base de dados detalhada por NCM breaks down exports by the following columns:

  • Year and month.
  • NCM commodity code
    • The Nomenclatura Comum do Mercosul, or Mercosul Common Nomenclature, is an eight-digit code assigned to goods in Brazil for use in international trade. It can be considered equivalent to an HS8 code.
  • Statistical unit code.
  • Product destination country code.
  • State of product origin (as in, the Brazilian state) (*).
    • This indicates the state where the exported product was shipped from, not necessarily where it was produced.
  • Transport route code (land, maritime, air, etc.). Can be found in the mdic_via_codes csv source table.
  • Shipment URF code (**).
    • A URF (Unidade da Receita Federal, or Federal Revenue Unit) code identifies specific customs units (customs posts, ports, airports, and border checkpoints) where export operations are processed.
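
A minimal sketch of loading one of these raw files with pandas, assuming the column layout above (the local file name and the semicolon separator are assumptions, not confirmed by the source):

    import pandas as pd

    # Read everything as strings: codes such as CO_NCM and CO_PAIS carry
    # leading zeros that integer parsing would destroy.
    df = pd.read_csv("EXP_2024.csv", sep=";", dtype=str)

    # Columns expected in the "originals" layout (see MDIC_PORT_COLUMNS in the
    # processing script below): CO_ANO, CO_MES, CO_NCM, CO_UNID, CO_PAIS,
    # SG_UF_NCM, CO_VIA, CO_URF, QT_ESTAT, KG_LIQUIDO, VL_FOB
    print(df.columns.tolist())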

Notes from Nanxu, 09-20-2021

Although the file names are the same as those of the files under brazil/trade/mdic/port/originals/, the two groups of files have different columns (a sketch for telling them apart follows this list):

  • Files under brazil/trade/mdic/port/old/ contain: CO_UF and CO_PORTO
  • Files under brazil/trade/mdic/port/originals/ contain: SG_UF_NCM and CO_URF
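
Since file names alone cannot distinguish the two layouts, a header check can. A sketch, again assuming a semicolon separator:

    import pandas as pd

    def detect_port_schema(path):
        """Guess which file group a port-level CSV belongs to from its header."""
        header = set(pd.read_csv(path, sep=";", nrows=0, dtype=str).columns)
        if {"CO_UF", "CO_PORTO"} <= header:
            return "old"        # layout of brazil/trade/mdic/port/old/
        if {"SG_UF_NCM", "CO_URF"} <= header:
            return "originals"  # layout of brazil/trade/mdic/port/originals/
        raise ValueError(f"unrecognised header: {sorted(header)}")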

Municipality-level export data

The "municipality-level" data, called Base de dados detalhada por Município da empresa exportadora/importadora e Posição do Sistema Harmonizado (SH4) breaks down exports by the following columns:

  • Year and month.
  • HS4 commodity code.
  • Product destination country code.
  • State of the company's tax domicile (as in, the Brazilian state).
  • Code of the municipality of the exporting/importing company's tax domicile.
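
A sketch of loading and summarising one municipality-level file, using the column names from the processing script below (the file name and separator are again assumptions):

    import pandas as pd

    df = pd.read_csv("EXP_2024_MUN.csv", sep=";", dtype=str)

    # Total net weight (kg) per HS4 heading and destination country.
    df["KG_LIQUIDO"] = df["KG_LIQUIDO"].astype(int)
    totals = df.groupby(["SH4", "CO_PAIS"])["KG_LIQUIDO"].sum()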

This dataset used to include the state of product origin (*) and shipment URF code (**) columns, which remain in the port-level dataset above. However, due to privacy concerns these columns were removed in mid-2018 and the data was aggregated: because the dataset provided export/import data at the municipality level, including the state of origin and customs office made it easier to infer specific exporters/importers, especially in smaller cities with few trading companies. This posed a risk to business confidentiality, as competitors could analyze shipment patterns, and the Brazilian government aimed to strengthen data protection in trade statistics. Although the General Data Protection Law (LGPD - Lei Geral de Proteção de Dados) was only enacted in 2020, earlier steps were taken to reduce the risk of indirectly identifying companies.

How we use the dataset in Trase, for example which SEI-PCS models it is involved in

The dataset is used in all of our SEI-PCS models for Brazil.

How often the dataset is updated, and when the next update is likely to be

The MDIC/SECEX export dataset is updated monthly. The data for a given month is typically released in the first half of the following month.

How to re-fetch the dataset from the original source

The data is downloaded as CSV files from https://www.gov.br/produtividade-e-comercio-exterior/pt-br/assuntos/comercio-exterior/estatisticas/base-de-dados-bruta. There is a bash script to do this:

bash trase/data/brazil/trade/mdic/municipality_port/originals/exp_20xx_and_mun.sh 2023 2024 2025

Alternatively, it can be done manually.

Manual download

The port-level export data is under 1. Base de dados detalhada por NCM > Exportação. The file is downloaded locally and is called EXP_{year}.csv. We rename the file, adding the date it was downloaded, and upload it to S3, e.g.:

s3://trase-storage/brazil/trade/mdic/port/originals/EXP_2024_retrieved_21-11-2024.csv

The municipality-level export data is under 2. Base de dados detalhada por Município da empresa exportadora/importadora e Posição do Sistema Harmonizado (SH4) > Exportação. The file is downloaded locally and is called EXP_{year}_MUN.csv. We rename the file, adding the date it was downloaded, and upload it to S3, e.g.:

s3://trase-storage/brazil/trade/mdic/municipality/originals/EXP_2024_MUN_retrieved_21-11-2024.csv
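
A sketch of the rename-and-upload step with boto3, following the date-stamped naming shown above (the use of boto3 here is illustrative, not the documented procedure):

    from datetime import date

    import boto3

    stamp = date.today().strftime("%d-%m-%Y")  # e.g. 21-11-2024
    key = f"brazil/trade/mdic/municipality/originals/EXP_2024_MUN_retrieved_{stamp}.csv"
    boto3.client("s3").upload_file("EXP_2024_MUN.csv", "trase-storage", key)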

The script that is used to process/clean the dataset

trase/data/brazil/trade/mdic/municipality_port/brazil_mdic_muncipality_port_201X.py.

When the dataset was last updated, and by whom

  • 2022: Nanxu Su
  • 2021: Tomas Carvalho and Nanxu Su
  • 2020: Tomas Carvalho and Harry Biddle
  • 2019: Harry Biddle
  • 2018: Nanxu Su
  • 1997 to 2017 inclusive: Harry Biddle

A history of changes/notes of the dataset

  • April 2021: new methodological notice, also available on S3:
    • s3://trase-storage/brazil/trade/mdic/Methodological Change April 2021.pdf
    • s3://trase-storage/brazil/trade/mdic/Methodological Change April 2021 - Google Translate English.pdf
  • Mid-2018: state of origin and customs office are removed from the municipality-level dataset to protect business confidentiality.

Acceptance criteria for sufficient level of quality of the dataset

There are no acceptance criteria defined for this dataset.


Details

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.ports
  • source.trase_duckdb.trase-storage-raw.uf
  • source.trase_duckdb.trase-storage-raw.countries
  • source.trase_duckdb.trase-storage-raw.exp_1997_mun
  • source.trase_duckdb.trase-storage-raw.id_mun_master_a

"""
Creates the following files:

    s3://trase-storage/brazil/trade/mdic/municipality/brazil_mdic_municipality_1997.parquet
    s3://trase-storage/brazil/trade/mdic/municipality/brazil_mdic_municipality_ ...
    s3://trase-storage/brazil/trade/mdic/municipality/brazil_mdic_municipality_2020.parquet

and

    s3://trase-storage/brazil/trade/mdic/port/brazil_mdic_port_1997.parquet
    s3://trase-storage/brazil/trade/mdic/port/brazil_mdic_port_ ...
    s3://trase-storage/brazil/trade/mdic/port/brazil_mdic_port_2020.parquet

"""

from argparse import ArgumentParser

import numpy as np
import pandas as pd
from tqdm import tqdm

from trase.tools import uses_database
from trase.tools.aws.aws_helpers import read_s3_parquet
from trase.tools.aws.metadata import write_parquet_for_upload
from trase.tools.aws.tracker import S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION
from trase.tools.sei_pcs.pandas_utilities import full_merge
from trase.tools.sps import get_pandas_df_once
from trase.tools.utilities.helpers import clean_string

get_pandas_df = get_pandas_df_once

MDIC_MUNICIPALITY_COLUMNS = {
    "CO_ANO": "year",
    "CO_MES": "month",
    "SH4": "hs4",
    "CO_PAIS": "country_of_destination.pais",
    "KG_LIQUIDO": "vol",
    "VL_FOB": "fob",
}

MDIC_MUNICIPALITY_COLUMNS_OLD = {
    **MDIC_MUNICIPALITY_COLUMNS,
    "CO_UF": "state.uf_number",
    "CO_PORTO": "port.code",
    "CO_MUN_GEO": "exporter.municipality.mdic_code",
}

MDIC_MUNICIPALITY_COLUMNS_NEW = {
    **MDIC_MUNICIPALITY_COLUMNS,
    # "SG_UF_MUN": "exporter.state.uf_code",
    "CO_MUN": "exporter.municipality.mdic_code",
}
MDIC_PORT_COLUMNS = {
    "CO_ANO": "year",
    "CO_MES": "month",
    "CO_NCM": "hs8",
    # "CO_UNID": "unit",
    "CO_PAIS": "country_of_destination.pais",
    "SG_UF_NCM": "state.uf_letter",
    "CO_VIA": "via",
    "CO_URF": "port.urf",
    # "QT_ESTAT": "quantity",
    "KG_LIQUIDO": "vol",
    "VL_FOB": "fob",
}
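
# The three mappings above mirror the layouts described in the dataset notes:
# MDIC_MUNICIPALITY_COLUMNS_OLD covers the pre-2018 municipality files (which
# still carried state CO_UF and port CO_PORTO columns),
# MDIC_MUNICIPALITY_COLUMNS_NEW covers the aggregated post-2018 layout, and
# MDIC_PORT_COLUMNS covers the NCM-level port files.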


YEARS_AND_SUFFIXES = {
    2012: "",
    2013: "",
    2014: "",
    2015: "",
    2016: "",
    2017: "",
    2018: "",
    2019: "",
    2020: "",
    2021: "_retrieved_2025-02-12",
    2022: "_retrieved_2025-02-14",
    2023: "_retrieved_2025-02-12",
    2024: "_retrieved_2025-02-12",
}
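
# The suffix is the "_retrieved_<date>" tag appended when a file is renamed
# for upload to S3 (see the re-fetch instructions above); an empty suffix
# means the original on S3 carries no retrieval date in its name.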


@uses_database
def get_commodities(cnx=None):
    return pd.read_sql(
        "SELECT * FROM views.commodities",
        con=cnx.cnx,
    )


def clean_months(df):
    return df.assign(month=df["month"].astype(int))


def select_cast_and_rename_columns(df, columns: dict):
    df = df[list(columns.keys())].rename(columns=columns, errors="raise")
    df = df.astype({"fob": int, "vol": int, "year": int})
    return df


def filter_to_hs4_codes_of_interest_to_trase(df, df_commodities):
    hs4 = df_commodities["hs_code"].str.slice(0, 4).drop_duplicates().rename("hs4")
    # Inner join on "hs4" keeps only rows for commodities tracked by Trase.
    return pd.merge(df, hs4, validate="many_to_one")


def replace_port_urfs(df, df_port_urfs):
    return full_merge(
        df,
        df_port_urfs,
        on="port.urf",
        how="left",
        validate="many_to_one",
    )


def extract_country_labels(df_regions):
    """
    country_of_destination.name  country_of_destination.label country_of_destination.trase_id
                    AFGHANISTAN                   AFEGANISTAO                              AF
                    AFGHANISTAN                   AFEGANISTÃO                              AF
                    AFGHANISTAN                    AFEGANISTO                              AF
                    AFGHANISTAN                    AFGANISTAN                              AF
                    AFGHANISTAN                   AFGHANISTAN                              AF
    """
    df_filtered = df_regions[df_regions["trase_id"].str.len() == 2]
    df_filtered = df_filtered[df_filtered["level"].astype(int) == 1]
    df_exploded = df_filtered.explode("synonyms")
    df_renamed = df_exploded[["name", "synonyms", "trase_id"]].rename(
        columns={
            "name": "country_of_destination.name",
            "synonyms": "country_of_destination.label",
            "trase_id": "country_of_destination.trase_id",
        },
        errors="raise",
    )
    return df_renamed.drop_duplicates()


def clean_country_pais(df_pais, df_regions):
    """
    A lookup of a "PAIS" code to a country name. We also load standard names for
    countries from the database along with ISO codes:

      country_of_destination.pais  country_of_destination.name  country_of_destination.trase_id
                              000              UNKNOWN COUNTRY                               XX
                              013                  AFGHANISTAN                               AF
                              015                ALAND ISLANDS                               AX
                              017                      ALBANIA                               AL
                              020              UNKNOWN COUNTRY                               XX

    """

    df_pais = df_pais[["CO_PAIS", "NO_PAIS_ING"]].copy()
    df_pais["CO_PAIS"] = df_pais["CO_PAIS"].str.rjust(3, "0")
    df_pais = df_pais.rename(
        columns={
            "NO_PAIS_ING": "country_of_destination.label",
            "CO_PAIS": "country_of_destination.pais",
        },
        errors="raise",
    )
    df_pais["country_of_destination.label"] = df_pais[
        "country_of_destination.label"
    ].apply(clean_string)

    df_country_labels = extract_country_labels(df_regions)
    (unknown_name,) = df_country_labels[
        df_country_labels["country_of_destination.trase_id"] == "XX"
    ][
        "country_of_destination.name"
    ].unique()  # "UNKNOWN COUNTRY"
    df_pais = pd.merge(
        df_pais,
        df_country_labels,
        how="left",
        validate="many_to_one",  # GUERNSEY and JERSEY both have two PAIS codes
        indicator=True,
    )
    df_pais.loc[df_pais["_merge"] != "both", "country_of_destination.name"] = (
        unknown_name
    )
    df_pais.loc[df_pais["_merge"] != "both", "country_of_destination.trase_id"] = "XX"
    df_pais = df_pais.drop(
        columns=["_merge", "country_of_destination.label"], errors="raise"
    )
    return df_pais


def clean_state_uf(df):
    df = df[["CO_UF_IBGE", "CO_UF", "UF"]]
    df = df.rename(
        columns={
            "CO_UF_IBGE": "state.code",
            "CO_UF": "state.uf_number",
            "UF": "state.uf_letter",
        },
        errors="raise",
    )

    df["state.trase_id"] = "BR-" + df["state.code"]
    df = df.drop(columns="state.code", errors="raise")
    new_data = pd.DataFrame(
        # fmt: off
        # BR-XX is used for "unknown state in Brazil", and XX for "unknown,
        # possibly outside Brazil"
        columns=["state.uf_number", "state.uf_letter", "state.trase_id"],
        data=[  [             "93",              "EX",             "XX"],  # exterior / region not declared
                [             "94",              "CB",             "XX"],  # onboard consumption / onboard consumption
                [             "96",              "MN",             "XX"],  # nationalized merchandise / nationalized merchandise
                [             "97",              "RE",             "XX"],  # re-export / re-export
                [             "98",              "ED",          "BR-XX"],  # miscellaneous states - coffee / region not declared
                [             "95",              "ND",          "BR-XX"],  # not declared / region not declared
                [             "99",              "ZN",          "BR-XX"]]  # undeclared zone / region not declared
    )
    return pd.concat(
        [df, new_data],
        ignore_index=True,
        verify_integrity=True,
        sort=False,
    )


def clean_port_urfs(df):
    df = df[["urf_code", "name", "trase_id"]]
    return df.rename(
        columns={
            "urf_code": "port.urf",
            "name": "port.name",
            "trase_id": "port.trase_id",
        },
        errors="raise",
    )


def replace_country_pais_codes_with_names_and_trase_ids(df, df_pais, df_regions):
    df_pais = clean_country_pais(df_pais, df_regions)
    return full_merge(
        df,
        df_pais,
        on="country_of_destination.pais",
        how="left",
    )


def replace_state_uf_codes_with_names_and_trase_ids(df, df_state_uf, column):
    df_state_uf = df_state_uf[[column, "state.trase_id"]].drop_duplicates()
    return full_merge(
        df,
        df_state_uf,
        on=column,
        how="left",
        validate="many_to_one",
    )


def add_via_description(df, df_via):
    df_via = df_via.rename(
        columns={
            "via_code": "via",
        },
        errors="raise",
    )
    return full_merge(
        df,
        df_via,
        on="via",
        how="left",
        validate="many_to_one",
    )


def add_derived_hscodes(df):
    df = df.assign(hs6=df["hs8"].str.slice(0, 6))
    df = df.assign(hs4=df["hs8"].str.slice(0, 4))
    return df


def clean_mdic_port(
    df, df_commodities, df_pais, df_regions, df_port_urfs, df_uf, df_via
):
    df_port_urfs = clean_port_urfs(df_port_urfs)
    df = select_cast_and_rename_columns(df, MDIC_PORT_COLUMNS)

    df = clean_months(df)
    df = add_derived_hscodes(df)
    df = filter_to_hs4_codes_of_interest_to_trase(df, df_commodities)

    df = replace_country_pais_codes_with_names_and_trase_ids(df, df_pais, df_regions)
    df_state_uf = clean_state_uf(df_uf)
    df = replace_state_uf_codes_with_names_and_trase_ids(
        df, df_state_uf, "state.uf_letter"
    )
    df = replace_port_urfs(df, df_port_urfs)
    df = add_via_description(df, df_via)

    return df


def replace_mdic_municipality_codes(df, df_municipality_codes):
    df_municipality_codes = df_municipality_codes[["ID_MUN_IBGE", "ID_MUN_MDIC"]]
    df_municipality_codes = df_municipality_codes.rename(
        columns={
            "ID_MUN_MDIC": "exporter.municipality.mdic_code",
            "ID_MUN_IBGE": "exporter.municipality.code",
        },
        errors="raise",
    )
    df_municipality_codes["exporter.municipality.trase_id"] = (
        "BR-" + df_municipality_codes["exporter.municipality.code"]
    )
    # DataFrame.append was removed in pandas 2.0; build the extra rows as a
    # DataFrame and concatenate instead.
    extra_codes = pd.DataFrame(
        [
            {
                "exporter.municipality.mdic_code": "9999999",
                "exporter.municipality.trase_id": "BR-XXXXXXX",
            },
            {
                "exporter.municipality.mdic_code": "9400000",
                "exporter.municipality.trase_id": "XX",
            },  # outside Brazil
            {
                "exporter.municipality.mdic_code": "9300000",
                "exporter.municipality.trase_id": "XX",
            },  # outside Brazil
        ]
    )
    df_municipality_codes = pd.concat(
        [df_municipality_codes, extra_codes],
        ignore_index=True,
        verify_integrity=True,
        sort=False,
    )
    return full_merge(
        df,
        df_municipality_codes,
        how="left",
        on="exporter.municipality.mdic_code",
        validate="many_to_one",
    )


def clean_mdic_municipality(
    df, df_commodities, df_regions, df_municipality_codes, df_pais, year
):
    is_old = year < 2018

    df = select_cast_and_rename_columns(
        df, MDIC_MUNICIPALITY_COLUMNS_OLD if is_old else MDIC_MUNICIPALITY_COLUMNS_NEW
    )
    df = filter_to_hs4_codes_of_interest_to_trase(df, df_commodities)

    df = clean_months(df)
    df = replace_country_pais_codes_with_names_and_trase_ids(df, df_pais, df_regions)
    df = replace_mdic_municipality_codes(df, df_municipality_codes)

    return df


def replace_mdic_port_codes_with_port_description(df, df_ports):
    df_ports = df_ports[["CO_PORTO", "DESC_PORTO"]]
    df_ports = df_ports.rename(
        columns={
            "CO_PORTO": "port.code",
            "DESC_PORTO": "port.description",
        },
        errors="raise",
    )
    df = full_merge(df, df_ports, on="port.code", how="left")

    # split port description into label/type/state
    def split_port_description(description):
        """Splits e.g. into three components"""
        parts = description.split(" - ")
        if len(parts) == 2:
            return parts[0], "", parts[1]
        elif len(parts) == 3:
            return parts[0], parts[1], parts[2]
        else:
            raise ValueError(description)

    (
        df["port.label"],
        df["port.type"],
        df["port.state.uf_letter"],
    ) = zip(*df["port.description"].apply(split_port_description))
    df = df.drop(columns=["port.description"], errors="raise")
    return df


def clean_port_labels(df, df_port_labels):
    df["port.label"] = df["port.label"].str.replace("^PORTO DE ", "")
    df = pd.merge(
        df,
        df_port_labels,
        on="port.label",
        how="left",
        indicator=True,
        validate="many_to_one",
    )
    missing = df.pop("_merge") != "both"
    df.loc[missing, "port.trase_id"] = ""
    df["port.name"] = np.where(missing, df.pop("port.label"), df["port.name"])
    return df


def replace_port_state_uf_codes_with_trase_ids(df, df_state_uf):
    df_state_uf = df_state_uf[["state.uf_letter", "state.trase_id"]]
    df_state_uf = df_state_uf.rename(
        columns={
            "state.uf_letter": "port.state.uf_letter",
            "state.trase_id": "port.state.trase_id",
        },
        errors="raise",
    )
    return full_merge(
        df,
        df_state_uf.drop_duplicates(),
        on="port.state.uf_letter",
        how="left",
        validate="many_to_one",
    )


def replace_mdic_port_codes(df, df_state_uf, df_port_labels, df_ports):
    df = replace_mdic_port_codes_with_port_description(df, df_ports)
    df = replace_port_state_uf_codes_with_trase_ids(df, df_state_uf)
    df = clean_port_labels(df, df_port_labels)
    return df


def extract_port_labels(df_regions):
    df = df_regions[
        (df_regions["region_type"] == "PORT") & (df_regions["country"] == "BRAZIL")
    ]
    df_exploded = df.explode("synonyms")
    df_renamed = df_exploded[["name", "synonyms", "trase_id"]].rename(
        columns={
            "name": "port.name",
            "synonyms": "port.label",
            "trase_id": "port.trase_id",
        },
        errors="raise",
    )
    df_renamed["port.trase_id"] = df_renamed["port.trase_id"].fillna("")
    return df_renamed.drop_duplicates()


def extra_cleaning_of_old_mdic_municipality(df, df_uf, df_regions, df_ports):
    df_port_labels = extract_port_labels(df_regions)
    df_state_uf = clean_state_uf(df_uf)
    df = replace_state_uf_codes_with_names_and_trase_ids(
        df, df_state_uf, "state.uf_number"
    )
    df = replace_mdic_port_codes(df, df_state_uf, df_port_labels, df_ports)
    return df


def get_pais():
    return get_pandas_df("brazil/metadata/countries.csv", dtype=str)


@uses_database
def get_regions(cnx=None):
    # get columns of views.regions
    columns_df = pd.read_sql(
        """
        SELECT column_name FROM information_schema.columns 
        WHERE table_schema = 'views' AND table_name = 'regions'
        """,
        cnx.cnx,
    )
    columns = list(columns_df["column_name"].values)

    # remove _geometry column
    assert "_geometry" in columns
    columns.pop(columns.index("_geometry"))

    # select all other columns
    return pd.read_sql(f"select {' ,'.join(columns)} from views.regions", cnx.cnx)


def get_uf():
    return get_pandas_df(
        "brazil/metadata/UF.csv",
        sep=",",
        dtype=str,
    )


def get_port_urfs():
    return read_s3_parquet(
        "brazil/metadata/brazil_port_urfs.parquet",
    )


def get_mdic_municipality_codes():
    return get_pandas_df(
        "brazil/metadata/ID_MUN_MASTER_A.csv",
        encoding="latin-1",
        dtype=str,
    )


def get_port_codes():
    return get_pandas_df("brazil/metadata/ports.csv", dtype=str)


@uses_database
def process_mdic_port(suffix, year, cnx=None):
    S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.clear()
    df_original = get_pandas_df(
        f"brazil/trade/mdic/port/originals/EXP_{year}{suffix}.csv",
        dtype=str,
        keep_default_na=False,
    )
    df_commodities = get_commodities(cnx=cnx)
    df_pais = get_pais()
    df_port_urfs = get_port_urfs()
    df_uf = get_uf()
    df_regions = get_regions(cnx=cnx)
    # clean_mdic_port also needs the transport-route (via) lookup; the path is
    # assumed from the mdic_via_codes source table mentioned in the notes above.
    df_via = get_pandas_df("brazil/metadata/mdic_via_codes.csv", dtype=str)

    df = clean_mdic_port(
        df_original, df_commodities, df_pais, df_regions, df_port_urfs, df_uf, df_via
    )

    write_parquet_for_upload(
        df, f"brazil/trade/mdic/port/brazil_mdic_port_{year}.parquet"
    )


@uses_database
def process_mdic_municipality(suffix, year, cnx=None):
    S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.clear()

    df_original = get_pandas_df(
        f"brazil/trade/mdic/municipality/originals/EXP_{year}_MUN{suffix}.csv",
        dtype=str,
        keep_default_na=False,
    )
    df_commodities = get_commodities(cnx=cnx)
    df_regions = get_regions(cnx=cnx)
    df_mdic_municipality_codes = get_mdic_municipality_codes()
    df_pais = get_pais()

    df = clean_mdic_municipality(
        df_original,
        df_commodities,
        df_regions,
        df_mdic_municipality_codes,
        df_pais,
        year,
    )

    is_old = year < 2018
    if is_old:
        df_uf = get_uf()
        df_ports = get_port_codes()

        df = extra_cleaning_of_old_mdic_municipality(df, df_uf, df_regions, df_ports)

    write_parquet_for_upload(
        df,
        f"brazil/trade/mdic/municipality/brazil_mdic_municipality_{year}.parquet",
    )


@uses_database
def main(cnx=None):
    parser = ArgumentParser()
    parser.add_argument("year", type=int, nargs="*", choices=YEARS_AND_SUFFIXES.keys())
    args = parser.parse_args()

    with tqdm(total=2 * len(args.year)) as progress:
        for year in args.year:
            suffix = YEARS_AND_SUFFIXES[year]

            process_mdic_municipality(suffix, year, cnx=cnx)
            progress.update(1)

            process_mdic_port(suffix, year, cnx=cnx)
            progress.update(1)


if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    # Declare the raw S3 tables this model reads so dbt can record lineage.
    dbt.source("trase-storage-raw", "ports")
    dbt.source("trase-storage-raw", "uf")
    dbt.source("trase-storage-raw", "countries")
    dbt.source("trase-storage-raw", "exp_1997_mun")
    dbt.source("trase-storage-raw", "id_mun_master_a")

    # Placeholder body: the parquet output appears to be produced by the
    # standalone processing script above, not by this dbt model.
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})