Ec Processing Facilities

s3://trase-storage/ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.csv

Dbt path: trase_production.main.ec_processing_facilities

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/ecuador/logistics/shrimp_processing_facilities/out/_schema.yml

Model file link: trase/data_pipeline/models/ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.py

Calls script: trase/data/ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)

Tags: mock_model, ecuador, logistics, out, shrimp_processing_facilities


ec_processing_facilities

Description

This model was auto-generated from the .yml 'lineage' files in S3. The dbt model itself only raises an error; the script that actually created the data lives at trase/data/ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.py [permalink] and is reproduced further down this page, followed by the generated stub model. The script was last run by Harry Biddle.


Details

Column Type Description

Models / Seeds

  • source.trase_duckdb.trase-storage-raw.actualiza_procesa_acuacultura_12octubre2017
  • source.trase_duckdb.trase-storage-raw.china_matches
  • source.trase_duckdb.trase-storage-raw.tabula-actualiza_procesa_acuacultura_31agosto2018
  • source.trase_duckdb.trase-storage-raw.tabula-actualiza_procesa_pesca_acua_13agosto2018
  • source.trase_duckdb.trase-storage-raw.ec_parishes
  • source.trase_duckdb.trase-storage-raw.tabula-actualiza_procesa_pesca_acua_24octubre2017
  • source.trase_duckdb.trase-storage-raw.actualiza_procesa_pesca_acua_22diciembre2017
  • source.trase_duckdb.trase-storage-raw.boundaries_city_to_parish
  • model.trase_duckdb.ec_processing_facilities_eu

Sources

  • ['trase-storage-raw', 'actualiza_procesa_acuacultura_12octubre2017']
  • ['trase-storage-raw', 'china_matches']
  • ['trase-storage-raw', 'tabula-actualiza_procesa_acuacultura_31agosto2018']
  • ['trase-storage-raw', 'tabula-actualiza_procesa_pesca_acua_13agosto2018']
  • ['trase-storage-raw', 'ec_parishes']
  • ['trase-storage-raw', 'tabula-actualiza_procesa_pesca_acua_24octubre2017']
  • ['trase-storage-raw', 'actualiza_procesa_pesca_acua_22diciembre2017']
  • ['trase-storage-raw', 'boundaries_city_to_parish']
# -*- coding: utf-8 -*-
"""
This script reads a list of processing facilities.
It uses a csv extracted from the original pdf using Tabula.
Author: Pernilla Löfgren
"""
import io

import numpy as np
import pandas as pd

from trase.tools.aws.aws_helpers import *
from trase.tools.aws.metadata import write_csv_for_upload


def get_df(
    key,
    sep=";",
    bucket_name="trase-storage",
    type=None,
    skiprows=None,
    encoding="latin1",
):
    # Download the object from S3 and parse it with pandas. ``skiprows=None``
    # is pandas' own default, so a single read_csv call covers both cases.
    data = read_s3_csv(key, bucket_name=bucket_name)
    return remove_whitespace(
        pd.read_csv(
            io.BytesIO(data),
            sep=sep,
            encoding=encoding,
            dtype=type,
            skiprows=skiprows,
        )
    )


def remove_whitespace(df):
    # Strip leading/trailing whitespace from every string (object-dtype) column.
    # np.object and np.str were removed in NumPy >= 1.24, so compare against the
    # plain Python ``object`` dtype instead.
    for col in df.columns:
        if df[col].dtype == object:
            df[col] = df[col].str.strip()
    return df
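
# Illustrative only (not executed by the pipeline): remove_whitespace turns a
# value such as "  EMPACADORA X  " into "EMPACADORA X" and leaves numeric
# columns untouched.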


# City-to-parish lookup table
city_df = get_df(
    "ecuador/spatial/BOUNDARIES/city_to_parish.csv", sep=",", encoding="utf8"
)

# Parishes csv
parish_df = get_df(
    "ecuador/spatial/BOUNDARIES/out/ec_parishes.csv", sep=";", encoding="utf8"
)
city_df = pd.merge(
    city_df,
    parish_df,
    how="left",
    left_on=["PARISH", "CANTON"],
    right_on=["PARISH_NAME", "CANTON_NAME"],
)
city_df["PARISH_TRASE_ID"] = city_df["TRASE_ID"]
city_df = city_df[["PARISH_TRASE_ID", "CITY", "REGION", "CANTON"]]
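
# city_df now maps a CITY name to its PARISH_TRASE_ID, REGION and CANTON; it is
# merged in further down to fill cantons for facilities located only by city.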

# EU codes
eu_df = get_df(
    "ecuador/logistics/shrimp_processing_facilities/out/eu/out/ec_processing_facilities_eu.csv",
    encoding="utf8",
)
eu_codes = eu_df.CODE.tolist()

# China codes
china_df = get_df(
    "ecuador/logistics/shrimp_processing_facilities/out/china/CHINA_matches.csv",
    sep=",",
    encoding="utf8",
)
china_df["CODE"] = china_df["APPROVALNO_CLEAN"]
china_codes = china_df.CODE.tolist()

# 2017
KEY_2017_pesca_dec = "ecuador/logistics/shrimp_processing_facilities/acuaculturaypesca/3-cleaned/ACTUALIZA_PROCESA_PESCA_ACUA_22DICIEMBRE2017.csv"
KEY_2017_pesca_oct = "ecuador/logistics/shrimp_processing_facilities/acuaculturaypesca/3-cleaned/tabula-ACTUALIZA_PROCESA_PESCA_ACUA_24OCTUBRE2017.csv"
KEY_2017_acuacultura = "ecuador/logistics/shrimp_processing_facilities/acuaculturaypesca/3-cleaned/ACTUALIZA_PROCESA_ACUACULTURA_12OCTUBRE2017.csv"
df1 = get_df(KEY_2017_pesca_dec, encoding="utf8")
df2 = get_df(KEY_2017_pesca_oct, encoding="utf8")
df3 = get_df(KEY_2017_acuacultura, encoding="utf8")
df3 = df3[
    ["CODIGO", "NOMBRE", "DIRECCIÓN ESTABLECIMIENTO", "TELEFONOS", "CONTACTO"]
].rename(columns={"CODIGO": "CÓDIGO"})
# Remove duplicates in the combined 2017 dataset (checked with dfpesca2017.nunique())
dfpesca2017 = pd.concat([df1, df2])
dfpesca2017.drop_duplicates(
    subset=["CÓDIGO", "NOMBRE", "DIRECCIÓN ESTABLECIMIENTO", "CONTACTO"], inplace=True
)
# dfpesca2017 = dfpesca2017[[]]
df2017 = pd.concat([dfpesca2017, df3])
assert len(df2017["CÓDIGO"].unique()) == len(df2017)
for column in ["NOMBRE", "DIRECCIÓN ESTABLECIMIENTO", "TELEFONOS", "CONTACTO"]:
    df2017[column] = df2017[column].str.replace(r"\r", " ")

# 2018
KEY_2018_pesca = "ecuador/logistics/shrimp_processing_facilities/acuaculturaypesca/3-cleaned/tabula-ACTUALIZA_PROCESA_PESCA_ACUA_13AGOSTO2018.csv"
KEY_2018_acuacultura = "ecuador/logistics/shrimp_processing_facilities/acuaculturaypesca/3-cleaned/tabula-ACTUALIZA_PROCESA_ACUACULTURA_31AGOSTO2018.csv"
df1 = get_df(KEY_2018_pesca, encoding="utf8")
df2 = get_df(KEY_2018_acuacultura, encoding="utf8")
df2018 = pd.concat([df1, df2])
assert len(df2018["CÓDIGO"].unique()) == len(df2018)
for column in ["NOMBRE", "DIRECCIÓN\rESTABLECIMIENTO", "TELEFONOS", "CONTACTO"]:
    df2018[column] = df2018[column].str.replace(r"\r", " ")

# Preprocess and upload to s3
dfs = []
for year, df in {2017: df2017, 2018: df2018}.items():
    df.rename(
        columns={
            "CÓDIGO": "CODE",
            "NOMBRE": "NAME",
            "DIRECCIÓN\rESTABLECIMIENTO": "ADDRESS",
            "DIRECCIÓN ESTABLECIMIENTO": "ADDRESS",
            "TELEFONOS": "TELEPHONE",
            "CONTACTO": "CONTACT",
        },
        inplace=True,
    )

    # Compare with EU facilities
    all_codes = df.CODE.tolist()
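    # A code in the national registry may carry a "PPA-" or "PA-" prefix that
    # the EU list omits, so every EU code is checked in all three forms;
    # unmatched codes are only printed here for manual review.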
    for eu_code in eu_codes:
        if (
            eu_code not in all_codes
            and f"PPA-{eu_code}" not in all_codes
            and f"PA-{eu_code}" not in all_codes
        ):
            print(eu_code)
            # TODO: Half of them are 2019, we need to remove these facilities
            print(f"eu facility: {eu_code}, {eu_df[eu_df.CODE == eu_code].YEAR} ")

    # Match with EU dataset by code
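    # SIMPLE_CODE keeps only the numeric tail of the code (a hypothetical
    # "PPA-123" becomes 123) so the two registries join regardless of prefix.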
    df["SIMPLE_CODE"] = df.CODE.apply(lambda value: value.split("-")[-1]).astype(int)
    eu_df["SIMPLE_CODE"] = eu_df.CODE.apply(lambda value: value.split("-")[-1]).astype(
        int
    )
    tdf = pd.merge(df, eu_df, how="left", left_on="SIMPLE_CODE", right_on="SIMPLE_CODE")

    # If there is a match in eu dataset flag this in a new column
    tdf["EU_CERTIFIED"] = np.where(tdf.CODE_y.isna(), "FALSE", "TRUE")
    tdf["TYPE"] = np.where(tdf.CODE_x.str.startswith("PPA-"), "F Aq", "Aq")
    tdf.rename(
        columns={
            "CODE_x": "CODE",
            "NAME_x": "NAME",
            "NAME_y": "EU_NAME_SYNONYM",
            "COMMENT": "EU_TYPE",
        },
        inplace=True,
    )
    tdf = tdf[
        [
            "CODE",
            "NAME",
            "ADDRESS",
            "TELEPHONE",
            "CONTACT",
            "SIMPLE_CODE",
            "CITY",
            "REGION",
            "EU_NAME_SYNONYM",
            "EU_TYPE",
            "EU_CERTIFIED",
            "YEAR",
            "TYPE",
        ]
    ]
    for china_code in china_codes:
        if (
            china_code not in all_codes
            and f"PPA-{china_code}" not in all_codes
            and f"PA-{china_code}" not in all_codes
        ):
            print(f"chinese facility: {china_code}")

    # Match with China dataset by code
    tdf = pd.merge(tdf, china_df, how="left", left_on="SIMPLE_CODE", right_on="CODE")
    # If there is a match in china dataset flag this in a new column
    tdf["CHINA_CERTIFIED"] = np.where(tdf.CODE_y.isna(), "FALSE", "TRUE")
    # Update
    tdf["CITY"] = np.where(tdf.CITY.isna(), tdf["City_County"].str.upper(), tdf.CITY)
    type_dict = {
        "A": "Aq",
        "A - FISHERY PRODUCTS": "F Aq",
        "FISHERY PRODUCTS": "F",
        "AQUACULTURE PRODUCTS/FISHERY PRODUCTS": "F Aq",
        "AQUACULTURE PRODUCTS": "Aq",
        "A - AQUACULTURE PRODUCTS": "Aq",
    }
    tdf.Remark = tdf.Remark.map(type_dict)
    tdf["TYPE"] = np.where(tdf.TYPE.isna(), tdf.Remark, tdf.TYPE)
    tdf.rename(
        columns={"CODE_x": "CODE", "NAME_x": "NAME", "NAME_y": "CHINA_NAME_SYNONYM"},
        inplace=True,
    )
    tdf = tdf[
        [
            "CODE",
            "NAME",
            "ADDRESS",
            "TELEPHONE",
            "CONTACT",
            "SIMPLE_CODE",
            "CITY",
            "REGION",
            "EU_NAME_SYNONYM",
            "EU_TYPE",
            "EU_CERTIFIED",
            "CHINA_NAME_SYNONYM",
            "CHINA_CERTIFIED",
            "YEAR",
            "TYPE",
        ]
    ]

    # TODO extract names that do not match : SYNONYMS
    # names = tdf[tdf.NAME_x != tdf.NAME_y]
    # Look at non_matches and merge in cantons from city dictionary
    tdf = pd.merge(tdf, city_df, how="left", left_on="CITY", right_on="CITY")

    # Rename and reorder the columns
    tdf.rename(
        columns={
            "CODE_x": "CODE",
            "NAME_x": "NAME",
            "NAME_y": "CHINA_NAME_SYNONYM",
            "REGION_x": "REGION",
        },
        inplace=True,
    )
    tdf = tdf[
        [
            "CODE",
            "NAME",
            "ADDRESS",
            "TELEPHONE",
            "CONTACT",
            "SIMPLE_CODE",
            "CITY",
            "PARISH_TRASE_ID",
            "REGION",
            "CANTON",
            "EU_NAME_SYNONYM",
            "EU_TYPE",
            "EU_CERTIFIED",
            "CHINA_NAME_SYNONYM",
            "CHINA_CERTIFIED",
            "TYPE",
        ]
    ]

    # Report facilities still missing a canton, then collect this year's table
    print(
        f"There are {len(tdf[tdf.CANTON.isna()])} addresses to check out of {len(tdf)}."
    )
    dfs.append(tdf)

# Combine the 2017 and 2018 tables
concat_df = pd.concat(dfs)
final_df = concat_df.drop_duplicates(
    subset=[
        "CODE",
        "NAME",
        "SIMPLE_CODE",
        "CITY",
        "PARISH_TRASE_ID",
        "REGION",
        "CANTON",
        "EU_NAME_SYNONYM",
        "EU_TYPE",
        "EU_CERTIFIED",
        "CHINA_CERTIFIED",
        "TYPE",
    ]
).reset_index(drop=True)
print(len(final_df))
assert len(final_df) == len(final_df.CODE.unique())

# Add source column
final_df["SOURCE"] = "www.acuaculturaypesca.gob.ec"

# Upload to s3
write_csv_for_upload(
    final_df,
    "ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.csv",
    float_format="%.2f",
)
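
# ---------------------------------------------------------------------------
# dbt model file:
# trase/data_pipeline/models/ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.py
# ---------------------------------------------------------------------------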
import pandas as pd


def model(dbt, cursor):
    dbt.source("trase-storage-raw", "actualiza_procesa_acuacultura_12octubre2017")
    dbt.source("trase-storage-raw", "china_matches")
    dbt.ref("ec_processing_facilities_eu")
    dbt.source("trase-storage-raw", "tabula-actualiza_procesa_acuacultura_31agosto2018")
    dbt.source("trase-storage-raw", "tabula-actualiza_procesa_pesca_acua_13agosto2018")
    dbt.source("trase-storage-raw", "ec_parishes")
    dbt.source("trase-storage-raw", "tabula-actualiza_procesa_pesca_acua_24octubre2017")
    dbt.source("trase-storage-raw", "actualiza_procesa_pesca_acua_22diciembre2017")
    dbt.source("trase-storage-raw", "boundaries_city_to_parish")

    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})