Skip to content

Indonesia Shrimp 2018 Cleaned

s3://trase-storage/indonesia/shrimp/trade/cd/out/INDONESIA_SHRIMP_2018_CLEANED.csv

Dbt path: trase_production.main.indonesia_shrimp_2018_cleaned

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/indonesia/shrimp/trade/cd/out/_schema.yml

Model file link: trase/data_pipeline/models/indonesia/shrimp/trade/cd/out/indonesia_shrimp_2018_cleaned.py

Calls script: trase/data/indonesia/shrimp/trade/cd/out/INDONESIA_SHRIMP_201X_CLEANED.py

Dbt test runs & lineage: Test results ยท Lineage

Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)

Tags: mock_model, cd, indonesia, out, shrimp, trade


indonesia_shrimp_2018_cleaned

Description

This model was auto-generated based off .yml 'lineage' files in S3. The DBT model just raises an error; the actual script that created the data lives elsewhere. The script is located at trase/data/indonesia/shrimp/trade/cd/out/INDONESIA_SHRIMP_201X_CLEANED.py [permalink]. It was last run by Harry Biddle.


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.indonesia_shrimp_2018

Sources

  • ['trase-storage-raw', 'indonesia_shrimp_2018']
from stdnum import luhn

from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.aws.tracker import S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION
from trase.tools.utilities.helpers import clean_string

YEARS = ["2015", "2016", "2017", "2018"]
TAX_ID_COLUMN = "EXPORTER_TAX_ID"


def load(year):
    df = get_pandas_df_once(
        f"indonesia/shrimp/trade/cd/out/INDONESIA_SHRIMP_{year}.csv",
        sep=";",
        encoding="utf8",
        dtype=str,
        keep_default_na=False,
    )
    return df


def clean_string_columns(df, columns):
    """
    clean the string columns, including capitalizing the letters and replacing accents
    :param df: dataframe we want to clean
    :param columns: list of strings, the names of string columns
    :return: df_2: dataframe with cleaned string columns
    """
    # clean the string columns
    for column in df.columns:
        df[column] = df[column].apply(clean_string)

    missing_value_list = ["NAN", "NONE", "NA", "NULL"]

    # replace null values to UNKNOWN
    assert all(df["COUNTRY_OF_ORIGIN"] == "INDONESIA")
    df.loc[
        df["COUNTRY_OF_DESTINATION"].isin(missing_value_list), "COUNTRY_OF_DESTINATION"
    ] = "UNKNOWN COUNTRY"
    df.loc[df["PORT_OF_EXPORT"].isin(missing_value_list), "PORT_OF_EXPORT"] = "UNKNOWN"
    df.loc[df["PORT_OF_IMPORT"].isin(missing_value_list), "PORT_OF_IMPORT"] = (
        "UNKNOWN PORT"
    )
    return df


def validate_npwp(df, year):
    """
    Check whether the first nine digits of the TIN/NPWP tax code are valid
    :param df: dataframe we are interested in
    :param year: the year of the resource
    """
    tax_identification_number = df[TAX_ID_COLUMN].str.slice(0, 9).drop_duplicates()
    is_valid = tax_identification_number.apply(luhn.is_valid)
    invalid_npwp = tax_identification_number[~is_valid]
    assert all(
        is_valid
    ), f"out of {len(is_valid)} rows, {sum(~is_valid)} NPWP are invalid: {invalid_npwp}"


def create_trase_id(df):
    """
    Create trase id for traders with 9 digits
    """
    df["TRASE_ID"] = "ID-TRADER-" + df[TAX_ID_COLUMN].str.slice(0, 9)
    return df


def main():
    for year in YEARS:
        S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION.clear()
        df = load(year)
        df = clean_string_columns(
            df,
            [
                "PORT_OF_EXPORT",
                "PORT_OF_IMPORT",
                "COUNTRY_OF_ORIGIN",
                "COUNTRY_OF_DESTINATION",
            ],
        )
        validate_npwp(df, year)
        df = create_trase_id(df)
        write_csv_for_upload(
            df, f"indonesia/shrimp/trade/cd/out/INDONESIA_SHRIMP_{year}_CLEANED_.csv"
        )


if __name__ == "__main__":
    main()
import pandas as pd


def model(dbt, cursor):
    dbt.source("trase-storage-raw", "indonesia_shrimp_2018")

    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})