DBT: Postgres Eu Economic Blocs
File location: s3://trase-storage/postgres_views/postgres_eu_economic_blocs.parquet
DBT model name: postgres_eu_economic_blocs
Explore on Metabase: Full table; summary statistics
DBT details
- Lineage
- Dbt path: trase_production.main.postgres_eu_economic_blocs
- Containing yaml link: trase/data_pipeline/models/postgres_views/_schema_postgres_tables.yml
- Model file: trase/data_pipeline/models/postgres_views/postgres_eu_economic_blocs.py
- Tags: postgres, economic_blocs
Description
Includes countries in the EU, with eu_date_start and eu_date_end
Details
| Column | Type | Description |
|---|---|---|
| country_trase_id | VARCHAR | |
| country_name | VARCHAR | |
| eu_date_start | DATE | |
| eu_date_end | DATE | |
**Macros**
- `macro.trase_duckdb.attach_postgres`
- `macro.trase_duckdb.detach_postgres`
No called script or script source not found.
import polars as pl
def model(dbt, session):
    """Return the European Union membership periods of each country.

    Selects ``country_trase_id``, ``country_name`` and ``economic_bloc`` from
    the ``<model identifier>_postgres_db.views.economic_blocs`` relation,
    expands the ``economic_bloc`` list into one row per entry, decodes each
    JSON entry, keeps the entries whose bloc is "EUROPEAN UNION", and exposes
    their ``time_start`` / ``time_end`` values as the ``eu_date_start`` /
    ``eu_date_end`` date columns.

    Args:
        dbt: dbt Python-model context; used for ``config`` and ``this``.
        session: connection object exposing ``sql(...).pl()``.

    Returns:
        A polars DataFrame with the columns ``country_trase_id``,
        ``country_name``, ``eu_date_start`` and ``eu_date_end``.
    """
    dbt.config(materialized="external")

    # For some reason, currently models inside 'models/postgres_views/' can't also
    # refer to existing models (in this case it would've been to ref('postgres_economic_blocs'))
    source_view = f"{dbt.this.identifier}_postgres_db.views.economic_blocs"
    query = f"""
SELECT
country_trase_id,
country_name,
economic_bloc
FROM {source_view}
"""
    countries = session.sql(query).pl()

    # Shape of a single JSON entry inside the economic_bloc list.
    membership_schema = pl.Struct(
        [
            pl.Field("time_end", pl.Utf8),
            pl.Field("time_start", pl.Utf8),
            pl.Field("economic_bloc", pl.Utf8),
        ]
    )
    membership = pl.col("economic_bloc_parsed")

    def _membership_date(json_field, output_name):
        # Timestamp string -> datetime -> date. With strict=False polars is
        # expected to yield null (not raise) for values that do not match.
        timestamp = membership.struct.field(json_field).str.strptime(
            pl.Datetime, format="%Y-%m-%dT%H:%M:%S", strict=False
        )
        return timestamp.cast(pl.Date).alias(output_name)

    # economic_bloc is a list of JSON strings: explode gives one row per
    # JSON string, which is then decoded into a struct column.
    one_row_per_membership = countries.explode("economic_bloc")
    decoded = one_row_per_membership.with_columns(
        pl.col("economic_bloc")
        .str.json_decode(membership_schema)
        .alias("economic_bloc_parsed")
    )

    is_eu_membership = membership.struct.field("economic_bloc") == "EUROPEAN UNION"
    eu_memberships = decoded.filter(is_eu_membership)

    return eu_memberships.select(
        "country_trase_id",
        "country_name",
        _membership_date("time_start", "eu_date_start"),
        _membership_date("time_end", "eu_date_end"),
    )