Gold Colombia Cd Coffee 2020
s3://trase-storage/colombia/trade/cd/export/coffee/2020/gold/gold_colombia_cd_coffee_2020.parquet
Dbt path: trase_production.main_colombia_coffee.gold_colombia_cd_coffee_2020
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/colombia/trade/_schema_colombia_coffee.yml
Model file link: trase/data_pipeline/models/colombia/trade/gold_colombia_cd_coffee_2020.sql
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: gold, colombia, coffee, trade, cd, 2020, diet-trase-coffee
gold_colombia_cd_coffee_2020
Description
Based on existing cleaned data from colombia/trade/cd/export/2020/CD_COLOMBIA_2020.csv.
Details
| Column | Type | Description |
|---|---|---|
| year | INTEGER | |
| exporter_label | VARCHAR | |
| exporter_node_id | BIGINT | |
| exporter_group_name | VARCHAR | |
| country_of_destination | VARCHAR | |
| mass_tonnes | DOUBLE | |
| fob | DOUBLE | |
| importer_label | VARCHAR | |
| port_of_export_label | VARCHAR | |
| hs6 | VARCHAR | |
Models / Seeds
source.trase_duckdb.trase-storage-raw.cd_colombia_2020, model.trase_duckdb.postgres_countries, model.trase_duckdb.postgres_traders
Sources
['trase-storage-raw', 'cd_colombia_2020']
No called script or script source not found.
-- gold_colombia_cd_coffee_2020
{{ config(materialized='external') }}
-- Model-level switches and constants. They are kept as Jinja variables so that the
-- trader-matching logic below is easier to reuse (it should eventually move to a macro).
-- When true, destinations that are EU members are reported under the bloc name.
{% set replace_eu_countries_with_bloc = False %}
-- Prefix put in front of the exporter tax id to build a trader trase_id.
{% set trader_prefix = "CO-TRADER-" %}
-- Trade year, compared against the validity period of trader group memberships.
{% set year = 2020 %}
WITH exports AS (
    -- Source customs-declaration rows, plus a cleaned exporter label and a candidate
    -- trader trase_id derived from the exporter tax id.
    SELECT
        *,
        -- Strip commas and spaces from both ends of the exporter name, then upper-case it
        UPPER(TRIM(EXPORTER, ', ')) AS exporter_label,
        -- Build the trader trase_id from the part of the tax id before the first '-'.
        -- A missing tax id is treated like 'UNKNOWN': DuckDB's CONCAT skips NULL
        -- arguments, so without the IS NULL guard a NULL tax id would produce the bare
        -- prefix instead of NULL.
        CASE
            WHEN EXPORTER_TAX_ID IS NULL OR EXPORTER_TAX_ID = 'UNKNOWN' THEN NULL
            ELSE CONCAT('{{ trader_prefix }}', SPLIT_PART(EXPORTER_TAX_ID, '-', 1))
        END AS processed_tax_id
    FROM {{ source('trase-storage-raw', 'cd_colombia_2020') }}
),
-- One row per (country, synonym): flatten the synonyms list of every country in the
-- Trase database so that destinations can be matched with a plain equality join
expanded_countries AS (
    SELECT
        countries.country_name,
        countries.country_trase_id,
        UNNEST(countries.synonyms) AS synonym
    FROM {{ ref('postgres_countries') }} AS countries
),
-- Attach the Trase country name and trase_id to each export row by matching the declared
-- destination against the country synonyms. Rows whose destination matches no synonym
-- are kept, with NULL in both destination columns.
with_official_country_names AS (
    SELECT
        shipments.*,
        countries.country_name AS destination_country_name,
        countries.country_trase_id AS destination_country_trase_id
    FROM exports AS shipments
    LEFT JOIN expanded_countries AS countries
        ON countries.synonym = shipments.COUNTRY_OF_DESTINATION
),
-- Look up the trader of each row by the trase_id built from the tax id, bringing in the
-- trader node id and the group name found in the first entry of the trader's groups list.
-- Note that trase_id_directly_matched is simply the processed tax id: it is populated
-- whether or not a trader with that trase_id was found.
with_group_name_by_trase_id AS (
    SELECT
        shipments.*,
        shipments.processed_tax_id AS trase_id_directly_matched,
        db_traders.trader_node_id,
        json_extract_string(array_extract(db_traders.groups, 1), '$.group') AS group_name
    FROM with_official_country_names AS shipments
    LEFT JOIN {{ ref('postgres_traders') }} AS db_traders
        ON db_traders.trase_id = shipments.processed_tax_id
),
-- Exporter labels that have at least one row without a group name after the trase_id
-- lookup. These are the labels we will try to match against trader labels instead.
exporters_unmatched_by_trase_id AS (
    SELECT matched.exporter_label
    FROM with_group_name_by_trase_id AS matched
    WHERE matched.group_name IS NULL
    GROUP BY matched.exporter_label
),
--------------- THIS SECTION COULD BE REUSABLE --------------
-- It is not yet clear what the best way to share it would be. One option is a Python UDF.
-- A macro was also considered, but a macro cannot read a CTE defined earlier in the model.
-- A workaround would be to move the first part into an ephemeral model (which is not
-- persisted as a table), ref that model from the macro, and finish with a materialized
-- model. That might be a bit convoluted, though.

-- Traders that have at least one label equal to an unmatched exporter label,
-- with one output row per entry of the trader's groups list
filtered_traders_by_label AS (
    SELECT
        db_traders.name,
        db_traders.trader_node_id,
        db_traders.trase_id,
        db_traders.labels,
        UNNEST(db_traders.groups) AS company_group
    FROM {{ ref('postgres_traders') }} AS db_traders
    WHERE EXISTS (
        SELECT 1
        FROM exporters_unmatched_by_trase_id AS unmatched
        WHERE list_contains(db_traders.labels, unmatched.exporter_label)
    )
),
-- Pull the group name and validity period out of each group entry and rank the entry
-- (lower is better):
--   1 = trase_id starts with the trader prefix, no validity period
--   2 = trase_id starts with the trader prefix, period covers the model year
--   3 = any other trase_id, no validity period
--   4 = any other trase_id, period covers the model year
--   5 = no trase_id
-- Every other combination gets a NULL ranking.
-- NOTE(review): a period with only one bound set (e.g. time_start but no time_end)
-- falls into the NULL case - confirm that this is intended.
with_traders_group_info AS (
    SELECT
        name,
        trader_node_id,
        trase_id,
        CAST(json_extract_string(company_group, '$.group') AS VARCHAR) AS group_name,
        CAST(json_extract_string(company_group, '$.time_start') AS TIMESTAMP) AS time_start,
        CAST(json_extract_string(company_group, '$.time_end') AS TIMESTAMP) AS time_end,
        CASE
            WHEN trase_id IS NULL THEN 5
            WHEN time_start IS NULL AND time_end IS NULL THEN
                CASE WHEN trase_id LIKE ('{{trader_prefix}}%') THEN 1 ELSE 3 END
            WHEN YEAR(time_start) <= {{year}} AND YEAR(time_end) >= {{year}} THEN
                CASE WHEN trase_id LIKE ('{{trader_prefix}}%') THEN 2 ELSE 4 END
            ELSE NULL
        END AS ranking,
        labels
    FROM filtered_traders_by_label
),
-- Keep a single group entry per trader name. DISTINCT ON (name) retains the first row of
-- each name under the ORDER BY below: lowest ranking first and, within the same ranking,
-- the highest trase_id. The trader columns are renamed with a "_from_label_match" suffix
-- to tell them apart from the columns obtained through the trase_id match.
filtered_ranked_traders AS (
    SELECT DISTINCT ON (ranked.name)
        ranked.name,
        ranked.trader_node_id AS trader_node_id_from_label_match,
        ranked.trase_id AS trase_id_from_label_match,
        ranked.labels,
        ranked.group_name AS group_name_from_label_match,
        ranked.time_start,
        ranked.time_end,
        ranked.ranking
    FROM with_traders_group_info AS ranked
    ORDER BY
        ranked.name,
        ranked.ranking ASC,
        ranked.trase_id DESC
),
------ END OF POTENTIALLY REUSABLE SECTION ------
-- Choose exactly one trader per unmatched exporter label.
-- filtered_ranked_traders is unique per trader name, not per label, so several traders
-- can carry the same label. Joining export rows straight to it would then repeat the
-- export row once per trader and double-count mass and FOB. Candidates are ordered the
-- same way as in filtered_ranked_traders (ranking, then trase_id), with the trader name
-- as a last tie-breaker so the choice is deterministic.
best_label_match AS (
    SELECT DISTINCT ON (unmatched.exporter_label)
        unmatched.exporter_label AS matched_exporter_label,
        frt.name,
        frt.trader_node_id_from_label_match,
        frt.trase_id_from_label_match,
        frt.labels,
        frt.group_name_from_label_match,
        frt.time_start,
        frt.time_end,
        frt.ranking
    FROM exporters_unmatched_by_trase_id AS unmatched
    INNER JOIN filtered_ranked_traders AS frt
        ON list_contains(frt.labels, unmatched.exporter_label)
    ORDER BY
        unmatched.exporter_label,
        frt.ranking ASC,
        frt.trase_id_from_label_match DESC,
        frt.name
),
-- Attach the label-matched trader (if any) to every export row. The join is on the
-- exporter label, which is unique in best_label_match, so the number of export rows
-- is preserved.
with_db_trader_info AS (
    SELECT
        with_tr_id.*,
        blm.name,
        blm.trader_node_id_from_label_match,
        blm.trase_id_from_label_match,
        blm.labels,
        blm.group_name_from_label_match,
        blm.time_start,
        blm.time_end,
        blm.ranking
    FROM with_group_name_by_trase_id AS with_tr_id
    LEFT JOIN best_label_match AS blm
        ON with_tr_id.exporter_label = blm.matched_exporter_label
),
-- Shape the output columns: trader attributes prefer the trase_id match and fall back to
-- the label match; the destination is the Trase country name (NULL when unmatched).
final_fields AS (
    SELECT
        CAST(trade.YEAR AS INTEGER) AS year,
        trade.exporter_label,
        COALESCE(trade.trader_node_id, trade.trader_node_id_from_label_match) AS exporter_node_id,
        COALESCE(trade.group_name, trade.group_name_from_label_match) AS exporter_group_name,
        trade.destination_country_name AS country_of_destination,
        -- assumes VOLUME_RAW is expressed in kilograms - TODO confirm
        trade.VOLUME_RAW / 1000 AS mass_tonnes,
        trade.FOB AS fob,
        trade.IMPORTER AS importer_label,
        trade.PORT_OF_EXPORT AS port_of_export_label,
        -- Records declared under HS6 210120 are reported as 210111.
        -- NOTE(review): the reason for this remap is not stated in the model - confirm.
        CASE
            WHEN trade.HS6 = '210120' THEN '210111'
            ELSE trade.HS6
        END AS hs6
    FROM with_db_trader_info AS trade
)
-- Optional last step, driven by the replace_eu_countries_with_bloc variable set at the top
-- of this model (currently False): when it is true, destinations that are EU members are
-- replaced with the economic bloc name. 'as_bool' renders the Jinja value as a boolean.
{% if replace_eu_countries_with_bloc | as_bool %}
,
-- One row per (country, economic bloc entry)
countries_with_economic_blocs AS (
    SELECT
        countries.country_name,
        UNNEST(countries.economic_bloc) AS economic_bloc
    FROM {{ ref('postgres_countries') }} AS countries
),
-- Countries that have an 'EUROPEAN UNION' bloc entry, leaving out the United Kingdom
eu_countries AS (
    SELECT DISTINCT
        blocs.country_name AS eu_country,
        'EUROPEAN UNION' AS european_union_country
    FROM countries_with_economic_blocs AS blocs
    WHERE json_extract_string(blocs.economic_bloc, '$.economic_bloc') = 'EUROPEAN UNION'
        AND blocs.country_name <> 'UNITED KINGDOM'
),
-- Same columns as final_fields, with the destination swapped for the bloc name
-- whenever the destination country is in eu_countries
final_with_eu AS (
    SELECT
        fields.year,
        fields.exporter_label,
        fields.exporter_node_id,
        fields.exporter_group_name,
        COALESCE(bloc.european_union_country, fields.country_of_destination) AS country_of_destination,
        fields.mass_tonnes,
        fields.fob,
        fields.importer_label,
        fields.port_of_export_label,
        fields.hs6
    FROM final_fields AS fields
    LEFT JOIN eu_countries AS bloc
        ON bloc.eu_country = fields.country_of_destination
)
SELECT * FROM final_with_eu
{% else %}
-- Otherwise keep the individual country names
SELECT * FROM final_fields
{% endif %}