DBT: Brazil Bol 2022
File location: s3://trase-storage/brazil/trade/bol/2022/BRAZIL_BOL_2022.csv
DBT model name: brazil_bol_2022
Explore on Metabase: Full table; summary statistics
DBT details
- Lineage
- Dbt path: trase_production.main_brazil.brazil_bol_2022
- Containing yaml link: trase/data_pipeline/models/brazil/trade/bol/2022/_schema.yml
- Model file: trase/data_pipeline/models/brazil/trade/bol/2022/brazil_bol_2022.py
- Calls script: trase/data/brazil/trade/bol/2022/BRAZIL_BOL_2022.py
- Tags: mock_model, 2022, bol, brazil, trade
Description
This model was auto-generated based on the .yml 'lineage' files in S3. The DBT model itself just raises an error; the script that actually creates the data lives elsewhere, at trase/data/brazil/trade/bol/2022/BRAZIL_BOL_2022.py [permalink]. It was last run by Harry Biddle.
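For quick inspection outside Metabase, the published CSV can be read straight from S3. A minimal sketch, assuming s3fs is installed and AWS credentials for the trase-storage bucket are configured (every column in this table is VARCHAR, so string dtypes are forced):

import pandas as pd

# Read the published CSV directly from S3; force strings and keep empty
# cells as "" rather than NaN, matching how the pipeline treats the data.
df = pd.read_csv(
    "s3://trase-storage/brazil/trade/bol/2022/BRAZIL_BOL_2022.csv",
    dtype=str,
    keep_default_na=False,
)
print(df.shape)
print(df["hs4"].value_counts().head())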
Details
| Column | Type | Description |
|---|---|---|
| Dates Long Haul/YYYY | VARCHAR | |
| Dates Long Haul/MM | VARCHAR | |
| month | VARCHAR | |
| Dates Long Haul/YYYYMMDD | VARCHAR | |
| Commodity_HS_Datamar/HS4 English | VARCHAR | |
| hs6_description | VARCHAR | |
| hs8_description | VARCHAR | |
| Commodity Detail/BL Description | VARCHAR | |
| Identification/ID_Datamar | VARCHAR | |
| Cargo_Transport/Cargo Type | VARCHAR | |
| Cargo_Transport/Container Type | VARCHAR | |
| Place_and_Ports/POR_Country | VARCHAR | |
| Place_and_Ports/POMO_Name | VARCHAR | |
| port_of_export.label | VARCHAR | |
| country_of_destination.label | VARCHAR | |
| port_of_import.label | VARCHAR | |
| Place_and_Ports/POMD_Name | VARCHAR | |
| exporter.label | VARCHAR | |
| Company_Shipper/Shipper Name Detailed | VARCHAR | |
| Company_Shipper/Type | VARCHAR | |
| exporter.cnpj | VARCHAR | |
| exporter.country.label | VARCHAR | |
| exporter.state.label | VARCHAR | |
| exporter.municipality.label | VARCHAR | |
| Company_Shipper/Neighborhood | VARCHAR | |
| Company_Shipper/Street | VARCHAR | |
| Company_Shipper/Zip | VARCHAR | |
| Company_Forwarder/Forwarder Name | VARCHAR | |
| Company_Notify/Notify Name | VARCHAR | |
| importer.label | VARCHAR | |
| Company_Consignee/Consignee Name Detailed | VARCHAR | |
| Company_Consignee/Type | VARCHAR | |
| Company_Consignee/Registration Number | VARCHAR | |
| Company_Consignee/Country | VARCHAR | |
| Company_Consignee/State Name | VARCHAR | |
| Company_Consignee/City | VARCHAR | |
| Company_Consignee/Neighborhood | VARCHAR | |
| Company_Consignee/Street | VARCHAR | |
| Company_Consignee/Zip | VARCHAR | |
| Vessel Long Haul/Vessel Name Long Haul | VARCHAR | |
| Vessel Long Haul/Vessel Type Long Haul | VARCHAR | |
| Service/Service Name | VARCHAR | |
| Service/Transit Time | VARCHAR | |
| Service/Port Rotation | VARCHAR | |
| Carrier/Carrier Name | VARCHAR | |
| Carrier/Carrier Group Name | VARCHAR | |
| Carrier/Carrier Group SCAC | VARCHAR | |
| Carrier/Carrier Agent | VARCHAR | |
| TEU | VARCHAR | |
| vol | VARCHAR | |
| WTMT | VARCHAR | |
| C20 | VARCHAR | |
| C40 | VARCHAR | |
| year | VARCHAR | |
| hs6 | VARCHAR | |
| hs8 | VARCHAR | |
| hs4 | VARCHAR | |
| hs5 | VARCHAR | |
| exporter.type | VARCHAR | |
| exporter.state.name | VARCHAR | |
| exporter.state.trase_id | VARCHAR | |
| port_of_export.name | VARCHAR | |
| exporter.municipality.name | VARCHAR | |
| exporter.municipality.trase_id | VARCHAR | |
| country_of_destination.name | VARCHAR | |
| country_of_destination.trase_id | VARCHAR | |
| country_of_destination.economic_bloc | VARCHAR | |
| importer.trader_id | VARCHAR | |
| importer.name | VARCHAR | |
| importer.group | VARCHAR | |
| exporter.trase_id | VARCHAR | |
| exporter.trader_id | VARCHAR | |
| exporter.group | VARCHAR | |
| exporter.name | VARCHAR | |
Models / Seeds
source.trase_duckdb.trase-storage-raw.dataliner_report_stockholm_exp_br_2022_with_cnpj
model.trase_duckdb.hs2017
Sources
['trase-storage-raw', 'dataliner_report_stockholm_exp_br_2022_with_cnpj']
import numpy as np
import pandas as pd
import stdnum.br.cnpj
import stdnum.br.cpf
from psycopg2 import sql
from trase.tools import (
find_label,
get_country_id,
get_label_trader_id,
get_node_name,
get_trader_group_id,
uses_database,
)
from trase.tools.aws.aws_helpers_cached import get_pandas_df_once
from trase.tools.aws.metadata import write_csv_for_upload
from trase.tools.pandasdb.find import (
find_default_name_by_node_id,
find_economic_blocs_by_trase_id,
find_traders_and_groups_by_label,
find_traders_and_groups_by_trase_id,
)
from trase.tools.utilities.helpers import clean_string
YEAR = 2022
MISSING_VALUES = ["NAN", "NONE", "NA", ""]
def load_and_rename_data():
df = get_pandas_df_once(
"brazil/trade/bol/2022/originals/DataLiner_Report_STOCKHOLM_EXP_BR_2022_with_cnpj.xlsx",
encoding="utf8",
sep=";",
dtype=str,
keep_default_na=False,
xlsx=True,
)
# set the first row as header
df.columns = df.iloc[0]
df = df.drop(df.index[0])
# rename the columns
columns = {
"Dates Long Haul/YYYYMM": "month",
# hs code
"Commodity_HS_Datamar/HS6 English": "hs6_description",
"Commodity_HS_Datamar/HS8 Portugues": "hs8_description",
# exporter
"Company_Shipper/City": "exporter.municipality.label", # seems to be municipality...
"Company_Shipper/Registration Number": "exporter.cnpj",
"Company_Shipper/Shipper Name": "exporter.label",
"Company_Shipper/State Name": "exporter.state.label",
"Company_Shipper/Country Name": "exporter.country.label",
# ports, country
"Place_and_Ports/POL_Name": "port_of_export.label",
"Place_and_Ports/POD_Name": "port_of_import.label",
"Place_and_Ports/DEST_Country": "country_of_destination.label",
# importer
"Company_Consignee/Consignee Name": "importer.label",
# volume, fob
"WTKG": "vol",
}
df = df.rename(columns=columns, errors="raise")
return df
def clean_time(df):
"""Parse time and do some basic checks"""
    assert (
        sum(df["month"].str.len() != 6) == 0
    ), "Column 'Dates Long Haul/YYYYMM' should only contain six digits."
df["year"] = df["month"].str[:4]
df["month"] = df["month"].str[-2:]
assert sum(df["year"] != str(YEAR)) == 0, f"Year has to be {YEAR}."
    assert (
        df[(df["month"].astype(int) > 12) | (df["month"].astype(int) < 1)].shape[0] == 0
    ), "Month has to be between 1 and 12."
return df
def clean_hs(df):
"""Do some basic checks of hs codes, and create hs4 and hs5 based on hs6."""
# get hs codes from descriptions
df["hs6"] = df["hs6_description"].str[:6]
df["hs8"] = df["hs8_description"].str[:8]
# check the basic format of the hs code columns
assert (
sum(df["hs6"].str.len() != 6) == 0
), "Column 'Commodity_HS_Datamar/HS6 Code' should contain 6 digits."
    assert (
        sum(df["hs8"].str.len() != 8) == 0
    ), "Column 'Commodity_HS_Datamar/HS8 Code' should contain 8 digits."
assert (
df[~df["hs6"].str.isdigit()].shape[0] == 0
), "Column 'Commodity_HS_Datamar/HS6 Code' should only contain digits."
assert (
df[~df["hs8"].str.isdigit()].shape[0] == 0
), "Column 'Commodity_HS_Datamar/HS8 Code' should only contain digits."
assert (
sum(df["hs6"] != df["hs8"].str[:6]) == 0
), "Column 'Commodity_HS_Datamar/HS6 Code' does not match 'Commodity_HS_Datamar/HS8 Code'."
df["hs4"] = df["hs6"].str.slice(0, 4)
df["hs5"] = df["hs6"].str.slice(0, 5)
# check whether the hs4 codes already exist in our dict
df_hscodes = get_pandas_df_once(
"world/metadata/codes/hs/HS2017.csv", sep=";", dtype=str, keep_default_na=False
)
hs4_list = df_hscodes[df_hscodes["type"] == "hs4"]["code"].to_list()
    # filter out rows whose hs4 code is not in our dictionary
    # TODO: check whether this is the correct way to do it; do we need to check
    # all the hs4/hs6 codes not included in the 2017 file?
    df = df[df["hs4"].isin(hs4_list)]
return df
def clean_string_columns(df, column_list):
# clean the string columns
for column in column_list:
df[column] = df[column].apply(clean_string)
# replace null values to UNKNOWN
for column in df.columns:
df.loc[df[column].isin(MISSING_VALUES), column] = "UNKNOWN"
return df
def clean_cnpjs(df):
"""Clean cnpjs and create a column 'exporter.type' indicating cnpj or cpf."""
assert (
df[~df["exporter.cnpj"].str.isdigit()].shape[0] == 0
), "Column 'Company_Shipper/Registration Number' should only contain digits."
cnpj = df["exporter.cnpj"].str.rjust(14, "0")
cnpj_valid = cnpj.apply(stdnum.br.cnpj.is_valid)
cpf = df["exporter.cnpj"].str.rjust(11, "0")
cpf_valid = cpf.apply(stdnum.br.cpf.is_valid)
# cnpj_valid[cpf.isin(KNOWN_CPFS)] = False
assert not any(cnpj_valid & cpf_valid)
df["exporter.type"] = "unknown"
df.loc[cnpj_valid, "exporter.type"] = "cnpj"
df.loc[cpf_valid, "exporter.type"] = "cpf"
df["exporter.cnpj"] = np.where(cnpj_valid, cnpj, df["exporter.cnpj"])
df["exporter.cnpj"] = np.where(cpf_valid, cpf, df["exporter.cnpj"])
df.loc[df["exporter.cnpj"] == "0", "exporter.cnpj"] = "0" * 14
return df
@uses_database
def get_country_labels(cnx=None):
"""Retrieve country name, label, and trase_id"""
df = pd.read_sql(
"""
select distinct
name as "country_of_destination.name",
unnest(synonyms) as "country_of_destination.label",
coalesce(trase_id, 'XX') AS "country_of_destination.trase_id"
from views.regions where level = 1 and length(trase_id) = 2
""",
cnx.cnx,
)
df_new_synonyms = pd.DataFrame(
[
["SAINT KITTS AND NEVIS", "ST CHRISTOPHER AND NEVIS", "KN"],
],
columns=[
"country_of_destination.name",
"country_of_destination.label",
"country_of_destination.trase_id",
],
)
    # DataFrame.append was removed in pandas 2.0; concat is the equivalent
    df = pd.concat([df, df_new_synonyms], ignore_index=True)
# add economic bloc
df[["country_of_destination.economic_bloc"]] = find_economic_blocs_by_trase_id(
df.rename(columns={"country_of_destination.trase_id": "trase_id"})[
["trase_id"]
],
returning=["economic_bloc_name"],
)
assert not any(df["country_of_destination.economic_bloc"].isna())
assert all(df["country_of_destination.economic_bloc"].str.len() > 3)
return df
def assert_none_missing(df, column):
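    """Assert that an indicator merge matched every row (pops '_merge' from df)."""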
missing = df[df.pop("_merge") != "both"][column].drop_duplicates()
assert missing.empty, f"Not all {column} found:\n{missing}"
def clean_countries(df):
"""Introduce country name and trase id to the dataframe"""
df = pd.merge(
df,
get_country_labels(),
on="country_of_destination.label",
validate="many_to_one",
how="left",
indicator=True,
)
assert_none_missing(df, "country_of_destination.name")
return df
@uses_database
def get_state_labels(cnx=None):
"""Retrieve state name, label, and trase_id"""
df = pd.read_sql(
"""
select distinct
name as "exporter.state.name",
unnest(synonyms) as "exporter.state.label",
coalesce(trase_id, 'BR-XX') AS "exporter.state.trase_id"
from views.regions where level = 3 and length(trase_id) = 5 and country = 'BRAZIL'
""",
cnx.cnx,
)
return df
def clean_states(df):
"""Introduce state name and trase id to the dataframe"""
# correct wrong states for certain municipalities
municipality_state_dict = {
"BUENOS AIRES": "PERNAMBUCO",
"ALFENAS": "MINAS GERAIS",
}
for municipality, state in municipality_state_dict.items():
df.loc[
df["exporter.municipality.label"] == municipality, "exporter.state.label"
] = state
df = pd.merge(
df,
get_state_labels(),
on="exporter.state.label",
validate="many_to_one",
how="left",
indicator=True,
)
assert_none_missing(df, "exporter.state.name")
return df
@uses_database
def get_port_labels(cnx=None):
df = pd.read_sql(
"""
select distinct
name as "port_of_export.name",
unnest(synonyms) as "port_of_export.label"
from views.regions where region_type = 'PORT' and country = 'BRAZIL'
""",
cnx.cnx,
)
df_new_synonyms = pd.DataFrame(
[
["TROMBETAS", "TROMBETAS"],
["ALUMAR", "ALUMAR"],
["PONTA UBU", "PONTA UBU"],
["UNKNOWN", "PLACE_AND_PORTS/POL_NAME"],
["JURUTI", "JURUTI"],
["FLUMINENSE TERMINAL PORT", "FLUMINENSE TERMINAL PORT"],
["FLUMINENSE TERMINAL PORT", "BIJUPIRA SALEMA FIELD"],
],
columns=["port_of_export.name", "port_of_export.label"],
)
    # DataFrame.append was removed in pandas 2.0; concat is the equivalent
    df = pd.concat([df, df_new_synonyms], ignore_index=True)
return df
def clean_ports(df):
"""Introduce port name and trase id to the dataframe"""
df = pd.merge(
df,
get_port_labels(),
on="port_of_export.label",
validate="many_to_one",
how="left",
indicator=True,
)
assert_none_missing(df, "port_of_export.name")
return df
@uses_database
def get_municipality_labels(cnx=None):
df_municipalities = pd.read_sql(
f"""
select distinct
name as "exporter.municipality.name",
unnest(synonyms) as "exporter.municipality.label",
trase_id as "exporter.municipality.trase_id",
substr(trase_id, 0, 6) as "exporter.state.trase_id"
from views.regions
where country = 'BRAZIL' and region_type = 'MUNICIPALITY'
""",
cnx.cnx,
)
return df_municipalities
def clean_municipalities(df):
"""Introduce municipality name and trase id to the dataframe"""
# correct some exporter countries
municipality_external = {"ROTTERDAM": "NETHERLANDS"}
for municipality, country in municipality_external.items():
df.loc[
df["exporter.municipality.label"] == municipality, "exporter.country.label"
] = country
# correct some synonyms of municipalities
municipality_synonyms = {
"ESTRELA D OESTE": "ESTRELA D'OESTE",
"CAPAO GRANDE": "VARZEA GRANDE", # CAPAO GRANDE belongs to VARZEA GRANDE municipality
"GOVERNADOR DIX SEPT ROSADO": "GOVERNADOR DIX-SEPT ROSADO",
"DIAS D AVILA": "DIAS D'AVILA",
"IDROLANDIA": "SIDROLANDIA",
"JIPARANA": "JI-PARANA",
}
for synonym, municipality in municipality_synonyms.items():
df.loc[
df["exporter.municipality.label"] == synonym, "exporter.municipality.label"
] = municipality
# BARUERI and ESTRELA D'OESTE both belong to SAO PAULO
state_dict = {
"exporter.state.label": "SAO PAULO",
"exporter.state.name": "SAO PAULO",
"exporter.state.trase_id": "BR-35",
}
for m in ["BARUERI", "ESTRELA D'OESTE"]:
for c, v in state_dict.items():
df.loc[df["exporter.municipality.label"] == m, c] = v
# split df based on the exporter countries
df_brazil = df[df["exporter.country.label"] == "BRAZIL"]
    # copy so the column assignments below do not trigger chained-assignment warnings
    df_no_brazil = df[df["exporter.country.label"] != "BRAZIL"].copy()
# merge with municipality information in DB
df_brazil = pd.merge(
df_brazil,
get_municipality_labels(),
on=["exporter.municipality.label", "exporter.state.trase_id"],
validate="many_to_one",
how="left",
indicator=True,
)
assert_none_missing(df_brazil, "exporter.municipality.name")
# replace municipality names and trase ids of foreign exporters to unknown
df_no_brazil["exporter.municipality.name"] = "UNKNOWN MUNICIPALITY"
df_no_brazil["exporter.municipality.trase_id"] = "BR-XXXXXXX"
df = pd.concat([df_brazil, df_no_brazil])
return df
def check_numerical(df, columns):
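    """Assert that every value in the given columns can be parsed as a number."""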
for c in columns:
num_array = df[c].str.lstrip(".").copy()
assert (
num_array.apply(pd.to_numeric, errors="coerce").notnull().all()
), f"Column {c} contains non-numerical value."
@uses_database
def clean_importers(df, cur=None, cnx=None):
df_importers = df[["importer.label"]].drop_duplicates()
# clean importer names
df_importers[["importer.trader_id", "importer.name", "importer.group", "count"]] = (
find_traders_and_groups_by_label(
df_importers.rename(columns={"importer.label": "trader_label"}),
returning=["trader_id", "trader_name", "group_name", "count"],
year=sql.Literal(YEAR),
cur=cur,
cnx=cnx,
)
)
# special case for UNKNOWN CUSTOMER (there are two!)
is_unknown = (df_importers["count"] != 1) & (
df_importers["importer.label"] == "UNKNOWN CUSTOMER"
)
if any(is_unknown):
brazil_id = get_country_id("BRAZIL", cur=cur)
label_id = find_label("UNKNOWN CUSTOMER", cur=cur)
trader_id = get_label_trader_id(label_id, brazil_id)
trader_name = get_node_name(trader_id, cur=cur)
group_id = get_trader_group_id(trader_id, cur=cur)
group_name = get_node_name(group_id, cur=cur)
df_importers.loc[is_unknown, "importer.trader_id"] = trader_id
df_importers.loc[is_unknown, "importer.name"] = trader_name
df_importers.loc[is_unknown, "importer.group"] = group_name
df_importers.loc[is_unknown, "count"] = 1
# we should have found one unique node for every importer
bad = df_importers.pop("count") != 1
if any(bad):
raise ValueError(f"Missing some importers:\n{df_importers[bad]}")
# merge back into result
df = pd.merge(
df,
df_importers,
on=["importer.label"],
how="left",
validate="many_to_one",
indicator=True,
)
merge = df.pop("_merge")
assert all(merge == "both")
return df
@uses_database
def clean_exporters_and_add_group(df, cur=None, cnx=None):
"""
This function adds two columns:
exporter.name - the default name of the exporter from the database
exporter.group - the group name from the database
It does this using the following algorithm:
1. Construct a Trase ID from exporter.cnpj and use this to perform a lookup in the
database
2. If a unique name + group cannot be found through that method, use exporter.label
to perform a lookup among trader labels in the database
TODO: try to do this more concisely / in fewer lines of code
"""
trase_ids = "BR-TRADER-" + df["exporter.cnpj"].str.slice(0, 8)
trase_ids = trase_ids.replace({"BR-TRADER-00000000": None})
df = df.assign(**{"exporter.trase_id": trase_ids})
df_exporters = df[["exporter.label", "exporter.trase_id"]].drop_duplicates()
# clean exporter names using trase id
df_exporters[["exporter.trader_id", "exporter.group", "count"]] = (
find_traders_and_groups_by_trase_id(
df_exporters.rename(columns={"exporter.trase_id": "trase_id"})[
["trase_id"]
],
returning=["trader_id", "group_name", "count"],
year=sql.Literal(YEAR),
cur=cur,
cnx=cnx,
)
)
counts = df_exporters.pop("count")
assert all(counts.isin([0, 1]))
not_found_by_trase_id = counts == 0
print(
f"{sum(~not_found_by_trase_id)} exporters were found by Trase ID and "
f"{sum(not_found_by_trase_id)} were not"
)
df_found_by_trase_id = df_exporters[~not_found_by_trase_id]
df_missing = df_exporters[not_found_by_trase_id].copy()
# if not found by Trase ID, then look by name
labels = df_missing["exporter.label"].drop_duplicates()
df_labels = pd.DataFrame(labels)
df_labels[["exporter.trader_id", "exporter.group", "count"]] = (
find_traders_and_groups_by_label(
df_labels.rename(columns={"exporter.label": "trader_label"}),
returning=["trader_id", "group_name", "count"],
year=sql.Literal(YEAR),
)
)
# special case for UNKNOWN CUSTOMER
is_unknown = (df_labels["count"] != 1) & (
df_labels["exporter.label"] == "UNKNOWN CUSTOMER"
)
if any(is_unknown):
brazil_id = get_country_id("BRAZIL", cur=cur)
label_id = find_label("UNKNOWN CUSTOMER", cur=cur)
trader_id = get_label_trader_id(label_id, brazil_id)
group_id = get_trader_group_id(trader_id, cur=cur)
group_name = get_node_name(group_id, cur=cur)
df_labels.loc[is_unknown, "exporter.trader_id"] = trader_id
df_labels.loc[is_unknown, "exporter.group"] = group_name
df_labels.loc[is_unknown, "count"] = 1
# we should have found one unique node for every exporter
bad = df_labels.pop("count") != 1
if any(bad):
raise ValueError(f"Missing some exporters:\n{df_labels[bad]}")
# merge exporters found by trase id back into results
right = df_found_by_trase_id[
["exporter.trase_id", "exporter.trader_id", "exporter.group"]
].drop_duplicates()
df1 = pd.merge(
df,
right,
on=["exporter.trase_id"],
how="left",
validate="many_to_one",
indicator=True,
)
merge = df1.pop("_merge")
df_solved1 = df1[merge == "both"]
# merge exporters found by label back into results
df_unsolved = df1[merge != "both"]
df_unsolved = df_unsolved.drop(
columns=["exporter.trader_id", "exporter.group"], errors="raise"
)
right = df_labels[
["exporter.label", "exporter.trader_id", "exporter.group"]
].drop_duplicates()
df_solved2 = pd.merge(
df_unsolved,
right,
on=["exporter.label"],
how="left",
validate="many_to_one",
indicator=True,
)
merge = df_solved2.pop("_merge")
assert all(merge == "both")
# combine the two
expected_columns = list(set(df.columns) | {"exporter.trader_id", "exporter.group"})
assert sorted(df_solved2.columns) == sorted(expected_columns)
assert sorted(df_solved1.columns) == sorted(expected_columns)
df_final = pd.concat([df_solved1, df_solved2]).reset_index(drop=True)
# guarantee that we didn't change the original data
a = df.sort_values(list(df.columns)).reset_index(drop=True)
b = df_final[df.columns].sort_values(list(df.columns)).reset_index(drop=True)
    b.columns.name = a.columns.name  # align the columns-index name so assert_frame_equal compares only values
pd.testing.assert_frame_equal(a, b)
# add exporter names
df_final = df_final.astype({"exporter.trader_id": int})
df_final[["exporter.name"]] = find_default_name_by_node_id(
df_final[["exporter.trader_id"]].rename(
columns={"exporter.trader_id": "node_id"}
),
returning=["name"],
cnx=cnx,
cur=cur,
)
return df_final
def main():
df = load_and_rename_data()
# clean time, hs codes, and cnpjs
df = clean_time(df)
df = clean_hs(df)
df = clean_cnpjs(df)
# clean string columns
string_columns = [
c for c in df.columns.to_list() if c not in ["vol", "WTMT", "fob"]
]
df = clean_string_columns(df, string_columns)
df = clean_states(df)
df = clean_ports(df)
df = clean_municipalities(df)
df = clean_countries(df)
df = clean_importers(df)
df = clean_exporters_and_add_group(df)
# check numerical columns
num_column_list = ["vol", "WTMT"]
check_numerical(df, num_column_list)
# save to csv
write_csv_for_upload(df, "brazil/trade/bol/2022/BRAZIL_BOL_2022.csv")
if __name__ == "__main__":
main()
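# --- dbt model stub (trase/data_pipeline/models/brazil/trade/bol/2022/brazil_bol_2022.py) ---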
import pandas as pd
def model(dbt, cursor):
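    # Declare the upstream model and source so dbt wires this stub into the
    # lineage graph, even though it never materializes anything itself.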
dbt.ref("hs2017")
dbt.source("trase-storage-raw", "dataliner_report_stockholm_exp_br_2022_with_cnpj")
raise NotImplementedError()
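    # unreachable: dbt Python models must return a DataFrame, so the generated
    # template keeps a dummy return after the raise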
return pd.DataFrame({"hello": ["world"]})