Cd Ecuador 2019
s3://trase-storage/ecuador/trade/cd/export/cleaned/CD_ECUADOR_2019.csv
Dbt path: trase_production.main.cd_ecuador_2019
Explore on Metabase: Full table; summary statistics
Containing yaml file link: trase/data_pipeline/models/ecuador/trade/cd/export/cleaned/_schema.yml
Model file link: trase/data_pipeline/models/ecuador/trade/cd/export/cleaned/cd_ecuador_2019.py
Calls script: trase/tools/aws/metadata.py
Dbt test runs & lineage: Test results · Lineage
Full dbt_docs page: Open in dbt docs (includes the lineage graph at the bottom right, tests, and downstream dependencies)
Tags: mock_model, cd, cleaned, ecuador, export, trade
cd_ecuador_2019
Description
This model was auto-generated from the .yml 'lineage' files in S3. The dbt model itself only raises an error; the actual script that created the data lives elsewhere, at trase/tools/aws/metadata.py [permalink], and was last run by Harry Biddle.
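As an illustrative sketch only (the dataframe contents and the calling script are hypothetical), a producing script registers a file like this one by calling write_csv_for_upload from trase/tools/aws/metadata.py, with the S3 key taken from the path above:

import pandas as pd
from trase.tools.aws.metadata import write_csv_for_upload

df = pd.DataFrame({"example_column": [1, 2, 3]})  # placeholder for the cleaned export data
write_csv_for_upload(
    df,
    key="ecuador/trade/cd/export/cleaned/CD_ECUADOR_2019.csv",
    script_path=__file__,  # recorded in the .yml metadata as the producing script
)

Run plainly, this only writes the CSV and its .yml metadata locally; re-running with --upload performs the S3 upload (see write_csv_for_upload in the source below).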
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
source.trase_duckdb.trase-storage-raw.cd_ecuador_2019
Sources
['trase-storage-raw', 'cd_ecuador_2019']
"""
This module creates and manages metadata; primarily intended for objects in AWS S3, but
not necessarily limited to this.
The initial aim of this metadata is to record the following information for our S3
objects:
- The script that produced the object (e.g. file path relative to Git and commit hash)
- The person that ran the script
- The upstream datasets that the object was produced from ("dataset provenance")
## Note on aligning with existing metadata standards
There have been many attempts over the years in the wider data community to standardise
metadata files. Where possible, Trase should align itself with external standards rather
than inventing its own. Two of the most promising and widely-mentioned are:
- "Tabular Data Resource" from Frictionless Data:
https://specs.frictionlessdata.io/tabular-data-resource/
- "Model for Tabular Data and Metadata on the Web" from W3C
https://www.w3.org/TR/tabular-data-model/
The former seems to have more recent activity than the latter. Therefore, this module
is written with the goal of eventually aligning with Tabular Data Resource, in
particular in how it documents the columns and types of the CSV file, the delimiter,
which values are to be interpreted as "missing" and so on. However, since this is
largely standardised across Trase, it is not so urgent to start including such
information in our metadata, at least not for internal data. Therefore, for now this
module is concerned only with data provenance and the source script.
"""
import dataclasses
import inspect
import json
import os
import posixpath
import re
import subprocess
import sys
from getpass import getuser
from logging import getLogger
from tempfile import gettempdir
from typing import Generator, List, Optional, Tuple
import yaml
from termcolor import colored
from trase.config import aws_session, settings
from trase.tools.aws.tracker import S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION, S3Object
from trase.tools.utilities.helpers import Kinship
# include (True) or exclude (False) some objects from pdoc
__pdoc__ = {"DataclassEncoder": False, "NotAGitRepository": False}
TRASE_METADATA_VERSION_ID = "trase-metadata-version-id"
TRASE_METADATA_KEY = "trase-metadata-key"
TRASE_METADATA_BUCKET = "trase-metadata-bucket"
LOGGER = getLogger(__name__)
GITHUB_REGEX = r"([-\w]+)/(\w+)[.]git"  # e.g. sei-international/TRASE.git
@dataclasses.dataclass
class Script:
"""The location of a GitHub script"""
# Path of the file within the repository
path: str
# Commit hash in the repository, if available
commit_hash: Optional[str]
# here we record in which GitHub repository the script is located. For now let's
    # just assume all scripts are located at github.com/sei-international/TRASE. If we ever
# use this python module for scripts that are in another repository we'll have to
# come up with a clever solution
github_username: Optional[str] = "sei-international"
github_repository: Optional[str] = "TRASE"
# currently the only supported location of a script is in a GitHub repository,
# but we have a field called "type" just in case we want to add other sources in the
# future
type: Optional[str] = "github_script"
@dataclasses.dataclass
class Metadata:
script: Script
user: str
upstream: List[S3Object] = dataclasses.field(default_factory=list)
def generate_metadata(script_path: str, upstream: List[S3Object] = None) -> Metadata:
upstream = (
list(S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION) if upstream is None else upstream
)
try:
script = get_metadata_from_git(script_path)
except (NotAGitRepository, ValueError):
# This code is for when the codebase is installed as a Python module and not
# run from a Git repository. In that scenario fetching metadata from Git will
# fail so we look to specific environment variables instead
        try:
            script = get_metadata_from_environment()
        except Exception:
            # record only the script path; we have no commit or repository information
            script = Script(
                path=script_path, commit_hash=None,
                github_username=None, github_repository=None,
            )
return Metadata(script=script, user=getuser(), upstream=upstream)
class NotAGitRepository(Exception):
pass
def _execute(command, script_path, **kwargs):
# run a command inside the git repository of the file (we do not assume that
# the file is in the TRASE repository)
    directory = (os.path.dirname(script_path) or ".") if script_path is not None else "."
stdout = subprocess.check_output(command, cwd=directory, **kwargs)
return stdout.decode().strip()
def get_metadata_from_git(script_path) -> Script:
"""
    Fetch Git commit hash information: requires the user to have "git" installed and
    assumes that `script_path` is inside a Git repository
"""
commit_hash = get_commit_hash(script_path)
script_path = get_script_path_relative_to_root(script_path)
return Script(script_path, commit_hash)
def get_commit_hash(script_path):
try:
commit_hash = _execute(
["git", "rev-parse", "HEAD"],
script_path=script_path,
stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as e:
if "fatal: not a git repository" in e.output.decode():
LOGGER.warning(f"The script {script_path} is not in a Git repository")
raise NotAGitRepository
else:
# some other error - maybe git isn't installed?
LOGGER.exception("Could not find git")
raise ValueError("Could not find git")
    except Exception:
LOGGER.exception("Could not find git")
raise ValueError("Could not find git")
return commit_hash
def get_script_path_relative_to_root(script_path):
# ensure that the path is relative to the root of the git repository
root = _execute(["git", "rev-parse", "--show-toplevel"], script_path)
return os.path.relpath(script_path, root)
def get_metadata_from_environment() -> Script:
script_path = os.environ["SCRIPT_PATH"]
commit_hash = os.environ["COMMIT_HASH"]
try:
script_path = get_script_path_relative_to_root(script_path)
    except Exception:
        # keep the path as provided if it cannot be made relative to a Git root
        pass
return Script(script_path, commit_hash)
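# Example (values hypothetical): when the code is not run from a Git checkout, the
# caller can provide provenance through environment variables before running a script:
#
#   SCRIPT_PATH=trase/tools/aws/metadata.py COMMIT_HASH=<commit sha> python my_script.py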
class DataclassEncoder(json.JSONEncoder):
def default(self, o):
if dataclasses.is_dataclass(o):
return dataclasses.asdict(o)
return super().default(o)
def dataclass_to_dict(object: dataclasses.dataclass) -> dict:
data = json.dumps(object, cls=DataclassEncoder)
return json.loads(data)
def write_file_and_metadata(
path: str,
script_path,
metadata_path,
do_write,
upstream: List[S3Object] = None,
):
"""Write a file to disk along with a metadata file.
Args:
script_path: the path to the script that made the dataframe (you can take this
from `__file__`)
path: the destination file path to serialize the dataframe to
        metadata_path: the destination file path of the metadata file on disk
do_write: a function that takes a file path and writes the file there
upstream (optional): an iterable of `trase.tools.aws.metadata.S3Object` objects
representing the upstream datasets that were used to produce the dataframe.
Returns: the path of the metadata file that was written
"""
upstream = (
list(S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION) if upstream is None else upstream
)
metadata_object = generate_metadata(script_path, upstream)
do_write(path)
    write_metadata(metadata_object, metadata_path)
    return metadata_path
def write_metadata(metadata: Metadata, path):
metadata_dictionary = dataclass_to_dict(metadata)
with open(path, "w") as file:
yaml.dump(metadata_dictionary, file)
def generate_metadata_key(key):
return posixpath.splitext(key)[0] + ".yml"
def upload_file_and_metadata(
client,
bucket: str,
key: str,
path: str,
metadata_path: str,
metadata_key: str,
printer=LOGGER.debug,
):
"""Uploads a file and its metadata file to S3.
The important aspect of this function is that the S3 object will be uploaded with
custom S3 metadata which allows you to locate the metadata object in S3.
Args:
bucket: the s3 bucket that the file and metadata should be uploaded to
        key: the s3 key that the file should be uploaded to
path: the location of the file on disk
metadata_path: the location of the metadata file on disk
metadata_key: the s3 key that the metadata file should be uploaded to
Raises:
        ValueError: if the key and the generated metadata key are the same. This can
            happen, for example, if the original object was itself a .yml file.
        ValueError: if the files at `path` or `metadata_path` do not exist
"""
from trase.tools.aws.s3_helpers import get_version_id
# abort if the metadata has the same s3 key as the object
if posixpath.normpath(key) == posixpath.normpath(metadata_key):
raise ValueError("Object and metadata key are the same")
# check the files exist
missing = [p for p in (metadata_path, path) if not os.path.isfile(p)]
if missing:
raise ValueError(f"The following file(s) are missing: {', '.join(missing)}")
# first upload the metadata
client.upload_file(metadata_path, Bucket=bucket, Key=metadata_key)
printer(f"Uploaded {metadata_path} to s3://{bucket}/{metadata_key}")
# get the version_id of the metadata
version_id = get_version_id(client, bucket, metadata_key)
LOGGER.debug(f"Version ID of s3://{bucket}/{metadata_key} is {version_id}")
# now upload the object with the location of the metadata object as S3 metadata
client.upload_file(
path,
Bucket=bucket,
Key=key,
ExtraArgs={
"Metadata": {
TRASE_METADATA_BUCKET: bucket,
TRASE_METADATA_KEY: metadata_key,
TRASE_METADATA_VERSION_ID: version_id or "",
}
},
)
printer(f"Uploaded {path} to s3://{bucket}/{key}")
def generate_metadata_path(path):
return os.path.splitext(path)[0] + ".yml"
def write_file_for_upload(
key: str,
do_write,
script_path: str = None,
path: str = None,
metadata_path: str = None,
metadata_key: str = None,
bucket: str = settings.bucket,
do_upload: bool = None,
upstream: List[S3Object] = None,
):
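    """Writes a file to disk (plus its .yml metadata) and optionally uploads both to S3.

    This is the generic workhorse behind write_csv_for_upload, write_json_for_upload,
    write_parquet_for_upload and write_geopandas_for_upload: `do_write` is a callable
    that takes a destination path and serialises the data there. If `do_upload` is not
    given, the upload happens only when "--upload" is present in `sys.argv`; otherwise
    the function prints the Trase CLI command with which the user can upload the file
    after inspecting it.
    """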
if do_upload is None:
do_upload = "--upload" in sys.argv
path = path or os.path.join(gettempdir(), posixpath.basename(key))
    # inspect.stack()[2] is two frames up: typically the user's script that called one
    # of the write_*_for_upload wrappers around this function
    script_path = script_path or inspect.stack()[2].filename
metadata_path = metadata_path or generate_metadata_path(path)
upstream = upstream or list(S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION)
write_file_and_metadata(
path,
script_path,
metadata_path,
do_write,
upstream,
)
print(f"Written {path}")
print(f"Written {metadata_path}")
metadata_key = metadata_key or generate_metadata_key(key)
if do_upload:
upload_file_and_metadata(
client=aws_session().client("s3"),
bucket=bucket,
key=key,
path=path,
metadata_path=metadata_path,
metadata_key=metadata_key,
printer=print,
)
else:
# print instructions to the user on how they can do the upload themselves
# we need to single-quote file paths: not only in case there are spaces in the
# filenames, but in particular so that users on Windows systems who are using
# Bash shells do not get backslashes interpreted as escape characters
def single_quote(text):
return f"'{text}'"
# build the command string. if bucket/metadata/metadata key are default then
# omit them from the command, just so it's not so long and scary.
command = ["trase", "s3", "upload"]
if bucket != settings.bucket:
command.extend(["--bucket", bucket])
if metadata_path != generate_metadata_path(path):
command.extend(["--metadata", single_quote(metadata_path)])
if metadata_key != generate_metadata_key(key):
command.extend(["--metadata-key", single_quote(metadata_key)])
command.extend([single_quote(path), single_quote(key)])
c = " ".join(command)
# print message to the user
print(
f"\nThe file has been {colored('written but not uploaded', attrs=['bold'])}"
f". Once you have inspected the file you can either:\n\n"
f" 1) Re-run the script with --upload, or\n\n"
f" 2) Use the Trase CLI:\n\n {c}\n"
)
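# When --upload is not passed, the user instead sees a message along these lines (the
# local path and key are hypothetical):
#
#   The file has been written but not uploaded. Once you have inspected the file you
#   can either:
#
#     1) Re-run the script with --upload, or
#
#     2) Use the Trase CLI:
#
#        trase s3 upload '/tmp/dataset.csv' 'some/cleaned/dataset.csv'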
def write_csv_for_upload(
df: "pd.DataFrame",
key: str,
script_path: str = None,
path: str = None,
metadata_path: str = None,
metadata_key: str = None,
bucket: str = settings.bucket,
do_upload: bool = None,
upstream: List[S3Object] = None,
**pd_to_csv_kwargs,
):
"""Writes a CSV file and either indicates to the user how they should upload it to,
S3 or actually performs the upload itself.
It is a very intentional design that, by default, the upload to S3 does not occur.
We want the user to _explicitly choose_ to do the upload to minimise the chance that
they accidentally overwrite data on S3. By putting a "human in the loop" we also
give the user a chance to inspect the data before upload.
Args:
df: the Pandas dataframe that should be serialized
key: the s3 key that the dataframe should be uploaded to
script_path (optional): the path of the script that generated the file. Often
this can be taken from `__file__`. If it is not supplied then we will
inspect the call stack and take the filename of the file which called this
function, which may or may not be what was intended.
path (optional): the (temporary) file location where the Pandas dataframe will
be written to before uploading. The reason it is written to disk is to give
the user the chance to inspect its contents before uploading. If not
provided then a path will be generated in the operating system's temporary
directory.
metadata_path (optional): the (temporary) file location where the metadata will
be written to before uploading. If not provided then it will be written next
to the data file with a ".yml" extension.
metadata_key (optional): the s3 key where the metadata will be uploaded to. If
not provided then it will be the same as the s3 key for the data file, but
with a ".yml" extension.
bucket (optional): the s3 bucket to upload the file and metadata file to.
Defaults to bucket defined in `trase.config.settings`.
upstream (optional): an iterable of `trase.tools.aws.metadata.S3Object` objects
representing the upstream datasets that were used to produce the dataframe.
If not provided these will be taken from
`trase.tools.aws.metadata.S3_OBJECTS_ACCESSED_IN_CURRENT_SESSION`
do_upload (optional): whether to actually do the upload or just print to the
user that they should do it themselves. If not provided then we will look
for the presence of "--upload" in `sys.argv` to determine its value.
        pd_to_csv_kwargs: keyword arguments that will be passed to `DataFrame.to_csv`
"""
def do_write(path):
df.to_csv(
path, **{"sep": ";", "encoding": "utf8", "index": False, **pd_to_csv_kwargs}
)
write_file_for_upload(
key=key,
do_write=do_write,
script_path=script_path,
path=path,
metadata_path=metadata_path,
metadata_key=metadata_key,
bucket=bucket,
do_upload=do_upload,
upstream=upstream,
)
def write_json_for_upload(
data,
key: str,
script_path: str = None,
path: str = None,
metadata_path: str = None,
metadata_key: str = None,
bucket: str = settings.bucket,
do_upload: bool = None,
upstream: List[S3Object] = None,
**json_dump_kwargs,
):
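    """Writes `data` as a JSON file and optionally uploads it to S3 with metadata.

    Behaves like `write_csv_for_upload` (see its docstring for the shared arguments);
    `json_dump_kwargs` are passed through to `json.dump`.
    """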
def do_write(path):
with open(path, "w") as file:
json.dump(data, file, **json_dump_kwargs)
write_file_for_upload(
key=key,
do_write=do_write,
script_path=script_path,
path=path,
metadata_path=metadata_path,
metadata_key=metadata_key,
bucket=bucket,
do_upload=do_upload,
upstream=upstream,
)
def write_parquet_for_upload(
df: "pd.DataFrame",
key: str,
is_polars=False,
script_path: str = None,
path: str = None,
metadata_path: str = None,
metadata_key: str = None,
bucket: str = settings.bucket,
do_upload: bool = None,
upstream: List[S3Object] = None,
**to_parquet_kwargs,
):
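    """Writes a dataframe to Parquet and optionally uploads it to S3 with metadata.

    Behaves like `write_csv_for_upload` (see its docstring for the shared arguments).
    Pass `is_polars=True` if `df` is a Polars dataframe (written via `write_parquet`)
    rather than a Pandas dataframe (written via `to_parquet`).
    """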
def do_write(path):
if is_polars:
df.write_parquet(path, **to_parquet_kwargs)
else:
df.to_parquet(path, **to_parquet_kwargs)
write_file_for_upload(
key=key,
do_write=do_write,
script_path=script_path,
path=path,
metadata_path=metadata_path,
metadata_key=metadata_key,
bucket=bucket,
do_upload=do_upload,
upstream=upstream,
)
def write_geopandas_for_upload(
gdf: "gpd.GeoDataFrame",
key: str,
script_path: str = None,
path: str = None,
metadata_path: str = None,
metadata_key: str = None,
bucket: str = settings.bucket,
do_upload: bool = None,
upstream: List[S3Object] = None,
driver: str = "geojson", # "parquet" | "geojson" | "gpkg"
**to_file_kwargs,
):
"""
Writes a GeoDataFrame to disk and optionally uploads to S3 with metadata.
Supported drivers:
- "parquet" -> GeoParquet
- "geojson" -> GeoJSON
- "gpkg" -> GeoPackage
"""
def do_write(path):
if driver == "parquet":
gdf.to_parquet(path, **to_file_kwargs)
elif driver == "geojson":
gdf.to_file(path, driver="GeoJSON", **to_file_kwargs)
elif driver == "gpkg":
gdf.to_file(path, driver="GPKG", **to_file_kwargs)
else:
raise ValueError(f"Unsupported driver: {driver}")
# default extension based on driver
if path is None:
ext = {
"parquet": ".parquet",
"geojson": ".geojson",
"gpkg": ".gpkg",
}.get(driver, ".dat")
path = os.path.join(gettempdir(), posixpath.basename(key) + ext)
write_file_for_upload(
key=key,
do_write=do_write,
script_path=script_path,
path=path,
metadata_path=metadata_path,
metadata_key=metadata_key,
bucket=bucket,
do_upload=do_upload,
upstream=upstream,
)
def find_metadata_object(
client, key, bucket, version_id=None, check_exists=True
) -> Tuple[str, str, Optional[str]]:
"""Tries to find the metadata object on S3 for a given S3 object
Returns: a tuple of (bucket, key, version_id) for the metadata object, where
version_id may be None.
"""
from trase.tools.aws.s3_helpers import head_object
default_version_id = None
default_metadata_key = generate_metadata_key(key)
default_metadata_bucket = bucket
response = head_object(client, bucket, key, version_id)
metadata = response.get("Metadata", {})
metadata_version_id = metadata.get(TRASE_METADATA_VERSION_ID, default_version_id)
metadata_key = metadata.get(TRASE_METADATA_KEY, default_metadata_key)
metadata_bucket = metadata.get(TRASE_METADATA_BUCKET, default_metadata_bucket)
if check_exists:
head_object(client, metadata_bucket, metadata_key, metadata_version_id)
return metadata_bucket, metadata_key, metadata_version_id
def crawl_upstream(
client, bucket, key, version_id=None, seen=None
) -> Generator[Kinship, None, None]:
from trase.tools.aws.aws_helpers import read_yaml
from trase.tools.aws.s3_helpers import object_exists
seen = seen or set()
LOGGER.debug(f"Looking for s3://{bucket}/{key}:{version_id}")
metadata_bucket, metadata_key, metadata_version_id = find_metadata_object(
client, key, bucket, version_id, check_exists=False
)
if object_exists(client, metadata_bucket, metadata_key, metadata_version_id):
parent = S3Object(key, bucket, version_id)
metadata = read_yaml(
metadata_key,
metadata_bucket,
metadata_version_id,
track=False,
)
for upstream in metadata["upstream"] or []:
# yield a relationship between ourselves and this upstream dataset
child = S3Object(
bucket=upstream["bucket"],
key=upstream["key"],
version_id=upstream["version_id"],
)
kinship = Kinship(parent, child)
if kinship in seen:
break
else:
seen.add(kinship)
yield kinship
# recurse into the upstream datasets of the child
if object_exists(client, child.bucket, child.key, child.version_id):
for descendent in crawl_upstream(
client, child.bucket, child.key, child.version_id, seen
):
yield descendent
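# Example (bucket and key hypothetical): walk the provenance recorded in the metadata
# files for an S3 object, yielding parent/child Kinship pairs:
#
#   client = aws_session().client("s3")
#   for kinship in crawl_upstream(client, bucket="trase-storage", key="some/cleaned/dataset.csv"):
#       print(kinship)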
# trase/data_pipeline/models/ecuador/trade/cd/export/cleaned/cd_ecuador_2019.py
import pandas as pd
def model(dbt, cursor):
    # declare the raw source so that dbt can build the lineage graph
    dbt.source("trase-storage-raw", "cd_ecuador_2019")
    # mock model: the data is produced outside dbt, so running this model is an error
    raise NotImplementedError()
    # unreachable; kept so the function still nominally returns a dataframe
    return pd.DataFrame({"hello": ["world"]})