Skip to content

Gold Colombia Cd Coffee 2020

s3://trase-storage/colombia/trade/cd/export/coffee/2020/gold/gold_colombia_cd_coffee_2020.parquet

Dbt path: trase_production.main_colombia_coffee.gold_colombia_cd_coffee_2020

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/colombia/trade/_schema_colombia_coffee.yml

Model file link: trase/data_pipeline/models/colombia/trade/gold_colombia_cd_coffee_2020.sql

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)

Tags: gold, colombia, coffee, trade, cd, 2020, diet-trase-coffee


gold_colombia_cd_coffee_2020

Description

Based on existing cleaned data from colombia/trade/cd/export/2020/CD_COLOMBIA_2020.csv.


Details

Column Type Description
year INTEGER
exporter_label VARCHAR
exporter_node_id BIGINT
exporter_group_name VARCHAR
country_of_destination VARCHAR
mass_tonnes DOUBLE
fob DOUBLE
importer_label VARCHAR
port_of_export_label VARCHAR
hs6 VARCHAR

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.cd_colombia_2020
  • model.trase_duckdb.postgres_countries
  • model.trase_duckdb.postgres_traders

Sources

  • ['trase-storage-raw', 'cd_colombia_2020']

No called script or script source not found.

-- gold_colombia_cd_coffee_2020
{{ config(materialized='external') }}

-- Switch for the last step of the model: when true, EU member countries are
-- reported under the 'EUROPEAN UNION' bloc instead of their own country name
{% set replace_eu_countries_with_bloc = False %}

-- Values referenced by several CTEs below; kept as variables so the trader-matching
-- logic can be reused for other countries/years (and eventually moved into a macro)
{% set year = 2020 %}
{% set trader_prefix = "CO-TRADER-" %}


-- Raw customs-declaration rows, plus a normalised exporter label and a candidate
-- trader trase_id derived from the exporter tax id
WITH exports AS (
    SELECT
        *,
        -- Strip leading/trailing commas and spaces, then upper-case the exporter name
        UPPER(TRIM(EXPORTER, ', ')) AS exporter_label,
        -- Format the trader trase_id where it exists.
        -- A missing tax id must be handled explicitly: CONCAT skips NULL arguments in
        -- DuckDB, so without the IS NULL guard a NULL tax id would produce the bare
        -- prefix as a (bogus) trase_id instead of NULL.
        CASE
            WHEN EXPORTER_TAX_ID IS NULL OR EXPORTER_TAX_ID = 'UNKNOWN' THEN NULL
            -- Keep only the part of the tax id before the first '-'
            ELSE CONCAT('{{ trader_prefix }}', SPLIT_PART(EXPORTER_TAX_ID, '-', 1))
        END AS processed_tax_id
    FROM {{ source('trase-storage-raw', 'cd_colombia_2020') }}
),
-- One row per (country, synonym) pair: the synonyms list of every country in the
-- Trase database is unnested so it can be joined on directly
expanded_countries AS (
    SELECT
        countries.country_name,
        countries.country_trase_id,
        UNNEST(countries.synonyms) AS synonym
    FROM {{ ref('postgres_countries') }} AS countries
),
-- Resolve the declared destination country to its official Trase name and trase_id
-- by matching it against the country synonyms; unmatched destinations keep NULLs
with_official_country_names AS (
    SELECT
        shipments.*,
        country_synonyms.country_name AS destination_country_name,
        country_synonyms.country_trase_id AS destination_country_trase_id
    FROM exports AS shipments
    LEFT JOIN expanded_countries AS country_synonyms
        ON country_synonyms.synonym = shipments.COUNTRY_OF_DESTINATION
),
-- Look up each export's trader directly by trase_id, bringing in the trader node id
-- and the name of the first group listed for that trader
with_group_name_by_trase_id AS (
    SELECT
        shipments.*,
        shipments.processed_tax_id AS trase_id_directly_matched,
        db_traders.trader_node_id,
        json_extract_string(array_extract(db_traders.groups, 1), '$.group') AS group_name
    FROM with_official_country_names AS shipments
    LEFT JOIN {{ ref('postgres_traders') }} AS db_traders
        ON db_traders.trase_id = shipments.processed_tax_id
),
-- Distinct exporter labels that did not get a group name through the trase_id match;
-- these are the candidates for the label-based match further down
exporters_unmatched_by_trase_id AS (
    SELECT DISTINCT
        matched.exporter_label
    FROM with_group_name_by_trase_id AS matched
    WHERE matched.group_name IS NULL
),

--------------- THIS SECTION COULD BE REUSABLE --------------
-- Not sure what the best option could be however. Maybe a Python UDF.. 
-- Other option considered was through a macro, but it can't access a previous CTE table. 
-- An option would be to split the first part in an ephemeral model (which doesn't persist 
-- in a table), ref the model within the macro, and then use a materialized model in the end. 
-- But it might be a bit convoluted.

-- Traders whose labels list contains at least one still-unmatched exporter label,
-- with their groups unnested (one row per trader and group entry)
filtered_traders_by_label AS (
    SELECT
        db_traders.name,
        db_traders.trader_node_id,
        db_traders.trase_id,
        db_traders.labels,
        UNNEST(db_traders.groups) AS company_group
    FROM {{ ref('postgres_traders') }} AS db_traders
    WHERE EXISTS (
        SELECT 1
        FROM exporters_unmatched_by_trase_id AS unmatched
        WHERE list_contains(db_traders.labels, unmatched.exporter_label)
    )
),
-- Pull the group name and validity period out of each group entry and rank the
-- candidate by relevance (lower is better):
--   1 = trase_id with the expected prefix, group has no validity period
--   2 = trase_id with the expected prefix, group period covers the model year
--   3 = other trase_id, group has no validity period
--   4 = other trase_id, group period covers the model year
--   5 = trader without a trase_id
--   NULL = anything else
with_traders_group_info AS (
    SELECT
        name,
        trader_node_id,
        trase_id,
        CAST(json_extract_string(company_group, '$.group') AS VARCHAR) AS group_name,
        CAST(json_extract_string(company_group, '$.time_start') AS TIMESTAMP) AS time_start,
        CAST(json_extract_string(company_group, '$.time_end') AS TIMESTAMP) AS time_end,
        CASE
            WHEN trase_id IS NULL THEN 5
            WHEN trase_id LIKE '{{trader_prefix}}%' THEN
                CASE
                    WHEN time_start IS NULL AND time_end IS NULL THEN 1
                    WHEN YEAR(time_start) <= {{year}} AND YEAR(time_end) >= {{year}} THEN 2
                END
            ELSE
                CASE
                    WHEN time_start IS NULL AND time_end IS NULL THEN 3
                    WHEN YEAR(time_start) <= {{year}} AND YEAR(time_end) >= {{year}} THEN 4
                END
        END AS ranking,
        labels
    FROM filtered_traders_by_label
),
-- Keep a single row per trader name: DISTINCT ON returns the first row of each name
-- under the ORDER BY below, i.e. the best (lowest) ranking, ties broken by the
-- highest trase_id
filtered_ranked_traders AS (
    SELECT DISTINCT ON (candidates.name)
        candidates.name,
        candidates.trader_node_id AS trader_node_id_from_label_match,
        candidates.trase_id AS trase_id_from_label_match,
        candidates.labels,
        candidates.group_name AS group_name_from_label_match,
        candidates.time_start,
        candidates.time_end,
        candidates.ranking
    FROM with_traders_group_info AS candidates
    ORDER BY
        candidates.name,
        candidates.ranking ASC,
        candidates.trase_id DESC
),
------ END OF POTENTIALLY REUSABLE SECTION ------


-- Attach the label-based trader match to every export row.
-- filtered_ranked_traders is unique per trader name, not per label, so the same
-- exporter label may be contained in the labels of more than one trader. Joining
-- the exports straight onto it would then return the export row once per matching
-- trader and inflate mass and FOB. To keep exactly one output row per export row,
-- the best-ranked trader is selected per exporter label first (same ordering as the
-- ranking step: lowest ranking, then highest trase_id) and joined by equality.
with_db_trader_info AS (
    SELECT
        with_tr_id.*,
        -- Helper join key is dropped so the column set matches the ranked-traders columns
        frt.* EXCLUDE (matched_exporter_label)
    FROM with_group_name_by_trase_id AS with_tr_id
    LEFT JOIN (
        SELECT DISTINCT ON (export_labels.exporter_label)
            export_labels.exporter_label AS matched_exporter_label,
            ranked.*
        FROM (
            SELECT DISTINCT exporter_label
            FROM with_group_name_by_trase_id
        ) AS export_labels
        INNER JOIN filtered_ranked_traders AS ranked
            ON list_contains(ranked.labels, export_labels.exporter_label)
        ORDER BY
            export_labels.exporter_label,
            ranked.ranking ASC,
            ranked.trase_id_from_label_match DESC
    ) AS frt
        ON frt.matched_exporter_label = with_tr_id.exporter_label
),
-- Shape the output columns of the model. Trader node id and group name come from the
-- trase_id match when available, otherwise from the label match.
final_fields AS (
    SELECT
        trade.YEAR::INT AS year,
        trade.exporter_label,
        COALESCE(trade.trader_node_id, trade.trader_node_id_from_label_match) AS exporter_node_id,
        COALESCE(trade.group_name, trade.group_name_from_label_match) AS exporter_group_name,
        trade.destination_country_name AS country_of_destination,
        -- VOLUME_RAW divided by 1000 to express the mass in tonnes
        trade.VOLUME_RAW / 1000 AS mass_tonnes,
        trade.FOB AS fob,
        trade.IMPORTER AS importer_label,
        trade.PORT_OF_EXPORT AS port_of_export_label,
        -- Records declared under 210120 are reported under 210111
        CASE
            WHEN trade.HS6 = '210120' THEN '210111'
            ELSE trade.HS6
        END AS hs6
    FROM with_db_trader_info AS trade
)
-- Optional step, controlled by replace_eu_countries_with_bloc (set to False at the
-- top of this model): when enabled, EU countries are replaced with the bloc name.
-- 'as_bool' renders the variable as a boolean.
{% if replace_eu_countries_with_bloc | as_bool %}
,
-- One row per (country, economic bloc entry)
countries_with_economic_blocs AS (
    SELECT
        countries.country_name,
        UNNEST(countries.economic_bloc) AS economic_bloc
    FROM {{ ref('postgres_countries') }} AS countries
),
-- Countries belonging to the 'EUROPEAN UNION' bloc, with the United Kingdom left out
eu_countries AS (
    SELECT DISTINCT
        blocs.country_name AS eu_country,
        'EUROPEAN UNION' AS european_union_country
    FROM countries_with_economic_blocs AS blocs
    WHERE json_extract_string(blocs.economic_bloc, '$.economic_bloc') = 'EUROPEAN UNION'
        AND blocs.country_name <> 'UNITED KINGDOM'
),
-- Same columns as final_fields, with EU destinations reported as 'EUROPEAN UNION'
final_with_eu AS (
    SELECT
        trade.year,
        trade.exporter_label,
        trade.exporter_node_id,
        trade.exporter_group_name,
        COALESCE(members.european_union_country, trade.country_of_destination) AS country_of_destination,
        trade.mass_tonnes,
        trade.fob,
        trade.importer_label,
        trade.port_of_export_label,
        trade.hs6
    FROM final_fields AS trade
    LEFT JOIN eu_countries AS members
        ON members.eu_country = trade.country_of_destination
)

SELECT * FROM final_with_eu

{% else %}
-- Flag disabled: output the rows with their individual country names

SELECT * FROM final_fields

{% endif %}