Skip to content

Silver Cote Divoire Coffee 2020

s3://trase-storage/cote_divoire/coffee/trade/bol/silver/silver_cotedivoire_coffee-2020.parquet

Dbt path: trase_production.main_cote_divoire_coffee.silver_cote_divoire_coffee_2020

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/cote_divoire/coffee/trade/_schema_cote_divoire_coffee.yml

Model file link: trase/data_pipeline/models/cote_divoire/coffee/trade/silver_cote_divoire_coffee_2020.py

Calls script: trase/data/cote_divoire/trade/cd/export/silver_cotedivoire_coffee-2020.R

Dbt test runs & lineage: Test results ยท Lineage

Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)

Tags: silver, cote_divoire, coffee, trade, 2020, diet-trase-coffee


silver_cote_divoire_coffee_2020

Description

....


Details

Column Type Description
DATE TIMESTAMP WITH TIME ZONE
DIRECTION VARCHAR
EXPORTER_CODE VARCHAR
EXPORTER VARCHAR
EXPORTER_ADDRESS VARCHAR
BUYER VARCHAR
BUYER_ADDRESS VARCHAR
PROVENANCE_COUNTRY VARCHAR
COUNTRY_DESTINATION VARCHAR
ORIGIN_COUNTRY VARCHAR
HS_CODE_DESCRIPTION VARCHAR
UNIT_CODE VARCHAR
UNIT VARCHAR
NUMBER_OF_PRODUCTS DOUBLE
QUANTITY DOUBLE
NET_WEIGHT_KG DOUBLE
GROSS_WEIGHT_KG DOUBLE
CIF_VALUE_IN_WEST_AFRICAN_CFA_FRANC DOUBLE
CIF_VALUE_IN_USD DOUBLE
PORT_CODE VARCHAR
PORT_NAME VARCHAR
TRANSPORT_MODE_CODE VARCHAR
TRANSPORT_MODE VARCHAR
MONTH VARCHAR
YEAR VARCHAR
Chapter VARCHAR
Heading VARCHAR
HS_CODE VARCHAR

Models / Seeds

  • source.trase_duckdb.source_cote_divoire_trade.original_cote_divoire_comtrade_2020
  • source.trase_duckdb.source_cote_divoire_trade.original_cote_divoire_bol_2020

Sources

  • ['source_cote_divoire_trade', 'original_cote_divoire_comtrade_2020']
  • ['source_cote_divoire_trade', 'original_cote_divoire_bol_2020']
# this script pre-processes the Ethiopian coffee export data from 2020

library(tidyverse)
library(arrow)

aws.signature::use_credentials()
library(aws.s3)

# set upload location
upload_to_s3 <- FALSE # if "FALSE" then data saved to tmp folder
# if "TRUE" then data uploaded to s3

bucket_name <- "trase-storage"
input_export_path <- "cote_divoire/coffee/trade/bol/IVORY_COAST_EXPORT_090111_210111_JAN20_DEC20.xlsx"
input_comtrade_path <- "cote_divoire/coffee/trade/bol/CotedIvoire_coffee_Comtrade_TradeData_10_8_2024_8_41_43.csv"

output_file_name1 <- "silver_cotedivoire_coffee-2020.parquet"
output_file_name2 <- "silver_qa_cotedivoire_coffee_comtrade-2020.parquet"


# functions ---------------------------------------------------------------

# clean characters
clean_chars <-  function(x) {
  x %>%
    stringi::stri_trans_general("Latin-ASCII") %>% # remove accents
    str_to_upper()  # upper case

}

# get data ----------------------------------------------------------------

# vendor data
data <- s3read_using(
  readxl::read_xlsx, 
  object = input_export_path,
  bucket = bucket_name,
  n_max = Inf, 
  na = "",
  col_types = c('date',     # DATE 
                'text',     # DIRECTION
                'text',     # EXPORTER_CODE
                'text',     # EXPORTER
                'text',     # EXPORTER_ADDRESS
                'text',     # BUYER
                'text',     # BUYER_ADDRESS
                'text',     # PROVENANCE_COUNTRY
                'text',     # DESTINATION_COUNTRY
                'text',     # ORIGIN_COUNTRY
                'text',     # HS_CODE
                'text',     # HS_CODE_DESCRIPTION - no data
                'text',     # UNIT_CODE
                'text',     # UNIT
                'numeric',  # NUMBER_OF_PRODUCTS
                'numeric',  # QUANTITY
                'numeric',  # NET_WEIGHT_KG
                'numeric',  # GROSS_WEIGHT_KG
                'numeric',  # CIF_VALUE_IN_WEST_AFRICAN_CFA_FRANC
                'numeric',  # CIF_VALUE_IN_USD
                'text',     # PORT_CODE
                'text',     # PORT_NAME
                'text',     # TRANSPORT_MODE_CODE
                'text',     # TRANSPORT_MODE
                'text',     # MONTH
                'text',     # YEAR
                'text',     # Chapter
                'text',     # Heading
                'text')      # subheading
) %>% 
  mutate(DESTINATION_COUNTRY = toupper(DESTINATION_COUNTRY))


# get comtrade data (disaggregated) from s3 (pre-downloaded)
comtrade <- s3read_using(
  read_delim, 
  object = input_comtrade_path,
  bucket = bucket_name,
  n_max = Inf, 
  na = "",
  delim = ",", 
  locale = locale(encoding = "Latin1")) %>% 
  filter(partnerDesc != "World") %>% 
  select(cmdCode, partnerDesc, netWgt, fobvalue) %>% 
  rename(HS_CODE = cmdCode, 
         COUNTRY_DESTINATION = partnerDesc,
         comtrade_kg = netWgt, 
         comtrade_usd = fobvalue) %>% 
  mutate_if(is.character, clean_chars) %>%           # remove accents and make upper case

  #change country names to match trade data
  mutate(COUNTRY_DESTINATION = case_when(
    COUNTRY_DESTINATION == "ALGERIA" ~ "ALGERIE",
    COUNTRY_DESTINATION == "GERMANY" ~ "ALLEMAGNE",
    COUNTRY_DESTINATION == "BELGIUM" ~ "BELGIQUE", 
    COUNTRY_DESTINATION == "EGYPT" ~ "EGYPTE", 
    COUNTRY_DESTINATION == "UNITED ARAB EMIRATES" ~ "EMIRATS ARABES UNIS", 
    COUNTRY_DESTINATION == "ECUADOR" ~ "EQUATEUR", 
    COUNTRY_DESTINATION == "SPAIN" ~ "ESPAGNE",
    COUNTRY_DESTINATION == "GREECE" ~ "GRECE",
    COUNTRY_DESTINATION == "INDIA" ~ "INDE",
    COUNTRY_DESTINATION == "ITALY" ~ "ITALIE",
    COUNTRY_DESTINATION == "MOROCCO" ~ "MAROC",
    COUNTRY_DESTINATION == "MAURITIUS" ~ "MAURICE, ILE",
    COUNTRY_DESTINATION == "NETHERLANDS" ~ "PAYS-BAS",
    COUNTRY_DESTINATION == "UNITED KINGDOM" ~ "ROYAUME-UNI",
    COUNTRY_DESTINATION == "SLOVENIA" ~ "SLOVENIE",
    COUNTRY_DESTINATION == "TUNISIA" ~ "TUNISIE",
    COUNTRY_DESTINATION == "VIET NAM" ~ "VIETNAM",
    COUNTRY_DESTINATION == "SOUTH AFRICA" ~ "AFRIQUE DU SUD",
    COUNTRY_DESTINATION == "CAMEROON" ~ "CAMEROUN",
    COUNTRY_DESTINATION == "GUINEA" ~ "GUINEE",
    COUNTRY_DESTINATION == "EQUATORIAL GUINEA" ~ "GUINEE EQUATORIALE",
    COUNTRY_DESTINATION == "GUINEA-BISSAU" ~ "GUINEE-BISSAU",
    COUNTRY_DESTINATION == "MAURITANIA" ~ "MAURITANIE",
    COUNTRY_DESTINATION == "GAMBIA" ~ "GAMBIE",
    TRUE ~ COUNTRY_DESTINATION))


# basic pre-processing ----------------------------------------------------

# 1. rename HS codes with leading 0s
# 2. other countries appear as "origin": Senegal, Royaume-Uni, Espagne, France (8 flows in total)
#    it is unclear what these flows really represent as they seem to come from neighbouring countries 
#    we decide to remove

data_process <- data %>% 
  select(-HS_CODE) %>%                                   # remove longer HS code
  rename(HS_CODE = Sub_Heading,
         COUNTRY_DESTINATION = DESTINATION_COUNTRY) %>% 
  mutate(HS_CODE = ifelse(HS_CODE == "90111", "090111", HS_CODE)) %>% 
  filter(ORIGIN_COUNTRY == "Cote d'Ivoire")

# check exchange rate and cost per hs code --------------------------------

data_usd_per_kg <- data_process %>% 
  filter(CIF_VALUE_IN_USD != 0) %>%    # Remove records where CIF_VALUE_IN_USD is 0
  mutate(usd_per_kg = CIF_VALUE_IN_USD / NET_WEIGHT_KG)

## to correct the fob
usd_per_kg_summary <- data_usd_per_kg %>% 
  summarise(mean_usd_per_kg = mean(usd_per_kg, na.rm = TRUE),
            median_usd_per_kg = median(usd_per_kg, na.rm = TRUE))

# Use the calculated mean usd_per_kg to fill in records where CIF_VALUE_IN_USD is 0
mean_usd_per_kg <- usd_per_kg_summary$mean_usd_per_kg
data_process <- data_process %>% 
  mutate(CIF_VALUE_IN_USD = ifelse(CIF_VALUE_IN_USD == 0, NET_WEIGHT_KG * mean_usd_per_kg, CIF_VALUE_IN_USD))

# check data against Comtrade ---------------------------------------------

## first look at total volumes and usd per hs code:

# 2020 data from comtrade
# 090111, 66,290,048 kg and 90,020,840 USD
# 210111,  6,345,846 kg and 67,277,112 USD
data_tot <- data_process %>% 
  group_by(HS_CODE) %>% 
  summarise(tot_kg = sum(NET_WEIGHT_KG, na.rm = T),
            tot_usd = sum(CIF_VALUE_IN_USD)) %>% 
  ungroup() %>% 
  add_column(comtrade_kg = c(66290048, 6345846),
             comtrade_usd = c(90020840, 67277112)) %>% 
  mutate(ratio_kg = tot_kg/comtrade_kg, 
         ratio_usd = tot_usd/comtrade_usd)

print(data_tot)

## then look at individual countries:

data_tot_country <- data_process %>% 
  group_by(HS_CODE, COUNTRY_DESTINATION) %>% 
  summarise(tot_kg = sum(NET_WEIGHT_KG, na.rm = T),
            tot_usd = sum(CIF_VALUE_IN_USD, na.rm = T)) %>% 
  ungroup() %>% 
  full_join(comtrade, by = c("HS_CODE", "COUNTRY_DESTINATION")) %>% 
  mutate(ratio_kg = tot_kg/comtrade_kg, 
         ratio_usd = tot_usd/comtrade_usd)

print(data_tot_country)

# export data -------------------------------------------------------------

#tmp <- fs::dir_create(fs::file_temp()) # Save to /tmp to make it easier to find
tmp_file <- fs::path("/tmp", output_file_name1)
tmp_file2 <- fs::path("/tmp", output_file_name2)

# Save as Parquet
arrow::write_parquet(data_process, tmp_file)
arrow::write_parquet(data_tot, tmp_file2)

cat("saved at", tmp_file, "\n")
cat("saved at", tmp_file2, "\n")

if (upload_to_s3 == "TRUE") {

  aws.s3::put_object(
    file = tmp_file,
    obj = output_full_path1,
    bucket = bucket_name
  )

  aws.s3::put_object(
    file = tmp_file2,
    obj = output_full_path2,
    bucket = bucket_name
  )

}

### END ###
# silver_cote_divoire_coffee_2020
import pyarrow.parquet as pq

from trase.tools.r.utilities import run_r_script_in_repository


def model(dbt, session):
    dbt.config(materialized="external")

    # Declaring the sources so they appear in the documentation / lineage
    session.execute("INSTALL spatial; LOAD spatial;")  # Needed to reference excel files
    comtrade_data = dbt.source(
        "source_cote_divoire_trade", "original_cote_divoire_comtrade_2020"
    )
    trade_data = dbt.source(
        "source_cote_divoire_trade", "original_cote_divoire_bol_2020"
    )

    # Execute the script using the run_r_script_in_repository  wrapper
    script_path = (
        "trase/data/cote_divoire/trade/cd/export/silver_cotedivoire_coffee-2020.R"
    )
    output_file_path = "/tmp/silver_cotedivoire_coffee-2020.parquet"
    print("Executing R script:", script_path)
    run_r_script_in_repository(script_path, output_files=[output_file_path])

    export_data = pq.read_table(output_file_path)

    return export_data