Ec Processing Facilities
s3://trase-storage/ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.csv
Dbt path: trase_production.main.ec_processing_facilities
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/ecuador/logistics/shrimp_processing_facilities/out/_schema.yml
Model file link: trase/data_pipeline/models/ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.py
Calls script: trase/data/ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: mock_model, ecuador, logistics, out, shrimp_processing_facilities
ec_processing_facilities
Description
This model was auto-generated from the .yml 'lineage' files in S3. The dbt model itself only raises an error; the actual script that created the data lives elsewhere, at trase/data/ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.py [permalink]. It was last run by Harry Biddle.
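Because the dbt model is only a placeholder, the data itself is read from the CSV listed at the top of this page (or from the materialized trase_production.main.ec_processing_facilities table). Below is a minimal sketch of loading the output with the same `read_s3_csv` helper the script on this page uses; the helper is assumed to return the raw CSV bytes (as it does in the script), and the `;` separator is an assumption based on the pipeline's other files:

```python
import io

import pandas as pd

from trase.tools.aws.aws_helpers import read_s3_csv

# S3 key and bucket as listed in the header of this page.
KEY = "ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.csv"

# Assumption: read_s3_csv returns the raw bytes of the object, as in the script below.
data = read_s3_csv(KEY, bucket_name="trase-storage")

# Assumption: the output uses ';' as the separator; switch to ',' if needed.
facilities = pd.read_csv(io.BytesIO(data), sep=";")

# Columns such as CODE, NAME, EU_CERTIFIED and CHINA_CERTIFIED are produced by the script below.
print(facilities[["CODE", "NAME", "EU_CERTIFIED", "CHINA_CERTIFIED"]].head())
```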
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.actualiza_procesa_acuacultura_12octubre2017
source.trase_duckdb.trase-storage-raw.china_matches
source.trase_duckdb.trase-storage-raw.tabula-actualiza_procesa_acuacultura_31agosto2018
source.trase_duckdb.trase-storage-raw.tabula-actualiza_procesa_pesca_acua_13agosto2018
source.trase_duckdb.trase-storage-raw.ec_parishes
source.trase_duckdb.trase-storage-raw.tabula-actualiza_procesa_pesca_acua_24octubre2017
source.trase_duckdb.trase-storage-raw.actualiza_procesa_pesca_acua_22diciembre2017
source.trase_duckdb.trase-storage-raw.boundaries_city_to_parish
model.trase_duckdb.ec_processing_facilities_eu
Sources
['trase-storage-raw', 'actualiza_procesa_acuacultura_12octubre2017']
['trase-storage-raw', 'china_matches']
['trase-storage-raw', 'tabula-actualiza_procesa_acuacultura_31agosto2018']
['trase-storage-raw', 'tabula-actualiza_procesa_pesca_acua_13agosto2018']
['trase-storage-raw', 'ec_parishes']
['trase-storage-raw', 'tabula-actualiza_procesa_pesca_acua_24octubre2017']
['trase-storage-raw', 'actualiza_procesa_pesca_acua_22diciembre2017']
['trase-storage-raw', 'boundaries_city_to_parish']
# -*- coding: utf-8 -*-
"""
This script reads a list of processing facilities.
It uses a CSV extracted from the original PDF using Tabula.
Author: Pernilla Löfgren
"""
import io

import numpy as np
import pandas as pd

from trase.tools.aws.aws_helpers import *
from trase.tools.aws.metadata import write_csv_for_upload


def get_df(
    key,
    sep=";",
    bucket_name="trase-storage",
    type=None,
    skiprows=None,
    encoding="latin1",
):
    # Read the raw bytes from S3 and parse them into a whitespace-stripped DataFrame.
    data = read_s3_csv(key, bucket_name=bucket_name)
    if skiprows:
        return remove_whitespace(
            pd.read_csv(
                io.BytesIO(data),
                sep=sep,
                encoding=encoding,
                dtype=type,
                skiprows=skiprows,
            )
        )
    return remove_whitespace(
        pd.read_csv(io.BytesIO(data), sep=sep, encoding=encoding, dtype=type)
    )


def remove_whitespace(df):
    # Strip leading/trailing whitespace from every string column.
    for col in df.columns:
        if df[col].dtype == object:
            df[col] = df[col].str.strip()
    return df
# City to parish mapping
city_df = get_df(
    "ecuador/spatial/BOUNDARIES/city_to_parish.csv", sep=",", encoding="utf8"
)

# Parish CSV
parish_df = get_df(
    "ecuador/spatial/BOUNDARIES/out/ec_parishes.csv", sep=";", encoding="utf8"
)
city_df = pd.merge(
    city_df,
    parish_df,
    how="left",
    left_on=["PARISH", "CANTON"],
    right_on=["PARISH_NAME", "CANTON_NAME"],
)
city_df["PARISH_TRASE_ID"] = city_df["TRASE_ID"]
city_df = city_df[["PARISH_TRASE_ID", "CITY", "REGION", "CANTON"]]

# EU codes
eu_df = get_df(
    "ecuador/logistics/shrimp_processing_facilities/out/eu/out/ec_processing_facilities_eu.csv",
    encoding="utf8",
)
eu_codes = eu_df.CODE.tolist()

# China codes
china_df = get_df(
    "ecuador/logistics/shrimp_processing_facilities/out/china/CHINA_matches.csv",
    sep=",",
    encoding="utf8",
)
china_df["CODE"] = china_df["APPROVALNO_CLEAN"]
china_codes = china_df.CODE.tolist()

# 2017
KEY_2017_pesca_dec = "ecuador/logistics/shrimp_processing_facilities/acuaculturaypesca/3-cleaned/ACTUALIZA_PROCESA_PESCA_ACUA_22DICIEMBRE2017.csv"
KEY_2017_pesca_oct = "ecuador/logistics/shrimp_processing_facilities/acuaculturaypesca/3-cleaned/tabula-ACTUALIZA_PROCESA_PESCA_ACUA_24OCTUBRE2017.csv"
KEY_2017_acuacultura = "ecuador/logistics/shrimp_processing_facilities/acuaculturaypesca/3-cleaned/ACTUALIZA_PROCESA_ACUACULTURA_12OCTUBRE2017.csv"
df1 = get_df(KEY_2017_pesca_dec, encoding="utf8")
df2 = get_df(KEY_2017_pesca_oct, encoding="utf8")
df3 = get_df(KEY_2017_acuacultura, encoding="utf8")
df3 = df3[
    ["CODIGO", "NOMBRE", "DIRECCIÓN ESTABLECIMIENTO", "TELEFONOS", "CONTACTO"]
].rename(columns={"CODIGO": "CÓDIGO"})

# Remove duplicates in the joint dataset - dfpesca2017.nunique()
dfpesca2017 = pd.concat([df1, df2])
dfpesca2017.drop_duplicates(
    subset=["CÓDIGO", "NOMBRE", "DIRECCIÓN ESTABLECIMIENTO", "CONTACTO"], inplace=True
)
# dfpesca2017 = dfpesca2017[[]]
df2017 = pd.concat([dfpesca2017, df3])
assert len(df2017["CÓDIGO"].unique()) == len(df2017)
for column in ["NOMBRE", "DIRECCIÓN ESTABLECIMIENTO", "TELEFONOS", "CONTACTO"]:
    df2017[column] = df2017[column].str.replace(r"\r", " ")

# 2018
KEY_2018_pesca = "ecuador/logistics/shrimp_processing_facilities/acuaculturaypesca/3-cleaned/tabula-ACTUALIZA_PROCESA_PESCA_ACUA_13AGOSTO2018.csv"
KEY_2018_acuacultura = "ecuador/logistics/shrimp_processing_facilities/acuaculturaypesca/3-cleaned/tabula-ACTUALIZA_PROCESA_ACUACULTURA_31AGOSTO2018.csv"
df1 = get_df(KEY_2018_pesca, encoding="utf8")
df2 = get_df(KEY_2018_acuacultura, encoding="utf8")
df2018 = pd.concat([df1, df2])
assert len(df2018["CÓDIGO"].unique()) == len(df2018)
for column in ["NOMBRE", "DIRECCIÓN\rESTABLECIMIENTO", "TELEFONOS", "CONTACTO"]:
    df2018[column] = df2018[column].str.replace(r"\r", " ")
# Preprocess and upload to S3
dfs = []
for year, df in {2017: df2017, 2018: df2018}.items():
    df.rename(
        columns={
            "CÓDIGO": "CODE",
            "NOMBRE": "NAME",
            "DIRECCIÓN\rESTABLECIMIENTO": "ADDRESS",
            "DIRECCIÓN ESTABLECIMIENTO": "ADDRESS",
            "TELEFONOS": "TELEPHONE",
            "CONTACTO": "CONTACT",
        },
        inplace=True,
    )

    # Compare with EU facilities
    all_codes = df.CODE.tolist()
    for eu_code in eu_codes:
        if eu_code not in all_codes:
            if f"PPA-{eu_code}" not in all_codes:
                if f"PA-{eu_code}" not in all_codes:
                    print(eu_code)
                    # TODO: Half of them are 2019, we need to remove these facilities
                    print(
                        f"eu facility: {eu_code}, {eu_df[eu_df.CODE == eu_code].YEAR} "
                    )

    # Match with the EU dataset by code
    df["SIMPLE_CODE"] = df.CODE.apply(lambda value: value.split("-")[-1]).astype(int)
    eu_df["SIMPLE_CODE"] = eu_df.CODE.apply(lambda value: value.split("-")[-1]).astype(
        int
    )
    tdf = pd.merge(df, eu_df, how="left", left_on="SIMPLE_CODE", right_on="SIMPLE_CODE")

    # If there is a match in the EU dataset, flag this in a new column
    tdf["EU_CERTIFIED"] = np.where(tdf.CODE_y.isna(), "FALSE", "TRUE")
    tdf["TYPE"] = np.where(tdf.CODE_x.str.startswith("PPA-"), "F Aq", "Aq")
    tdf.rename(
        columns={
            "CODE_x": "CODE",
            "NAME_x": "NAME",
            "NAME_y": "EU_NAME_SYNONYM",
            "COMMENT": "EU_TYPE",
        },
        inplace=True,
    )
    tdf = tdf[
        [
            "CODE",
            "NAME",
            "ADDRESS",
            "TELEPHONE",
            "CONTACT",
            "SIMPLE_CODE",
            "CITY",
            "REGION",
            "EU_NAME_SYNONYM",
            "EU_TYPE",
            "EU_CERTIFIED",
            "YEAR",
            "TYPE",
        ]
    ]

    for china_code in china_codes:
        if china_code not in all_codes:
            if f"PPA-{china_code}" not in all_codes:
                if f"PA-{china_code}" not in all_codes:
                    print(f"chinese facility: {china_code}")

    # Match with the China dataset by code
    tdf = pd.merge(tdf, china_df, how="left", left_on="SIMPLE_CODE", right_on="CODE")

    # If there is a match in the China dataset, flag this in a new column
    tdf["CHINA_CERTIFIED"] = np.where(tdf.CODE_y.isna(), "FALSE", "TRUE")

    # Update
    tdf["CITY"] = np.where(tdf.CITY.isna(), tdf["City_County"].str.upper(), tdf.CITY)
    type_dict = {
        "A": "Aq",
        "A - FISHERY PRODUCTS": "F Aq",
        "FISHERY PRODUCTS": "F",
        "AQUACULTURE PRODUCTS/FISHERY PRODUCTS": "F Aq",
        "AQUACULTURE PRODUCTS": "Aq",
        "A - AQUACULTURE PRODUCTS": "Aq",
    }
    tdf.Remark = tdf.Remark.map(type_dict)
    tdf["TYPE"] = np.where(tdf.TYPE.isna(), tdf.Remark, tdf.TYPE)
    tdf.rename(
        columns={"CODE_x": "CODE", "NAME_x": "NAME", "NAME_y": "CHINA_NAME_SYNONYM"},
        inplace=True,
    )
    tdf = tdf[
        [
            "CODE",
            "NAME",
            "ADDRESS",
            "TELEPHONE",
            "CONTACT",
            "SIMPLE_CODE",
            "CITY",
            "REGION",
            "EU_NAME_SYNONYM",
            "EU_TYPE",
            "EU_CERTIFIED",
            "CHINA_NAME_SYNONYM",
            "CHINA_CERTIFIED",
            "YEAR",
            "TYPE",
        ]
    ]

    # TODO extract names that do not match: SYNONYMS
    # names = tdf[tdf.NAME_x != tdf.NAME_y]

    # Look at non-matches and merge in cantons from the city dictionary
    tdf = pd.merge(tdf, city_df, how="left", left_on="CITY", right_on="CITY")

    # Rename columns and rejig
    tdf.rename(
        columns={
            "CODE_x": "CODE",
            "NAME_x": "NAME",
            "NAME_y": "CHINA_NAME_SYNONYM",
            "REGION_x": "REGION",
        },
        inplace=True,
    )
    tdf = tdf[
        [
            "CODE",
            "NAME",
            "ADDRESS",
            "TELEPHONE",
            "CONTACT",
            "SIMPLE_CODE",
            "CITY",
            "PARISH_TRASE_ID",
            "REGION",
            "CANTON",
            "EU_NAME_SYNONYM",
            "EU_TYPE",
            "EU_CERTIFIED",
            "CHINA_NAME_SYNONYM",
            "CHINA_CERTIFIED",
            "TYPE",
        ]
    ]

    # Final push
    print(
        f"There are {len(tdf[tdf.CANTON.isna()])} addresses to check out of {len(tdf)}."
    )
    dfs.append(tdf)
# Combine the two files
concat_df = pd.concat(dfs)
final_df = concat_df.drop_duplicates(
    subset=[
        "CODE",
        "NAME",
        "SIMPLE_CODE",
        "CITY",
        "PARISH_TRASE_ID",
        "REGION",
        "CANTON",
        "EU_NAME_SYNONYM",
        "EU_TYPE",
        "EU_CERTIFIED",
        "CHINA_CERTIFIED",
        "TYPE",
    ]
).reset_index(drop=True)
print(len(final_df))
assert len(final_df) == len(final_df.CODE.unique())

# Add source column
final_df["SOURCE"] = "www.acuaculturaypesca.gob.ec"

# Upload to S3
write_csv_for_upload(
    final_df,
    "ecuador/logistics/shrimp_processing_facilities/out/ec_processing_facilities.csv",
    float_format="%.2f",
)
import pandas as pd


def model(dbt, cursor):
    # Auto-generated placeholder: it only registers the upstream sources/refs for
    # lineage and then raises, because the data is produced by the script above.
    dbt.source("trase-storage-raw", "actualiza_procesa_acuacultura_12octubre2017")
    dbt.source("trase-storage-raw", "china_matches")
    dbt.ref("ec_processing_facilities_eu")
    dbt.source("trase-storage-raw", "tabula-actualiza_procesa_acuacultura_31agosto2018")
    dbt.source("trase-storage-raw", "tabula-actualiza_procesa_pesca_acua_13agosto2018")
    dbt.source("trase-storage-raw", "ec_parishes")
    dbt.source("trase-storage-raw", "tabula-actualiza_procesa_pesca_acua_24octubre2017")
    dbt.source("trase-storage-raw", "actualiza_procesa_pesca_acua_22diciembre2017")
    dbt.source("trase-storage-raw", "boundaries_city_to_parish")
    raise NotImplementedError()
    return pd.DataFrame({"hello": ["world"]})