Silver Cote Divoire Coffee 2020
s3://trase-storage/cote_divoire/coffee/trade/bol/silver/silver_cotedivoire_coffee-2020.parquet
Dbt path: trase_production.main_cote_divoire_coffee.silver_cote_divoire_coffee_2020
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/cote_divoire/coffee/trade/_schema_cote_divoire_coffee.yml
Model file link: trase/data_pipeline/models/cote_divoire/coffee/trade/silver_cote_divoire_coffee_2020.py
Calls script: trase/data/cote_divoire/trade/cd/export/silver_cotedivoire_coffee-2020.R
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)
Tags: silver, cote_divoire, coffee, trade, 2020, diet-trase-coffee
silver_cote_divoire_coffee_2020
Description
....
Details
| Column | Type | Description |
|---|---|---|
| DATE | TIMESTAMP WITH TIME ZONE | |
| DIRECTION | VARCHAR | |
| EXPORTER_CODE | VARCHAR | |
| EXPORTER | VARCHAR | |
| EXPORTER_ADDRESS | VARCHAR | |
| BUYER | VARCHAR | |
| BUYER_ADDRESS | VARCHAR | |
| PROVENANCE_COUNTRY | VARCHAR | |
| COUNTRY_DESTINATION | VARCHAR | |
| ORIGIN_COUNTRY | VARCHAR | |
| HS_CODE_DESCRIPTION | VARCHAR | |
| UNIT_CODE | VARCHAR | |
| UNIT | VARCHAR | |
| NUMBER_OF_PRODUCTS | DOUBLE | |
| QUANTITY | DOUBLE | |
| NET_WEIGHT_KG | DOUBLE | |
| GROSS_WEIGHT_KG | DOUBLE | |
| CIF_VALUE_IN_WEST_AFRICAN_CFA_FRANC | DOUBLE | |
| CIF_VALUE_IN_USD | DOUBLE | |
| PORT_CODE | VARCHAR | |
| PORT_NAME | VARCHAR | |
| TRANSPORT_MODE_CODE | VARCHAR | |
| TRANSPORT_MODE | VARCHAR | |
| MONTH | VARCHAR | |
| YEAR | VARCHAR | |
| Chapter | VARCHAR | |
| Heading | VARCHAR | |
| HS_CODE | VARCHAR | |
Models / Seeds
source.trase_duckdb.source_cote_divoire_trade.original_cote_divoire_comtrade_2020, source.trase_duckdb.source_cote_divoire_trade.original_cote_divoire_bol_2020
Sources
['source_cote_divoire_trade', 'original_cote_divoire_comtrade_2020'], ['source_cote_divoire_trade', 'original_cote_divoire_bol_2020']
# this script pre-processes the Cote d'Ivoire coffee export data from 2020
library(tidyverse)
library(arrow)
aws.signature::use_credentials()
library(aws.s3)
# configuration -----------------------------------------------------------

# S3 bucket holding the raw inputs read by this script
bucket_name <- "trase-storage"

# Upload switch:
#   FALSE -> outputs are only written to the local tmp folder
#   TRUE  -> outputs are additionally uploaded to S3
upload_to_s3 <- FALSE

# Object keys (inside `bucket_name`) of the two raw inputs
input_comtrade_path <- "cote_divoire/coffee/trade/bol/CotedIvoire_coffee_Comtrade_TradeData_10_8_2024_8_41_43.csv"
input_export_path <- "cote_divoire/coffee/trade/bol/IVORY_COAST_EXPORT_090111_210111_JAN20_DEC20.xlsx"

# File names of the two parquet outputs:
#   1 = cleaned trade records, 2 = QA comparison against Comtrade
output_file_name2 <- "silver_qa_cotedivoire_coffee_comtrade-2020.parquet"
output_file_name1 <- "silver_cotedivoire_coffee-2020.parquet"
# functions ---------------------------------------------------------------
# clean characters
#' Normalise text so that names can be compared across data sources.
#'
#' @param x Character vector to normalise.
#' @return `x` transliterated with the "Latin-ASCII" transform (which strips
#'   accents) and then converted to upper case.
clean_chars <- function(x) {
  without_accents <- stringi::stri_trans_general(x, "Latin-ASCII")
  str_to_upper(without_accents)
}
# get data ----------------------------------------------------------------
# vendor data
# Column types of the vendor workbook, listed in sheet order. The names are
# documentation only: they are stripped with unname() so that readxl receives
# a plain positional vector.
export_col_types <- unname(c(
  DATE = "date",
  DIRECTION = "text",
  EXPORTER_CODE = "text",
  EXPORTER = "text",
  EXPORTER_ADDRESS = "text",
  BUYER = "text",
  BUYER_ADDRESS = "text",
  PROVENANCE_COUNTRY = "text",
  DESTINATION_COUNTRY = "text",
  ORIGIN_COUNTRY = "text",
  HS_CODE = "text",
  HS_CODE_DESCRIPTION = "text", # no data in this column
  UNIT_CODE = "text",
  UNIT = "text",
  NUMBER_OF_PRODUCTS = "numeric",
  QUANTITY = "numeric",
  NET_WEIGHT_KG = "numeric",
  GROSS_WEIGHT_KG = "numeric",
  CIF_VALUE_IN_WEST_AFRICAN_CFA_FRANC = "numeric",
  CIF_VALUE_IN_USD = "numeric",
  PORT_CODE = "text",
  PORT_NAME = "text",
  TRANSPORT_MODE_CODE = "text",
  TRANSPORT_MODE = "text",
  MONTH = "text",
  YEAR = "text",
  Chapter = "text",
  Heading = "text",
  Sub_Heading = "text"
))

# Read the vendor export workbook straight from S3; empty cells become NA.
data <- s3read_using(
  FUN = readxl::read_xlsx,
  col_types = export_col_types,
  na = "",
  n_max = Inf,
  object = input_export_path,
  bucket = bucket_name
)

# Destination names are upper-cased here so they can later be matched
# against the (upper-case) Comtrade partner names.
data <- mutate(data, DESTINATION_COUNTRY = toupper(DESTINATION_COUNTRY))
# get comtrade data (disaggregated) from s3 (pre-downloaded)
# English Comtrade partner name -> French name used in the vendor trade data.
# Names that are not listed here are kept unchanged.
comtrade_to_trade_country <- c(
  "ALGERIA" = "ALGERIE",
  "GERMANY" = "ALLEMAGNE",
  "BELGIUM" = "BELGIQUE",
  "EGYPT" = "EGYPTE",
  "UNITED ARAB EMIRATES" = "EMIRATS ARABES UNIS",
  "ECUADOR" = "EQUATEUR",
  "SPAIN" = "ESPAGNE",
  "GREECE" = "GRECE",
  "INDIA" = "INDE",
  "ITALY" = "ITALIE",
  "MOROCCO" = "MAROC",
  "MAURITIUS" = "MAURICE, ILE",
  "NETHERLANDS" = "PAYS-BAS",
  "UNITED KINGDOM" = "ROYAUME-UNI",
  "SLOVENIA" = "SLOVENIE",
  "TUNISIA" = "TUNISIE",
  "VIET NAM" = "VIETNAM",
  "SOUTH AFRICA" = "AFRIQUE DU SUD",
  "CAMEROON" = "CAMEROUN",
  "GUINEA" = "GUINEE",
  "EQUATORIAL GUINEA" = "GUINEE EQUATORIALE",
  "GUINEA-BISSAU" = "GUINEE-BISSAU",
  "MAURITANIA" = "MAURITANIE",
  "GAMBIA" = "GAMBIE"
)

# Comtrade reference data (pre-downloaded to S3), one row per HS code and
# partner country, reduced to the columns needed for the QA comparison.
comtrade <- s3read_using(
  read_delim,
  object = input_comtrade_path,
  bucket = bucket_name,
  n_max = Inf,
  na = "",
  delim = ",",
  locale = locale(encoding = "Latin1"),
  # The commodity code is a join key against the character HS_CODE of the
  # trade data, so it must not be left to type guessing (a numeric guess
  # would drop the leading zero and make the join fail on type mismatch).
  col_types = cols(cmdCode = col_character())
) %>%
  filter(partnerDesc != "World") %>% # drop the all-partners aggregate row
  select(
    HS_CODE = cmdCode,
    COUNTRY_DESTINATION = partnerDesc,
    comtrade_kg = netWgt,
    comtrade_usd = fobvalue
  ) %>%
  mutate(
    # remove accents and make upper case
    across(where(is.character), clean_chars),
    # restore the leading zero, mirroring the treatment of the trade data
    HS_CODE = if_else(HS_CODE == "90111", "090111", HS_CODE),
    # change country names to match trade data; unmatched names (and NA)
    # fall through unchanged
    COUNTRY_DESTINATION = coalesce(
      unname(comtrade_to_trade_country[COUNTRY_DESTINATION]),
      COUNTRY_DESTINATION
    )
  )
# basic pre-processing ----------------------------------------------------
# 1. rename HS codes with leading 0s
# 2. other countries appear as "origin": Senegal, Royaume-Uni, Espagne, France (8 flows in total)
# it is unclear what these flows really represent as they seem to come from neighbouring countries
# we decide to remove
# Keep only flows whose declared origin is Cote d'Ivoire, and use the
# 6-digit sub-heading as the HS code in place of the longer original code.
data_process <- data %>%
  filter(ORIGIN_COUNTRY == "Cote d'Ivoire") %>%
  select(-HS_CODE) %>% # the longer HS code is not needed downstream
  rename(
    COUNTRY_DESTINATION = DESTINATION_COUNTRY,
    HS_CODE = Sub_Heading
  ) %>%
  mutate(
    # the sub-heading for green coffee arrives without its leading zero
    HS_CODE = case_when(
      HS_CODE == "90111" ~ "090111",
      TRUE ~ HS_CODE
    )
  )
# check exchange rate and cost per hs code --------------------------------
# Unit value (USD per kg) of every record that has a non-zero CIF value.
data_usd_per_kg <- data_process %>%
  filter(CIF_VALUE_IN_USD != 0) %>% # Remove records where CIF_VALUE_IN_USD is 0
  mutate(usd_per_kg = CIF_VALUE_IN_USD / NET_WEIGHT_KG)

# Summary of the unit value, used below to fill in missing CIF values.
# Only finite ratios are used: a record with a zero net weight yields Inf,
# which `na.rm = TRUE` would not remove and which would turn the mean (and
# therefore every imputed value) into Inf/NaN.
# NOTE(review): the summary is pooled over all HS codes; 090111 and 210111
# are imputed with the same price - confirm this is intended.
usd_per_kg_summary <- data_usd_per_kg %>%
  summarise(
    mean_usd_per_kg = mean(usd_per_kg[is.finite(usd_per_kg)]),
    median_usd_per_kg = median(usd_per_kg[is.finite(usd_per_kg)])
  )

# Use the calculated mean usd_per_kg to fill in records where CIF_VALUE_IN_USD is 0
mean_usd_per_kg <- usd_per_kg_summary$mean_usd_per_kg
data_process <- data_process %>%
  mutate(
    CIF_VALUE_IN_USD = ifelse(
      CIF_VALUE_IN_USD == 0,
      NET_WEIGHT_KG * mean_usd_per_kg,
      CIF_VALUE_IN_USD
    )
  )
# check data against Comtrade ---------------------------------------------
## first look at total volumes and usd per hs code:
# 2020 data from comtrade
# 090111, 66,290,048 kg and 90,020,840 USD
# 210111, 6,345,846 kg and 67,277,112 USD
# Comtrade 2020 totals per HS code (see the figures quoted above). They are
# keyed by HS code so that each total is attached to the right row whatever
# the number or order of HS codes present in the trade data.
comtrade_totals_2020 <- tribble(
  ~HS_CODE, ~comtrade_kg, ~comtrade_usd,
  "090111", 66290048, 90020840,
  "210111", 6345846, 67277112
)

# Total weight and value per HS code in the trade data, and their ratio to
# the Comtrade totals (a ratio of 1 means perfect agreement).
data_tot <- data_process %>%
  group_by(HS_CODE) %>%
  summarise(
    tot_kg = sum(NET_WEIGHT_KG, na.rm = TRUE),
    tot_usd = sum(CIF_VALUE_IN_USD, na.rm = TRUE)
  ) %>%
  ungroup() %>%
  # HS codes without a Comtrade total get NA rather than a recycled or
  # misaligned figure
  left_join(comtrade_totals_2020, by = "HS_CODE") %>%
  mutate(
    ratio_kg = tot_kg / comtrade_kg,
    ratio_usd = tot_usd / comtrade_usd
  )
print(data_tot)
## then look at individual countries:
# Same comparison per HS code and destination country. The full join keeps
# destinations that appear in only one of the two sources, so gaps on either
# side show up as NA.
data_tot_country <- data_process %>%
  group_by(HS_CODE, COUNTRY_DESTINATION) %>%
  summarise(
    tot_kg = sum(NET_WEIGHT_KG, na.rm = TRUE),
    tot_usd = sum(CIF_VALUE_IN_USD, na.rm = TRUE),
    .groups = "drop" # return an ungrouped tibble, without the regrouping message
  ) %>%
  full_join(comtrade, by = c("HS_CODE", "COUNTRY_DESTINATION")) %>%
  mutate(
    ratio_kg = tot_kg / comtrade_kg,
    ratio_usd = tot_usd / comtrade_usd
  )
print(data_tot_country)
# export data -------------------------------------------------------------
#tmp <- fs::dir_create(fs::file_temp()) # Save to /tmp to make it easier to find
# Outputs are always written to fixed paths under /tmp so that they are easy
# to find after the script has run.
tmp_file <- fs::path("/tmp", output_file_name1)
tmp_file2 <- fs::path("/tmp", output_file_name2)

# Save as Parquet
arrow::write_parquet(data_process, tmp_file)
arrow::write_parquet(data_tot, tmp_file2)
cat("saved at", tmp_file, "\n")
cat("saved at", tmp_file2, "\n")

# Destination keys on S3. These were referenced by the upload below without
# ever being defined, so enabling the upload failed with "object not found".
# NOTE(review): the silver folder is taken from the documented location of
# the first output; the QA file is assumed to belong next to it - confirm.
output_s3_dir <- "cote_divoire/coffee/trade/bol/silver"
output_full_path1 <- paste(output_s3_dir, output_file_name1, sep = "/")
output_full_path2 <- paste(output_s3_dir, output_file_name2, sep = "/")

# Accept both the logical TRUE and the string "TRUE" for the switch.
if (isTRUE(as.logical(upload_to_s3))) {
  aws.s3::put_object(
    file = tmp_file,
    object = output_full_path1,
    bucket = bucket_name
  )
  aws.s3::put_object(
    file = tmp_file2,
    object = output_full_path2,
    bucket = bucket_name
  )
}
### END ###
# silver_cote_divoire_coffee_2020
import pyarrow.parquet as pq
from trase.tools.r.utilities import run_r_script_in_repository
def model(dbt, session):
    """Build the silver Cote d'Ivoire coffee 2020 table by running its R script.

    The model is materialized as "external". It registers the two upstream
    sources with dbt, runs the R pre-processing script through
    ``run_r_script_in_repository`` and returns the parquet file the script
    is expected to leave behind.

    Args:
        dbt: dbt object handed to Python models; used for ``config`` and
            ``source``.
        session: database session; used to install and load the ``spatial``
            extension.

    Returns:
        The table read from the script's parquet output with
        ``pyarrow.parquet.read_table``.
    """
    r_script = (
        "trase/data/cote_divoire/trade/cd/export/silver_cotedivoire_coffee-2020.R"
    )
    # Must match the location the R script writes its first output to.
    parquet_output = "/tmp/silver_cotedivoire_coffee-2020.parquet"

    dbt.config(materialized="external")

    # Needed to reference excel files
    session.execute("INSTALL spatial; LOAD spatial;")

    # The sources are not read here; they are declared (with literal
    # arguments) only so that they show up in the documentation / lineage.
    _comtrade_source = dbt.source(
        "source_cote_divoire_trade", "original_cote_divoire_comtrade_2020"
    )
    _bol_source = dbt.source(
        "source_cote_divoire_trade", "original_cote_divoire_bol_2020"
    )

    # Run the script through the repository wrapper, declaring the file it
    # is expected to produce, then load that file as the model result.
    print("Executing R script:", r_script)
    run_r_script_in_repository(r_script, output_files=[parquet_output])
    return pq.read_table(parquet_output)