DBT: Weighted Mean Port
File location: s3://trase-storage/brazil/soy/auxiliary/fob/weighted_mean_port.csv
DBT model name: weighted_mean_port
Explore on Metabase: Full table; summary statistics
DBT details
- Lineage
-
Dbt path:
trase_production.main_brazil.weighted_mean_port -
Containing yaml link: trase/data_pipeline/models/brazil/soy/auxiliary/fob/_schema.yml
-
Model file: trase/data_pipeline/models/brazil/soy/auxiliary/fob/weighted_mean_port.py
-
Calls script:
trase/data/brazil/soy/auxiliary/fob/weighted_mean_port.py -
Tags:
mock_model,auxiliary,brazil,fob,soy
Description
This model was auto-generated based off .yml 'lineage' files in S3. The DBT model just raises an error; the actual script that created the data lives elsewhere. The script is located at trase/data/brazil/soy/auxiliary/fob/weighted_mean_port.py [permalink]. It was last run by Harry Biddle.
Details
| Column | Type | Description |
|---|---|---|
Models / Seeds
model.trase_duckdb.brazil_mdic_port_2022model.trase_duckdb.brazil_mdic_port_2021model.trase_duckdb.brazil_mdic_port_2019model.trase_duckdb.brazil_mdic_port_2020
import numpy as np
from trase.tools import sps
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
HS4 = ["1201", "1208", "1507", "2304"]
MDIC_COLUMNS = {
"year": "YEAR",
"hs4": "PRODUCT",
"vol": "VOLUME_PRODUCT",
"fob": "FOB",
"via": "VIA",
"country_of_destination.name": "COUNTRY_OF_FIRST_IMPORT",
"state.uf_letter": "STATE",
"port.name": "PORT_OF_EXPORT",
}
# load 2019, 2020 and 2022 MDIC data for soy
df = sps.concat(
[
get_pandas_df_once(
f"brazil/trade/mdic/port/brazil_mdic_port_{year}.csv",
sep=";",
dtype=str,
keep_default_na=False,
)
for year in [2019, 2020, 2021, 2022]
]
)
df = df[df["hs4"].isin(HS4)]
df.loc[:, "via"] = np.where(df.via == "01", 1, 2) # maritime
# Select and rename columns
df = df[MDIC_COLUMNS].rename(columns=MDIC_COLUMNS, errors="raise")
# transform unit
df = df.astype({"VOLUME_PRODUCT": float, "FOB": float, "YEAR": int})
df["VOLUME_PRODUCT"] /= 1e3
# calculate average FOB
df = (
df.groupby(["YEAR", "PRODUCT", "VIA", "PORT_OF_EXPORT"])["VOLUME_PRODUCT", "FOB"]
.sum()
.reset_index()
)
df["AVG_FOB"] = df["FOB"] / df["VOLUME_PRODUCT"]
# fill in infinite values
df["AVG_FOB"] = df["AVG_FOB"].mask(df["VOLUME_PRODUCT"] < 1e-10, 0.01)
write_csv_for_upload(df, "brazil/soy/auxiliary/fob/weighted_mean_port.csv")
import pandas as pd
def model(dbt, cursor):
dbt.ref("brazil_mdic_port_2022")
dbt.ref("brazil_mdic_port_2021")
dbt.ref("brazil_mdic_port_2019")
dbt.ref("brazil_mdic_port_2020")
raise NotImplementedError()
return pd.DataFrame({"hello": ["world"]})