Colombia coffee 2017 2021 preprocessing qa
View or edit on GitHub
This page is synchronized from trase/data/colombia/trade/cd/coffee/Colombia_coffee_2017_2021_preprocessing_qa.ipynb. Last modified on 2025-12-13 00:30 CET by Trase Admin.
Please view or edit the original file there; changes will be reflected here after the midnight build (CET),
or after a build is triggered manually with a GitHub action (link).
"""
Pre-processing CD dataset for coffee Colombia 2017-2021:
1. Load data from SICEX
2. Clean dataset
3. Run QA with Comtrade data
Based on the following code: https://github.com/sei-international/TRASE/blob/master/trase/data/brazil/trade/bol/2020/BRAZIL_BOL_2020.py
"""
import sys
sys.path.append(r'C:\Users\Osvaldo Pereira\Documents\GitHub\TRASE')
import unicodedata
import numpy as np
import pandas as pd
from unidecode import unidecode
import psycopg2
import requests
import pandas.io.sql as sqlio
import warnings
warnings.filterwarnings('ignore')
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.utilities.helpers import clean_string
from tempfile import gettempdir
# String tokens that clean_string_columns() looks for and overwrites with "UNKNOWN".
MISSING_VALUES = ["NAN", "NONE", "NA"]
# Path to save the clean version; main() writes the cleaned CSV here.
# NOTE(review): absolute path inside one developer's local checkout -- the script only
# runs unmodified on that machine; confirm/adjust before running elsewhere.
out_path = r'C:\Users\Osvaldo Pereira\Documents\GitHub\TRASE\trase\data\colombia\trade\cd\coffee\colombia_coffee_cd_2017_2021_cleaned.csv'
def main():
    """Load the raw coffee export file, run the cleaning steps and write the result to ``out_path``."""
    raw = get_pandas_df_once(
        "colombia/trade/cd/export/coffee/in/Coffee_2017_to_2021.csv",
        dtype=str,
        sep=";",
        keep_default_na=True,
    )

    # Every step takes the frame and hands back the frame for the next step.
    pipeline = (
        exporter_id,
        select_and_rename_columns,
        get_hs,
        parse_dates,
        clean_string_columns,
    )
    cleaned = raw
    for step in pipeline:
        cleaned = step(cleaned)

    # 'A LA ORDEN' in the importer name is recorded as an unknown importer.
    importer_names = cleaned['importer.label']
    cleaned['importer.label'] = importer_names.replace('A LA ORDEN', 'UNKNOWN')

    cleaned = unknown_volume(cleaned)
    cleaned.to_csv(out_path, sep=";", index=False)
# display(df)
#Process exporter ID
def exporter_id(df):
    """Add the ``exporter.id`` and ``coutry_of_origin`` columns to *df* in place and return it.

    ``exporter.id`` is the company ID number and its check digit joined by a hyphen;
    ``coutry_of_origin`` is the constant 'COLOMBIA' on every row.
    """
    company_number = df['Company ID Number']
    check_digit = df['Company check digit ID']
    df['exporter.id'] = company_number + "-" + check_digit
    # The spelling "coutry" is the name select_and_rename_columns() looks up.
    df['coutry_of_origin'] = 'COLOMBIA'
    return df
#Select relevant columns and rename for Trase standards
def select_and_rename_columns(df):
    """Keep only the columns needed downstream and rename them to the Trase names.

    Args:
        df: raw frame; it must contain every source column listed in the mapping below.

    Returns:
        A new DataFrame holding only the mapped columns, renamed, in mapping order.

    Raises:
        KeyError: if a source column of the mapping is missing from ``df``.
    """
    columns = {
        "Date yyyy-mm-dd": "date",
        'Harmonized Code Description English': 'Product Description by Schedule B Code',
        'Product Schedule B Code': 'product_b_code',
        # exporter
        "City Departure": "exporter.city.label",
        "exporter.id": "exporter.id",
        "Company declarant": "exporter.label",
        # ports, country
        "Custom Loading": "port_of_export.label",
        # "Supplier City" used to be listed twice in this literal (first as
        # "port_of_import.label", later as "importer.city"). A dict keeps one entry per
        # key -- the last value, at the first position -- so the output has always been
        # an "importer.city" column at this position. The dead entry is removed and the
        # effective one kept here so the output columns and their order do not change.
        # NOTE(review): no "port_of_import.label" column is produced; confirm which
        # source column, if any, should feed it.
        "Supplier City": "importer.city",
        "State": "exporter.state.label",
        "coutry_of_origin": "coutry_of_origin",
        # importer
        "Country of Destiny": "importer.country.label",
        "Importer": "importer.label",
        "Import Declaration Number": "importer.code",
        # volume
        'TOTAL Quantity 1': 'volume.raw',
        'TOTAL FOB Value (US$)': 'fob',
        'TOTAL CIF Value (US$)': 'cif',
    }
    # Index with a list of the keys: passing the dict itself as an indexer raises
    # TypeError on pandas >= 2.0.
    selected = df[list(columns)]
    return selected.rename(columns=columns, errors="raise")
#Get HS codes
def get_hs(df):
    """Add ``hs4``, ``hs6`` and ``hs8`` columns with the leading characters of ``product_b_code``.

    Modifies *df* in place and returns it.
    """
    codes = df["product_b_code"]
    for width in (4, 6):
        df[f"hs{width}"] = codes.str.slice(0, width)
    # Only the 8-character prefix gets the extra cast to str.
    df["hs8"] = codes.str.slice(0, 8).astype(str)
    return df
#Get year, month and day
def parse_dates(df):
    """Split the ``date`` column (yyyy-mm-dd text) into ``year``, ``month`` and ``day`` columns.

    The hyphens are stripped from ``date`` on the frame passed in; the returned frame
    is a new one with the three new string columns and without ``date``.
    """
    df['date'] = df['date'].str.replace('-', '')

    # character positions of each part inside the hyphen-free yyyymmdd text
    positions = {"year": (0, 4), "month": (4, 6), "day": (6, 8)}
    for part, (start, stop) in positions.items():
        piece = df["date"].str.slice(start, stop).astype(str)
        df = df.assign(**{part: piece})

    return df.drop(columns="date")
#Replace nan values by Unknown across the whole dataset
def clean_string_columns(df):
    """Turn every column into cleaned strings and mark missing-value tokens as "UNKNOWN".

    Each column is cast to ``str`` and passed value by value through ``clean_string``;
    afterwards every value found in ``MISSING_VALUES`` is overwritten with "UNKNOWN".
    Modifies *df* in place and returns it.
    """
    # pass 1: string-clean every cell
    for name in df.columns:
        as_text = df[name].astype(str)
        df[name] = as_text.apply(clean_string)

    # pass 2: overwrite the missing-value tokens
    for name in df.columns:
        is_missing = df[name].dropna().isin(MISSING_VALUES)
        df.loc[is_missing, name] = "UNKNOWN"

    return df
#Drop rows whose volume.raw contains UNKNOWN
def unknown_volume(df):
    """Return only the rows of *df* whose ``volume.raw`` does not contain "UNKNOWN"."""
    has_unknown = df["volume.raw"].str.contains("UNKNOWN")
    keep = has_unknown.eq(False)
    return df[keep]
# Run the preprocessing only when this file is executed as a script.
if __name__ == "__main__":
    main()
import os
from tempfile import gettempdir
import pandas as pd
import numpy as np
import requests
from trase.tools import uses_database
from trase.tools.aws.aws_helpers import get_pandas_df
# Turn off pandas' chained-assignment warning for the rest of this script.
pd.options.mode.chained_assignment = None # default='warn'
# Get Comtrade data from the API: the query link is split into its parts here;
# comtrade() joins them back into the request URL and loads the answer into a pandas dataframe.
base = "https://comtrade.un.org/api/get?"
maxrec = "1000" # sent as "max"
item = "C" # sent as "type"
freq = "A" # sent as "freq"
px="HS" # sent as "px"
ps="2017%2C2018%2C2019%2C2020%2C2021" # years (Colombia coffee); %2C is a URL-encoded comma
r="170" # Colombia
p="0" # NOTE(review): presumably partner = World -- confirm against the Comtrade API docs
rg="2" # NOTE(review): presumably trade flow = exports -- confirm against the Comtrade API docs
cc="090111%2C090112%2C090121%2C090122%2C090190%2C210111%2C210112" # HS6 codes requested from Comtrade (Colombia coffee)
fmt="json" # comtrade() parses the response as JSON
# Import the cleaned df from S3 (runs when this module/cell is executed).
# Every column is read as text (dtype=str); the file is ';'-separated.
df = get_pandas_df(
"colombia/trade/cd/export/coffee/out/colombia_coffee_cd_2017_2021_cleaned.csv",
dtype=str,
sep=";")
# Only the columns the QA uses: year, HS6 code, volume and FOB value.
cd = df[['year', 'hs6', 'volume.raw', 'fob']]
# Relevant columns from Comtrade data: an empty frame that comtrade() extends with the API records.
comtrade_columns = pd.DataFrame(columns=['period', 'rtTitle', 'TradeQuantity', 'NetWeight', 'cmdCode', 'cmdDescE', 'TradeValue'])
#get comtrade to create df
def comtrade():
    """Download the Comtrade records for the configured query and total them per period and commodity.

    The request URL is assembled from the module-level query constants
    (``base``, ``maxrec``, ``item``, ``freq``, ``px``, ``ps``, ``r``, ``p``, ``rg``, ``cc``, ``fmt``).

    Returns:
        DataFrame with one row per (``period``, ``cmdCode``, ``cmdDescE``) holding the
        summed ``NetWeight`` and ``TradeValue``, plus a ``concat`` key column
        (period followed by cmdCode).

    Raises:
        requests.HTTPError: if the API answers with an error status.
        requests.Timeout: if the API does not answer within the timeout.
    """
    query = (
        ("max", maxrec),
        ("type", item),
        ("freq", freq),
        ("px", px),
        ("ps", ps),
        ("r", r),
        ("p", p),
        ("rg", rg),
        ("cc", cc),
        ("fmt", fmt),
    )
    # Joined by hand rather than through requests' params=: ps and cc are already
    # percent-encoded and must not be encoded a second time.
    url = base + "&".join(f"{key}={value}" for key, value in query)

    response = requests.get(url, timeout=60)
    # Fail here with a clear HTTP error instead of a confusing JSON/KeyError below.
    response.raise_for_status()
    records = pd.DataFrame(response.json()["dataset"])

    # DataFrame.append was removed in pandas 2.0; pd.concat is its replacement.
    # Starting from comtrade_columns keeps the expected columns present even when
    # the API returns an empty dataset.
    table = pd.concat([comtrade_columns, records])

    value_columns = ["NetWeight", "TradeValue"]
    # Force numeric dtypes (JSON nulls become NaN) so the totals are real sums.
    table[value_columns] = table[value_columns].apply(pd.to_numeric, errors="coerce")

    # .sum() rather than .apply(sum): the latter only worked through pandas' implicit
    # replacement of the builtin sum, which is deprecated.
    totals = table.groupby(["period", "cmdCode", "cmdDescE"], as_index=False)[value_columns].sum()
    totals['concat'] = totals['period'].astype(str) + totals['cmdCode']
    return totals
#get Trase df
def cd_trase(cd):
    """Total the Trase volume and FOB value per year and coffee HS6 code.

    Args:
        cd: frame with text columns ``year``, ``hs6``, ``volume.raw`` and ``fob``.
            The text "UNKNOWN" in ``volume.raw`` or ``fob`` is counted as 0.

    Returns:
        A new DataFrame with columns ``year``, ``hs6``, ``volume.raw`` and ``fob``
        (the sums, as floats) and ``concat`` (year followed by hs6), restricted to
        the coffee HS6 codes listed below. The frame passed in is not modified.

    Raises:
        ValueError: if a ``volume.raw`` or ``fob`` value is neither numeric nor "UNKNOWN".
    """
    coffee_hs6 = ['090111', '090112', '090121', '090122', '090190', '210111', '210112']

    # Replace text with text and cast afterwards, so the column never holds a mix of
    # strings and integers on the way to float.
    cd = cd.assign(
        **{
            "volume.raw": cd["volume.raw"].replace("UNKNOWN", "0").astype(float),
            "fob": cd["fob"].replace("UNKNOWN", "0").astype(float),
            "hs6": cd["hs6"].astype(str),
        }
    )
    cd = cd[cd["hs6"].isin(coffee_hs6)]

    # .sum() rather than .apply(sum): the latter only worked through pandas' implicit
    # replacement of the builtin sum, which is deprecated.
    cd = cd.groupby(["year", "hs6"], as_index=False)[["volume.raw", "fob"]].sum()
    cd['concat'] = cd['year'].astype(str) + cd['hs6']
    return cd
#validation: ratio for quantity and fob
def validation():
    """Join the Trase totals with the Comtrade totals and compute the Trase/Comtrade ratios.

    Returns:
        DataFrame with the Trase and Comtrade weight and FOB value per year and
        commodity, plus the columns "Ratio KG" and "Ratio FOB" (Trase divided by Comtrade).
    """
    trase_totals = cd_trase(cd)
    comtrade_totals = comtrade()
    # No explicit key is given: pandas joins on the column(s) the two frames share.
    joined = pd.merge(trase_totals, comtrade_totals)

    wanted = ['year', 'cmdDescE', 'cmdCode', 'volume.raw', 'fob', 'NetWeight', 'TradeValue']
    labels = {
        'cmdDescE': 'Commodity',
        'NetWeight': 'COMTRADE KG',
        'TradeValue': 'COMTRADE FOB',
        'volume.raw': 'Trase KG',
        'fob': 'Trase FOB',
    }
    report = joined[wanted].rename(columns=labels)

    # quantity ratio, then FOB ratio
    report["Ratio KG"] = report['Trase KG'] / report['COMTRADE KG']
    report["Ratio FOB"] = report['Trase FOB'] / report['COMTRADE FOB']
    return report
# Show table: the QA result ordered from the lowest to the highest quantity ratio.
# Calling validation() here runs the Comtrade request as soon as this line executes.
by_product = validation().sort_values(by=['Ratio KG'])
# display(by_product)