Bol Soy Beef Palm Oil 2015 2018
s3://trase-storage/china/trade/out/bol_soy_beef_palm_oil_2015_2018.csv
Dbt path: trase_production.main.bol_soy_beef_palm_oil_2015_2018
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/china/trade/out/_schema.yml
Model file link: trase/data_pipeline/models/china/trade/out/bol_soy_beef_palm_oil_2015_2018.py
Calls script: trase/data/china/trade/out/bol_soy_beef_palm_oil_2015_2018.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph — at the bottom right — as well as tests and downstream dependencies)
Tags: mock_model, china, out, trade
bol_soy_beef_palm_oil_2015_2018
Description
This model was auto-generated based on .yml 'lineage' files in S3. The dbt model only raises an error; the actual script that created the data lives elsewhere, at trase/data/china/trade/out/bol_soy_beef_palm_oil_2015_2018.py [permalink]. It was last run by Harry Biddle.
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.3hs code impsource.trase_duckdb.trase-storage-raw.2hs_ccd42 201501-201612 importsource.trase_duckdb.trase-storage-raw.6hs codesource.trase_duckdb.trase-storage-raw.02023000-201501-201812imp ccdsource.trase_duckdb.trase-storage-raw.2hs ccd42 exp 2015
Sources
['trase-storage-raw', '3hs code imp']['trase-storage-raw', '2hs_ccd42 201501-201612 import']['trase-storage-raw', '6hs code']['trase-storage-raw', '02023000-201501-201812imp ccd']['trase-storage-raw', '2hs ccd42 exp 2015']
import unicodedata
import pandas as pd
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sei_pcs.pandas_utilities import rename
# Canonical output schema: both rename_columns() and rename_columns_special()
# map their source headers onto these names, and this list also fixes the
# column order of the concatenated frame before cleaning.
# NOTE(review): "payment_temrs_note" looks like a typo of "payment_terms_note",
# but it is the published column name of the output dataset — renaming it
# would break downstream consumers, so it is preserved as-is.
COLUMNS_TO_KEEP = [
    "hscode",
    "year",
    "quantity",
    "value_usd",
    "trader_code",
    "chinese_trader_name_cn",
    "chinese_trader_name",
    "plant_location",
    "company_location",
    "customs",
    "original_destination_country",
    "routing_country",
    "payment_terms",
    "payment_temrs_note",
    "price_unit_usd",
    "price_unit_cny",
    "quantity_unit",
    "uscc",
]
def load_data():
    """
    Load the five original customs spreadsheets from S3.

    Files:
        2hs_CCD42 201501-201612 import.xls
        2hs CCD42 exp 2015.xlsx
        02023000-201501-201812imp CCD.xlsx
        6hs code.xls
        3hs code imp.xls  (special column names — renamed separately in main())

    Every frame is read with dtype=str and keep_default_na=False so that
    values arrive as raw strings and missing-value handling is done
    explicitly later in the pipeline.

    :return: list of five dataframes, in the order listed above
    """
    paths = [
        "china/trade/originals/2hs_CCD42 201501-201612 import.xls",
        "china/trade/originals/2hs CCD42 exp 2015.xlsx",
        "china/trade/originals/02023000-201501-201812imp CCD.xlsx",
        "china/trade/originals/6hs code.xls",
        # Special column layout; handled by rename_columns_special().
        "china/trade/originals/3hs code imp.xls",
    ]
    # All five reads share identical options, so loop instead of the
    # copy-pasted call per file.
    return [
        get_pandas_df_once(
            path,
            encoding="utf8",
            sep=";",
            dtype=str,
            keep_default_na=False,
            xlsx=True,
        )
        for path in paths
    ]
def rename_columns(df):
    """Map the standard-layout source headers onto the canonical schema.

    Truncates hscode to its first six digits and year to four digits,
    then restricts the frame to COLUMNS_TO_KEEP (in that order).

    :param df: raw dataframe with original spreadsheet headers
    :return: dataframe with canonical column names and order
    """
    header_map = {
        "HSCode": "hscode",
        "Year": "year",
        "TraderCode": "trader_code",
        "ChineseTraderNameCN": "chinese_trader_name_cn",
        "ChineseTraderName": "chinese_trader_name",
        "PlantLocation": "plant_location",
        "CompanyLocation": "company_location",
        "Customs": "customs",
        "Quantity": "quantity",
        "ValueUSD": "value_usd",
        "OriginOrDestinationCountry": "original_destination_country",
        "RoutingCountry": "routing_country",
        "PaymentTerms": "payment_terms",
        "PaymentTermsNote": "payment_temrs_note",
        "PriceUnitUSD": "price_unit_usd",
        "PriceUnitCNY": "price_unit_cny",
        "QuantityUnit": "quantity_unit",
        "USCC": "uscc",
    }
    renamed = rename(df, columns=header_map)
    renamed["hscode"] = renamed["hscode"].str.slice(0, 6)
    renamed["year"] = renamed["year"].str.slice(0, 4)
    return renamed[COLUMNS_TO_KEEP]
def rename_columns_special(df):
    """Map the "3hs code imp.xls" header variant onto the canonical schema.

    That file uses underscored headers (and "Incoterms" instead of
    "PaymentTerms"); otherwise this mirrors rename_columns(): truncate
    hscode/year, then keep only COLUMNS_TO_KEEP.

    :param df: raw dataframe from the special-layout spreadsheet
    :return: dataframe with canonical column names and order
    """
    header_map = {
        "HS_Code": "hscode",
        "Year": "year",
        "Trader_Code": "trader_code",
        "Chinese_TraderName_CN": "chinese_trader_name_cn",
        "Chinese_Trader_Name": "chinese_trader_name",
        "Plant_Location": "plant_location",
        "Company_Location": "company_location",
        "Customs": "customs",
        "Quantity": "quantity",
        "Total_Value_USD": "value_usd",
        "Trading_Country": "original_destination_country",
        "Routing_Country": "routing_country",
        "Price_Unit_USD": "price_unit_usd",
        "Price_Unit_CNY": "price_unit_cny",
        "Unit_Qty": "quantity_unit",
        "Incoterms": "payment_terms",
        "Incoterms_Note": "payment_temrs_note",
        "USCC": "uscc",
    }
    renamed = rename(df, columns=header_map)
    renamed["hscode"] = renamed["hscode"].str.slice(0, 6)
    renamed["year"] = renamed["year"].str.slice(0, 4)
    return renamed[COLUMNS_TO_KEEP]
def country_abbreviation_seperation(df):
    """
    Separate "COUNTRY(ABBR)" strings into country and abbreviation columns.

    Adds "original_destination_country_abbreviation" and
    "routing_country_abbreviation". Values without "(" keep their country
    text and get a NaN abbreviation (filled with "UNKNOWN" later by
    clean_string_columns).

    :param df: dataframe with the two country columns
    :return: the same dataframe with the two extra abbreviation columns
    """
    for column in ("original_destination_country", "routing_country"):
        # n=1: without it, a value containing a second "(" would expand to
        # three columns and the two-column assignment would raise.
        parts = df[column].str.split("(", n=1, expand=True)
        if parts.shape[1] == 1:
            # No value contained "(" at all — expand=True then yields a
            # single column; create an empty abbreviation column explicitly.
            parts[1] = None
        df[column] = parts[0]
        # Drop the trailing ")" left over from "ABBR)".
        df[f"{column}_abbreviation"] = parts[1].str[:-1]
    return df
def clean_string(text, upper=True):
    """
    Normalise a raw string value.

    - Drops control characters entirely (tab, newline, carriage return,
      formfeed, ...) as well as Unicode line/paragraph separators — note
      they are REMOVED, not replaced with spaces, so "a\\tb" becomes "ab".
    - Collapses runs of whitespace into single spaces and trims the ends.
    - Upper-cases the result unless upper=False.

    Accent folding (e.g. "ö" -> "O") is intentionally NOT performed: the
    unidecode call is disabled below. (The previous docstring claimed
    otherwise.)

    :param text: raw string to clean
    :param upper: convert the cleaned string to upper-case (default True)
    :return: cleaned string
    """

    def keep(character):
        # Unicode general categories: "C*" covers control/format/surrogate/
        # private-use; Zl/Zp are the line and paragraph separators.
        category = unicodedata.category(character)
        return category[0] != "C" and category not in ("Zl", "Zp")

    text = "".join(c for c in text if keep(c))
    text = " ".join(text.split())
    # text = unidecode(text)  # accent folding deliberately disabled
    return text.upper() if upper else text
def clean_string_columns(df, column_list):
    """
    Standardise the given text columns.

    Missing values — NaN plus the placeholder strings "NAN", "NONE", "NA",
    "NULL" and "" — become "UNKNOWN"; every value is then passed through
    clean_string (whitespace collapse + upper-case).

    NOTE(review): the placeholder comparison runs before upper-casing, so
    lower-case variants such as "nan" slip through — presumably the source
    data is already upper-case; confirm before reusing elsewhere.

    :param df: dataframe to clean (mutated in place)
    :param column_list: iterable of column names to process, list(str)
    :return: the cleaned dataframe
    """
    placeholders = {"NAN", "NONE", "NA", "NULL", ""}
    for name in column_list:
        series = df[name].fillna("UNKNOWN")
        series = series.mask(series.isin(placeholders), "UNKNOWN")
        df[name] = series.apply(clean_string)
    return df
def clean_numerical_columns(df):
    """
    Keep only rows whose "quantity" and "value_usd" are numeric, and cast
    those columns to float.

    A string counts as numeric when it is all digits after removing at most
    one decimal point ("1.5" yes, "1.2.3" no, "-1" no, "" no).

    :param df: dataframe with string "quantity" and "value_usd" columns
    :return: filtered copy with both columns as float
    """
    for column in ("quantity", "value_usd"):
        # n=1 / regex=False made explicit: the old positional call relied on
        # the pandas-version-dependent regex default, under which "." is a
        # regex matching ANY character (i.e. it stripped the first char).
        is_numeric = df[column].str.replace(".", "", n=1, regex=False).str.isdigit()
        df = df.loc[is_numeric].copy()
        df[column] = df[column].astype(float)
    return df
def consistency_check(df, checked_columns_list):
    """
    Report whether each listed column holds one single constant value
    (e.g. payment method, price unit).

    Prints one line per column; when values differ, lists the indexes of
    rows that deviate from the most common value.

    Fixes the previous duplicated(keep=False)-based check, which wrongly
    reported "identical" whenever every value occurred at least twice
    (e.g. ["A", "A", "B", "B"]).

    :param df: dataframe to inspect
    :param checked_columns_list: column names expected to be constant
    """
    for column in checked_columns_list:
        values = df[column]
        if values.nunique(dropna=False) <= 1:
            print(f"Values in column {column} are identical.")
        else:
            # Treat the modal value as the "consistent" one and flag the rest.
            majority = values.mode(dropna=False).iloc[0]
            index_list = list(values.index[values != majority])
            print(f"Column {column} has inconsistent rows, with index: {index_list}.")
def seperate_province_city(df):
    """
    Split "plant_location" and "company_location" into
    province / city / district / other-areas columns.

    NOTE(review): pd.concat with an empty, object-dtype frame is used purely
    to ADD the four target columns before the row-wise apply; sort=True means
    concat reorders all columns alphabetically, so code after this call must
    not rely on the previous column order.
    """
    location_columns = ["plant_location", "company_location"]
    for column_name in location_columns:
        # Add the four (empty) target columns for this location field.
        df = pd.concat(
            [
                df,
                pd.DataFrame(
                    columns=[
                        f"{column_name}_province",
                        f"{column_name}_city",
                        f"{column_name}_district",
                        f"{column_name}_other_areas",
                    ],
                    dtype=object,
                ),
            ],
            sort=True,
        )
        # Row-wise split; splitting rules live in seperate_province_city_row.
        df = df.apply(lambda row: seperate_province_city_row(row, column_name), axis=1)
    return df
def seperate_province_city_row(row, column_name):
    """Split one location string into province/city/district/other parts.

    Rules (splitting on ", "):
    - a single part goes to *_other_areas;
    - "<municipality>, <district>" (Beijing/Shanghai/Tianjin/Chongqing)
      fills *_city and *_district;
    - otherwise "<province>, <city>" fills *_province and *_city.
    Parts beyond the second are ignored. The four target columns must
    already exist on the row (added by seperate_province_city).

    :param row: one dataframe row (Series), mutated and returned
    :param column_name: "plant_location" or "company_location"
    :return: the same row with the four split columns filled
    """
    province = city = district = other = ""
    parts = row[column_name].split(", ")
    if len(parts) == 1:
        other = parts[0]
    elif parts[0] in ("Beijing", "Shanghai", "Tianjin", "Chongqing"):
        # Municipalities have no province level: "Beijing, Haidian".
        city, district = parts[0], parts[1]
    else:
        province, city = parts[0], parts[1]
    row[
        [
            f"{column_name}_province",
            f"{column_name}_city",
            f"{column_name}_district",
            f"{column_name}_other_areas",
        ]
    ] = [province, city, district, other]
    return row
def clean_method(df):
    """Apply the full cleaning pipeline to the combined trade dataframe.

    Order matters: the country/location splitting steps create the extra
    text columns that the string-cleaning step then expects to find.

    :param df: concatenated, renamed dataframe
    :return: cleaned dataframe
    """
    df = country_abbreviation_seperation(df)
    df = seperate_province_city(df)
    df = clean_numerical_columns(df)
    text_columns = {
        "hscode", "year", "trader_code", "chinese_trader_name_cn",
        "chinese_trader_name", "plant_location", "company_location",
        "customs", "original_destination_country",
        "original_destination_country_abbreviation", "routing_country",
        "routing_country_abbreviation", "payment_terms",
        "payment_temrs_note", "price_unit_usd", "price_unit_cny",
        "quantity_unit", "plant_location_province", "plant_location_city",
        "plant_location_district", "plant_location_other_areas",
        "company_location_province", "company_location_city",
        "company_location_district", "company_location_other_areas", "uscc",
    }
    df = clean_string_columns(df, text_columns)
    # Columns expected to hold one constant value across all rows; the check
    # only prints findings, it does not modify the data.
    constant_columns = [
        "payment_terms",
        "payment_temrs_note",
        "price_unit_usd",
        "price_unit_cny",
        "quantity_unit",
    ]
    consistency_check(df, constant_columns)
    return df
def main():
    """Build and upload the combined 2015-2018 China customs BoL dataset.

    Loads the five source spreadsheets, normalises their columns,
    concatenates, de-duplicates, cleans, and writes the result to
    china/trade/out/bol_soy_beef_palm_oil_2015_2018.csv.
    """
    df_list = load_data()
    # The first four frames share the standard header layout; the fifth
    # ("3hs code imp.xls") uses different headers and is renamed separately.
    for i, frame in enumerate(df_list[:-1]):
        frame["USCC"] = ""  # column absent from these files; keep schema aligned
        df_list[i] = rename_columns(frame)
    df_list[-1] = rename_columns_special(df_list[-1])
    df = pd.concat(df_list)
    # drop=True: plain reset_index() would add the old index as an "index"
    # column, which would then leak into the uploaded CSV.
    df = df.drop_duplicates().reset_index(drop=True)
    df = clean_method(df)
    write_csv_for_upload(df, "china/trade/out/bol_soy_beef_palm_oil_2015_2018.csv")


if __name__ == "__main__":
    main()
import pandas as pd
def model(dbt, cursor):
    """Placeholder dbt Python model for an externally-generated table.

    The real data is produced by the script at
    trase/data/china/trade/out/bol_soy_beef_palm_oil_2015_2018.py; this stub
    only registers the S3 source tables for lineage and then fails on
    purpose so dbt never materialises it.
    """
    # NOTE(review): each dbt.source(...) call is written out literally —
    # presumably dbt discovers sources by static analysis, so do not fold
    # these into a loop (TODO confirm).
    dbt.source("trase-storage-raw", "3hs code imp")
    dbt.source("trase-storage-raw", "2hs_ccd42 201501-201612 import")
    dbt.source("trase-storage-raw", "6hs code")
    dbt.source("trase-storage-raw", "02023000-201501-201812imp ccd")
    dbt.source("trase-storage-raw", "2hs ccd42 exp 2015")
    # Deliberate failure: this model must never actually run.
    raise NotImplementedError()
    # Unreachable; kept so the function retains the return shape dbt expects
    # of a Python model.
    return pd.DataFrame({"hello": ["world"]})