Comtrade Exports

s3://trase-storage/world/trade/statistical_data/comtrade/originals/comtrade_exports.parquet

dbt path: trase_production.main.comtrade_exports

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/world/trade/statistical_data/comtrade/originals/_schema.yml

Model file link: trase/data_pipeline/models/world/trade/statistical_data/comtrade/originals/comtrade_exports.py

dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: comtrade, statistical_data, trade, world


comtrade_exports

Description

UN Comtrade annual export data (raw response from Comtrade)

Export data from Comtrade, broken down by:

  • Year
  • Reporter (i.e. country of export)
  • Partner (i.e. country of import)
  • Mode of transport (land/sea/air etc.)
  • HS6 commodity code

This dataset preserves all original columns from Comtrade. Comtrade has its own system of codes; see UN Comtrade's List of References & Parameter Codes or the interactive preview.

This dataset can be difficult to use: it contains both totals per exporter (i.e. rows where the partner is "World") and breakdowns by importing country, so volumes are duplicated. The same duplication occurs between the totals over all modes of transport and the breakdown by mode of transport.

The files comtrade_exports_year_exporter_hs6.parquet and comtrade_exports_year_exporter_hs6_importer.parquet are designed to be more user-friendly.


What is UN Comtrade

The UN Comtrade dataset is a global trade database that records imports and exports between countries. It provides detailed trade statistics reported by national customs agencies and statistical offices.

  • Trade flows: Exports and imports.
  • Reporter and partner countries: Who is trading with whom.
  • Commodity classification: Data categorized using HS (Harmonized System) or SITC (Standard International Trade Classification) codes.
  • Trade values and quantities: Reported in USD and physical units (kg, liters, etc.).
  • Time period: Data is available from 1962 to present, updated periodically.
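For orientation, an HS6 code nests three levels: the first two digits give the chapter, the first four the heading, and all six the subheading. A small helper illustrating this (the example code 090111 is from the published HS nomenclature: coffee, not roasted, not decaffeinated):

```python
def split_hs6(code: str) -> dict:
    """Split an HS6 commodity code into its nested classification levels."""
    if len(code) != 6 or not code.isdigit():
        raise ValueError(f"not a valid HS6 code: {code!r}")
    return {"chapter": code[:2], "heading": code[:4], "subheading": code}

# 090111: chapter 09 (coffee, tea, mate and spices), heading 0901 (coffee),
# subheading 090111 (coffee, not roasted, not decaffeinated)
print(split_hs6("090111"))
```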

Comtrade provides some guides on its data.

How we use the dataset in Trase, for example which SEI-PCS models it is involved in.

This dataset is typically used for quality assurance, to verify that our purchased trade data is of sufficient quality. We also use the dataset for other research projects.

How often the dataset is updated, and when the next update is likely to be.

The dataset is updated monthly and annually based on country reporting schedules. It varies by country, but new data is typically released within 3 to 6 months after the reporting period.

To check the latest update schedule, visit UN Comtrade’s Release Calendar.

How to re-fetch the dataset from the original source.

We have a script which scrapes the API and stores the responses in AWS S3: trase/data/world/trade/statistical_data/comtrade/originals/comtrade_api_responses_retrieved_YYYY_MM_DD.py

If you are interested in looking at the data manually, you can also visit the dashboard at https://comtradeplus.un.org/TradeFlow.

The script that is used to process/clean the dataset.

A dbt pipeline processes the raw data from the UN Comtrade API into a format that is easier to work with. After running the script that scrapes the API, run the pipeline as follows:

# Run the dbt pipeline to process the Comtrade data
trase/data_pipeline/dbt run --target production --select comtrade_exports+

# Run tests to ensure data quality
trase/data_pipeline/dbt test --target production --select comtrade_exports+1

When the dataset was last updated, and by whom.

  • 2025: Nicolas and Harry downloaded the latest data and updated the data pipeline

A history of changes/notes of the dataset.

None.


Details

Column Type Description
type_code VARCHAR "C", meaning "Commodities"
frequency_code VARCHAR "A", meaning "Annual"
reference_period_id BIGINT The first of January in the year that the trade occurred
reference_year BIGINT Year that the trade occurred
reference_month BIGINT
period SMALLINT Year that the trade occurred
reporter_code SMALLINT Code of the exporting country, see https://comtradeapi.un.org/files/v1/app/reference/Reporters.json
reporter_iso VARCHAR Exporting country, see https://comtradeapi.un.org/files/v1/app/reference/partnerAreas.json
reporter_description VARCHAR Name of the exporting country
flow_code VARCHAR "X" meaning "Export"
flow_description VARCHAR
partner_code SMALLINT Code of the importing country, see https://comtradeapi.un.org/files/v1/app/reference/Reporters.json
partner_iso VARCHAR Importing country, see https://comtradeapi.un.org/files/v1/app/reference/partnerAreas.json
partner_description VARCHAR Name of the importing country
partner2_code SMALLINT This would contain a second importer, but in this dataset it is always "World"
partner2_iso VARCHAR This would contain a second importer, but in this dataset it is always "World"
partner2_description VARCHAR This would contain a second importer, but in this dataset it is always "World"
classification_code VARCHAR The edition of the HS classification used (e.g. H5 meaning the fifth edition)
classification_search_code VARCHAR HS, meaning the Harmonized System for commodity trade
is_original_classification BOOLEAN
commodity_code VARCHAR HS6 commodity code
commodity_description VARCHAR English description of the HS6 commodity code
aggregation_level BIGINT 6, indicating aggregation to HS6 level
is_leaf BOOLEAN
customs_code VARCHAR C00, meaning TOTAL customs ('TOTAL CPC')
customs_description VARCHAR
mos_code SMALLINT The mode of supply on delivery of services - applicable only to trade in services
mot_code SMALLINT The mode of transport, see https://comtradeapi.un.org/files/v1/app/reference/ModeOfTransportCodes.json.
mot_description VARCHAR The mode of transport, see https://comtradeapi.un.org/files/v1/app/reference/ModeOfTransportCodes.json.
quantity_unit_code SMALLINT See https://comtradeapi.un.org/files/v1/app/reference/QuantityUnits.json
quantity_unit_abbreviation VARCHAR
quantity DOUBLE Number of items
is_quantity_estimated BOOLEAN
alternative_quantity_unit_code SMALLINT See https://comtradeapi.un.org/files/v1/app/reference/QuantityUnits.json
alternative_quantity_unit_abbreviation VARCHAR See https://comtradeapi.un.org/files/v1/app/reference/QuantityUnits.json
alternative_quantity DOUBLE
is_alternative_quantity_estimated BOOLEAN
net_weight DOUBLE Net weight
is_net_weight_estimated BOOLEAN
gross_weight DOUBLE Gross weight
is_gross_weight_estimated BOOLEAN
cif DOUBLE CIF (cost, insurance and freight) dollar value
fob DOUBLE FOB (free on board) dollar value
primary_value DOUBLE
legacy_estimation_flag BIGINT
is_reported BOOLEAN
is_aggregate BOOLEAN
retrieved VARCHAR

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.comtrade_api_responses_get_annual

Sources

  • trase-storage-raw.comtrade_api_responses_get_annual

Model script:

import re

import polars as pl


def camel_to_snake(string: str) -> str:
    # e.g. "refPeriodId" -> "ref_period_id"; the ISO special case keeps "iso" as one word
    return re.sub(r"(?<!^)(?=[A-Z])", "_", string).lower().replace("i_s_o", "iso")


def rename_and_cast_columns(df: pl.DataFrame) -> pl.DataFrame:
    # use lower snake case for column names
    df = df.rename(
        {column: camel_to_snake(column) for column in df.columns},
        strict=True,
    )

    # rename a couple of columns
    df = df.rename({"cifvalue": "cif", "fobvalue": "fob"}, strict=True)

    # replace some common abbreviations to improve readability
    df = df.rename(
        {
            column: (
                column.replace("_wgt", "_weight")
                .replace("_desc", "_description")
                .replace("_abbr", "_abbreviation")
                .replace("cmd_", "commodity_")
                .replace("aggr_", "aggregation_")
                .replace("alt_", "alternative_")
                .replace("freq_", "frequency_")
                .replace("ref_", "reference_")
                .replace("qty", "quantity")
            )
            for column in df.columns
        },
        strict=True,
    )

    # alter some types of some columns
    df = df.with_columns(
        pl.col("alternative_quantity_unit_code").cast(pl.Int16()),
        pl.col("mos_code").str.strip_chars().cast(pl.Int16()),
        pl.col("mot_code").cast(pl.Int16()),
        pl.col("partner2_code").cast(pl.Int16()),
        pl.col("partner_code").cast(pl.Int16()),
        pl.col("period").cast(pl.Int16()),
        pl.col("quantity_unit_code").cast(pl.Int16()),
        pl.col("reporter_code").cast(pl.Int16()),
    )

    # strip trailing whitespace from partner_iso
    df = df.with_columns(pl.col("partner_iso").str.strip_chars_end())

    return df


def filter_to_latest_api_response_and_drop_api_columns(df_api_responses):
    # filter to the latest response for each API parameter
    api_columns = [col for col in df_api_responses.columns if col.startswith("api")]
    result = (
        df_api_responses.sort(by="retrieved")
        .group_by(api_columns)
        .agg([pl.col("retrieved").last()])
    )

    df = df_api_responses.join(result, on=result.columns, how="inner")

    # drop the api columns to avoid confusing the user
    return df.drop(api_columns)


def model(dbt, cursor):
    dbt.config(materialized="external")

    relation = dbt.source("trase-storage-raw", "comtrade_api_responses_get_annual")

    # filter to the API parameters relevant to a breakdown by year, reporter (exporter),
    # hs6, partner (importer) and mode of transport
    relation = (
        relation.filter("apiAggregateBy = 'None'")
        .filter("apiBreakdownMode = 'plus'")
        .filter("apiClCode = 'HS'")
        .filter("apiCountOnly = 'None'")
        .filter("apiCustomsCode = 'C00'")
        .filter("apiFlowCode = 'X'")
        .filter("apiFormat_output = 'JSON'")
        .filter("apiFreqCode = 'A'")
        .filter("apiIncludeDesc = 'True'")
        .filter("apiMaxRecords = 250000")
        .filter("apiMotCode = 'None'")
        .filter("apiPartner2Code = 0")
        .filter("apiPartnerCode = 'None'")
        .filter("apiReporterCode = 'None'")
        .filter("apiTypeCode = 'C'")
    )

    df = relation.pl()
    df = filter_to_latest_api_response_and_drop_api_columns(df)
    df = rename_and_cast_columns(df)

    return df