Skip to content

Bol Soy Beef Palm Oil 2015 2018

s3://trase-storage/china/trade/out/bol_soy_beef_palm_oil_2015_2018.csv

Dbt path: trase_production.main.bol_soy_beef_palm_oil_2015_2018

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/china/trade/out/_schema.yml

Model file link: trase/data_pipeline/models/china/trade/out/bol_soy_beef_palm_oil_2015_2018.py

Calls script: trase/data/china/trade/out/bol_soy_beef_palm_oil_2015_2018.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: mock_model, china, out, trade


bol_soy_beef_palm_oil_2015_2018

Description

This model was auto-generated from the .yml 'lineage' files in S3. The dbt model itself only raises an error; the script that actually created the data lives elsewhere, at trase/data/china/trade/out/bol_soy_beef_palm_oil_2015_2018.py [permalink]. It was last run by Harry Biddle.


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.3hs code imp
  • source.trase_duckdb.trase-storage-raw.2hs_ccd42 201501-201612 import
  • source.trase_duckdb.trase-storage-raw.6hs code
  • source.trase_duckdb.trase-storage-raw.02023000-201501-201812imp ccd
  • source.trase_duckdb.trase-storage-raw.2hs ccd42 exp 2015

Sources

  • ['trase-storage-raw', '3hs code imp']
  • ['trase-storage-raw', '2hs_ccd42 201501-201612 import']
  • ['trase-storage-raw', '6hs code']
  • ['trase-storage-raw', '02023000-201501-201812imp ccd']
  • ['trase-storage-raw', '2hs ccd42 exp 2015']
import unicodedata

import pandas as pd

from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.sei_pcs.pandas_utilities import rename

# Columns selected, in this order, by rename_columns() and
# rename_columns_special() once the raw headers have been renamed.
# NOTE: "payment_temrs_note" (sic) is the spelling used for this column
# everywhere in this file, so correcting it here alone would break the
# column selection.
COLUMNS_TO_KEEP = [
    "hscode",
    "year",
    "quantity",
    "value_usd",
    "trader_code",
    "chinese_trader_name_cn",
    "chinese_trader_name",
    "plant_location",
    "company_location",
    "customs",
    "original_destination_country",
    "routing_country",
    "payment_terms",
    "payment_temrs_note",
    "price_unit_usd",
    "price_unit_cny",
    "quantity_unit",
    "uscc",
]


def load_data():
    """
    Read the five original trade files and return them as a list of dataframes.

    The list order is fixed and callers rely on it: the last entry
    ("3hs code imp.xls") uses different column names from the other four.
    Every file is requested with the same reader options, with all values
    read as strings.
    """
    source_paths = (
        "china/trade/originals/2hs_CCD42 201501-201612 import.xls",
        "china/trade/originals/2hs CCD42 exp 2015.xlsx",
        "china/trade/originals/02023000-201501-201812imp CCD.xlsx",
        "china/trade/originals/6hs code.xls",
        "china/trade/originals/3hs code imp.xls",
    )
    reader_options = {
        "encoding": "utf8",
        "sep": ";",
        "dtype": str,
        "keep_default_na": False,
        "xlsx": True,
    }
    return [get_pandas_df_once(path, **reader_options) for path in source_paths]


def rename_columns(df):
    """
    Rename the CamelCase source headers to snake_case and keep COLUMNS_TO_KEEP.

    After renaming, "hscode" is truncated to its first six characters and
    "year" to its first four.

    :param df: dataframe with the CamelCase headers listed below
    :return: dataframe restricted to COLUMNS_TO_KEEP, in that order
    """
    header_map = {
        "HSCode": "hscode",
        "Year": "year",
        "TraderCode": "trader_code",
        "ChineseTraderNameCN": "chinese_trader_name_cn",
        "ChineseTraderName": "chinese_trader_name",
        "PlantLocation": "plant_location",
        "CompanyLocation": "company_location",
        "Customs": "customs",
        "Quantity": "quantity",
        "ValueUSD": "value_usd",
        "OriginOrDestinationCountry": "original_destination_country",
        "RoutingCountry": "routing_country",
        "PaymentTerms": "payment_terms",
        "PaymentTermsNote": "payment_temrs_note",
        "PriceUnitUSD": "price_unit_usd",
        "PriceUnitCNY": "price_unit_cny",
        "QuantityUnit": "quantity_unit",
        "USCC": "uscc",
    }
    renamed = rename(df, header_map)
    # Keep only the leading characters of the code and the year.
    for column, width in (("hscode", 6), ("year", 4)):
        renamed[column] = renamed[column].str[:width]
    return renamed[COLUMNS_TO_KEEP]


def rename_columns_special(df):
    """
    Variant of the column renaming for the file with underscore-style headers.

    Maps headers such as "HS_Code" and "Total_Value_USD" onto the common
    snake_case names, truncates "hscode" to six characters and "year" to
    four, and keeps COLUMNS_TO_KEEP.

    :param df: dataframe with the underscore-style headers listed below
    :return: dataframe restricted to COLUMNS_TO_KEEP, in that order
    """
    header_map = {
        "HS_Code": "hscode",
        "Year": "year",
        "Trader_Code": "trader_code",
        "Chinese_TraderName_CN": "chinese_trader_name_cn",
        "Chinese_Trader_Name": "chinese_trader_name",
        "Plant_Location": "plant_location",
        "Company_Location": "company_location",
        "Customs": "customs",
        "Quantity": "quantity",
        "Total_Value_USD": "value_usd",
        "Trading_Country": "original_destination_country",
        "Routing_Country": "routing_country",
        "Price_Unit_USD": "price_unit_usd",
        "Price_Unit_CNY": "price_unit_cny",
        "Unit_Qty": "quantity_unit",
        "Incoterms": "payment_terms",
        "Incoterms_Note": "payment_temrs_note",
        "USCC": "uscc",
    }
    renamed = rename(df, columns=header_map)
    # Keep only the leading characters of the code and the year.
    for column, width in (("hscode", 6), ("year", 4)):
        renamed[column] = renamed[column].str[:width]
    return renamed[COLUMNS_TO_KEEP]


def country_abbreviation_seperation(df):
    """
    Separate countries from their parenthesised abbreviation.

    For "original_destination_country" and "routing_country", a value such
    as "Brazil (BR)" is split at its last "(": the text before it stays in
    the country column (trailing whitespace is not stripped here) and the
    text after it, minus its final character (the closing parenthesis),
    goes to a new "<column>_abbreviation" column.  Values without "(" keep
    their text and get a missing abbreviation.

    The dataframe is modified in place and also returned.

    :param df: dataframe with string columns "original_destination_country"
        and "routing_country"
    :return: the same dataframe with the two abbreviation columns added
    """
    for column in ("original_destination_country", "routing_country"):
        # Split once, from the right, so the result has at most two parts
        # even when the country name itself contains parentheses.  reindex
        # guarantees exactly two columns when no value contains "(" at all;
        # without both safeguards the two-column assignment would raise.
        parts = (
            df[column].str.rsplit("(", n=1, expand=True).reindex(columns=[0, 1])
        )
        df[column] = parts[0]
        # astype(object) keeps the .str accessor usable when the whole
        # abbreviation column is missing.
        df[f"{column}_abbreviation"] = parts[1].astype(object).str[:-1]
    return df


def clean_string(text, upper=True):
    """
    Normalise a string.

    - Drop every character in a Unicode "C" (control/other) category, which
      includes tab, newline, carriage return and form feed
    - Drop line separators (Zl) and paragraph separators (Zp)
    - Collapse runs of whitespace into single spaces and trim both ends
    - Convert to upper-case unless ``upper`` is false

    Accented characters are left as they are.

    :param text: the string to clean
    :param upper: whether to upper-case the result (default True)
    :return: the cleaned string
    """
    separator_categories = ("Zl", "Zp")

    retained = []
    for character in text:
        category = unicodedata.category(character)
        if category.startswith("C") or category in separator_categories:
            continue
        retained.append(character)

    collapsed = " ".join("".join(retained).split())
    if upper:
        return collapsed.upper()
    return collapsed


def clean_string_columns(df, column_list):
    """
    Normalise string columns and replace missing values with "UNKNOWN".

    Each listed column is passed through clean_string, and any value that
    is null or that normalises to one of the missing-value markers
    ("NAN", "NONE", "NA", "NULL" or the empty string) becomes "UNKNOWN".
    The dataframe is modified in place and also returned.

    :param df: dataframe
    :param column_list: iterable of column names to clean
    :return: the same dataframe with the listed columns cleaned
    """
    missing_values = {"NAN", "NONE", "NA", "NULL", ""}

    for column in column_list:
        # clean_string iterates over its argument, so real nulls have to be
        # replaced before it is applied.
        cleaned = df[column].fillna("UNKNOWN").apply(clean_string)
        # Compare against the markers only after normalisation; otherwise
        # lower-case markers ("nan"), padded ones (" NULL ") and
        # whitespace-only values would not be recognised as missing.
        df[column] = cleaned.mask(cleaned.isin(missing_values), "UNKNOWN")

    return df


def _is_plain_number(series):
    """Return a boolean mask of entries made only of digits and at most one "."."""
    # regex=False makes "." a literal dot on every pandas version; as a
    # regular expression it would match (and remove) any first character.
    without_point = series.str.replace(".", "", n=1, regex=False)
    # eq(True) turns missing results (from null entries) into False, so the
    # mask is always safe to index with.
    return without_point.str.isdigit().eq(True)


def clean_numerical_columns(df):
    """
    Convert "quantity" and "value_usd" to float, dropping non-numeric rows.

    A value counts as numeric when it consists only of digits with at most
    one decimal point; anything else (signs, thousands separators, text,
    nulls) causes the whole row to be dropped.  The input dataframe is not
    modified.

    :param df: dataframe with string columns "quantity" and "value_usd"
    :return: a filtered copy with both columns as floats
    """
    for column in ("quantity", "value_usd"):
        df = df.loc[_is_plain_number(df[column])].copy()
        df[column] = df[column].astype(float)
    return df


def consistency_check(df, checked_columns_list):
    """
    Report whether each listed column holds a single, identical value.

    For every column (e.g. payment terms, price unit) one line is printed:
    either that all values are identical, or the index labels of the rows
    whose value differs from the most common value in that column.

    :param df: dataframe
    :param checked_columns_list: iterable of column names to check
    :return: None; the result is only printed
    """
    for column in checked_columns_list:
        values = df[column]
        # A column is consistent when it has at most one distinct value.
        # Checking that every value is duplicated is not enough, because
        # ["x", "x", "y", "y"] would pass that test.
        if values.nunique(dropna=False) <= 1:
            print(f"Values in column {column} are identical.")
            continue

        most_common = values.mode(dropna=False).iloc[0]
        # isin also matches a missing value against a missing most_common.
        differs = ~values.isin([most_common])
        index_list = values.index[differs].tolist()
        print(f"Column {column} has inconsistent rows, with index: {index_list}.")


def seperate_province_city(df):
    """
    Add province / city / district / other-areas columns for both locations.

    For "plant_location" and then "company_location", four empty columns
    named "<location>_province", "<location>_city", "<location>_district"
    and "<location>_other_areas" are added, and each row is then filled in
    by seperate_province_city_row.

    :param df: dataframe with string columns "plant_location" and
        "company_location"
    :return: a new dataframe including the eight additional columns
    """
    suffixes = ("province", "city", "district", "other_areas")
    for column_name in ("plant_location", "company_location"):
        # Concatenating an empty frame adds the new columns up front so the
        # row-wise step can assign to them.
        placeholder = pd.DataFrame(
            columns=[f"{column_name}_{suffix}" for suffix in suffixes],
            dtype=object,
        )
        df = pd.concat([df, placeholder], sort=True)
        df = df.apply(seperate_province_city_row, axis=1, args=(column_name,))

    return df


def seperate_province_city_row(row, column_name):
    """
    Split one row's ", "-separated location into its administrative parts.

    - A single part goes to "<column_name>_other_areas".
    - If the first part is Beijing, Shanghai, Tianjin or Chongqing, it is
      stored as the city and the second part as the district.
    - Otherwise the first part is the province and the second the city.

    Parts after the second are ignored, and fields that are not set are
    written as empty strings.

    :param row: a row that already contains the four target fields
    :param column_name: name of the location field to split
    :return: the same row with the four target fields filled in
    """
    municipalities = ("Beijing", "Shanghai", "Tianjin", "Chongqing")
    parts = row[column_name].split(", ")

    province = city = district = other_areas = ""
    if len(parts) == 1:
        other_areas = parts[0]
    elif parts[0] in municipalities:
        city, district = parts[0], parts[1]
    else:
        province, city = parts[0], parts[1]

    target_fields = [
        f"{column_name}_{suffix}"
        for suffix in ("province", "city", "district", "other_areas")
    ]
    row[target_fields] = [province, city, district, other_areas]
    return row


def clean_method(df):
    """
    Run the full cleaning sequence on the combined trade dataframe.

    In order: split country names from their abbreviations, split the two
    location columns into their parts, convert the numeric columns, clean
    every string column, and finally print a consistency report for the
    payment and unit columns.

    :param df: combined dataframe restricted to the renamed columns
    :return: the cleaned dataframe
    """
    for step in (
        country_abbreviation_seperation,
        seperate_province_city,
        clean_numerical_columns,
    ):
        df = step(df)

    # Columns produced by the location split, for both location fields.
    location_part_columns = {
        f"{location}_{part}"
        for location in ("plant_location", "company_location")
        for part in ("province", "city", "district", "other_areas")
    }
    text_columns = {
        "hscode",
        "year",
        "trader_code",
        "chinese_trader_name_cn",
        "chinese_trader_name",
        "plant_location",
        "company_location",
        "customs",
        "original_destination_country",
        "original_destination_country_abbreviation",
        "routing_country",
        "routing_country_abbreviation",
        "payment_terms",
        "payment_temrs_note",
        "price_unit_usd",
        "price_unit_cny",
        "quantity_unit",
        "uscc",
    } | location_part_columns
    df = clean_string_columns(df, text_columns)

    consistency_check(
        df,
        [
            "payment_terms",
            "payment_temrs_note",
            "price_unit_usd",
            "price_unit_cny",
            "quantity_unit",
        ],
    )

    return df


def main():
    """
    Build the combined trade table and write it out for upload.

    Loads the five source files, renames their columns onto the common
    schema, concatenates them, drops exact duplicate rows, cleans the
    result and writes it as a CSV.
    """
    *standard_frames, special_frame = load_data()

    renamed_frames = []
    for frame in standard_frames:
        # These frames are given an empty "USCC" column before renaming so
        # that the common column selection finds a "uscc" column.
        frame["USCC"] = ""
        renamed_frames.append(rename_columns(frame))
    # The last file uses different headers and has its own mapping.
    renamed_frames.append(rename_columns_special(special_frame))

    combined = pd.concat(renamed_frames)
    # reset_index() is called without drop=True, so the pre-reset index is
    # kept as an extra column.
    # NOTE(review): confirm this column is wanted in the output.
    combined = combined.drop_duplicates().reset_index()
    combined = clean_method(combined)

    write_csv_for_upload(
        combined, "china/trade/out/bol_soy_beef_palm_oil_2015_2018.csv"
    )


# Run the pipeline only when this file is executed as a script, not when
# it is imported.
if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    """
    Placeholder dbt Python model for bol_soy_beef_palm_oil_2015_2018.

    It only declares the model's upstream sources and then raises; it does
    not build the table.

    :param dbt: the dbt object, used here only for the source declarations
    :param cursor: not used by this function
    :raises NotImplementedError: always
    """
    # Declare the five raw sources this model depends on.
    dbt.source("trase-storage-raw", "3hs code imp")
    dbt.source("trase-storage-raw", "2hs_ccd42 201501-201612 import")
    dbt.source("trase-storage-raw", "6hs code")
    dbt.source("trase-storage-raw", "02023000-201501-201812imp ccd")
    dbt.source("trase-storage-raw", "2hs ccd42 exp 2015")

    # Always fails: this function contains no logic for building the data.
    raise NotImplementedError()
    # Unreachable at runtime. Presumably kept so that the function still
    # ends in a return of a dataframe, as dbt expects of Python models -
    # TODO confirm before removing.
    return pd.DataFrame({"hello": ["world"]})