Skip to content

Brazil Soy Silo Map V2 Csv

s3://trase-storage/brazil/logistics/silos/silo_map_v2/brazil_soy_silo_map_v2.csv

Dbt path: trase_production.main_brazil.brazil_soy_silo_map_v2_csv

Explore on Metabase: Full table; summary statistics

Containing yaml file link: trase/data_pipeline/models/brazil/logistics/silos/silo_map_v2/_schema.yml

Model file link: trase/data_pipeline/models/brazil/logistics/silos/silo_map_v2/brazil_soy_silo_map_v2_csv.py

Dbt test runs & lineage: Test results · Lineage

Full dbt_docs page: Open in dbt docs (includes lineage graph -at the bottom right-, tests, and downstream dependencies)

Tags: mock_model


brazil_soy_silo_map_v2_csv

Description

An enriched CSV version of the GeoJSON data, for publication on the trase.earth website


Details

Column Type Description
municipality_ibge_code BIGINT
capacity_tn DOUBLE
sicarm_cda VARCHAR
lon DOUBLE
lat DOUBLE
name VARCHAR
source VARCHAR
type VARCHAR
municipality_name VARCHAR
state_ibge_code BIGINT
state_name VARCHAR
state_abbreviation VARCHAR
row_index BIGINT Zero-indexed row number. This is used only to provide a primary key for the logistics map functionality; it is not a stable unique identifier for the facility.

Models / Seeds

  • source.trase_duckdb.source_brazil.brazil_metadata_municipalities_ibge
  • source.trase_duckdb.source_brazil.brazil_metadata_states_ibge
  • model.trase_duckdb.brazil_logistics_silo_map_v2_final

Sources

  • ['source_brazil', 'brazil_metadata_municipalities_ibge']
  • ['source_brazil', 'brazil_metadata_states_ibge']

No called script or script source not found.

import polars as pl


def model(dbt, session):
    """Build the publishable table of silos from the v2 silo-map GeoJSON.

    Steps:
      1. Read the feature properties out of the GeoJSON stored on S3.
      2. Drop internal-only columns and tidy the ``source`` and ``type`` values.
      3. Left-join municipality names, then state names and abbreviations,
         from the IBGE metadata sources.
      4. Append a zero-based ``row_index`` column.

    Args:
        dbt: dbt Python-model context; used here for ``config``, ``ref`` and
            ``source``.
        session: connection object exposing ``sql(...)``, whose result is
            converted to a polars DataFrame with ``.pl()``.

    Returns:
        A polars DataFrame with the cleaned silo rows.
    """
    dbt.config(materialized="external")

    # The return value is discarded; presumably this call only exists so that
    # dbt records the upstream dependency. The data itself is read straight
    # from the GeoJSON file in the query below.
    dbt.ref("brazil_logistics_silo_map_v2_final")

    features_sql = """
        WITH expanded AS (
            SELECT UNNEST(features) AS f
            FROM read_json(
                's3://trase-storage/brazil/logistics/silos/silo_map_v2/silo_map_v2_final.geojson',
                columns={'features': 'JSON[]'}
            )
        )
        SELECT
            f -> 'properties' ->> 'source'                                   AS source
            , CAST(f -> 'properties' ->> 'code_muni' AS INT)                   AS municipality_ibge_code
            , CAST(f -> 'properties' ->> 'soy_intersection' AS boolean)        AS soy_intersection
            , CAST(f -> 'properties' ->> 'id' AS BIGINT)                       AS id
            , CAST(f -> 'properties' ->> 'cluster_id' AS bigint)               AS cluster_id
            , CAST(NULLIF(f -> 'properties' ->> 'capacity', 'null') AS DOUBLE) AS capacity_tn
            , f -> 'properties' ->> 'sicarm_cda'                               AS sicarm_cda
            , CAST(f -> 'properties' ->> 'lng' AS DOUBLE)                      AS lon
            , CAST(f -> 'properties' ->> 'lat' AS DOUBLE)                      AS lat
            , f -> 'properties' ->> 'validation_parallel'                      AS validation_parallel
            , f -> 'properties' ->> 'type'                                     AS type
            , f -> 'properties' ->> 'local_facility'                           AS local_facility
            , f -> 'properties' ->> 'name'                                     AS name
        FROM expanded
        """
    silos = session.sql(features_sql).pl()

    # ------------------------------------------------------------------
    # Column selection and clean-up.
    # Note that CNPJ/CPF is intentionally excluded due to the risk of GDPR
    # violations.
    # ------------------------------------------------------------------

    # Columns removed from the pass-through set: either hidden from the user
    # or re-added further down in a cleaned form.
    hidden_columns = (
        # re-added below with friendlier labels
        "source",
        # believed to have been for internal purposes only
        "validation_parallel",
        # ids and cluster ids are duplicated: different facilities with
        # different capabilities can share the same name (CNPJ) and coordinates
        "id",
        "cluster_id",
        # true/false flag for "within a 10km radius of known soy production";
        # always true in the output data, as it was used to filter the relevant
        # SICARM facilities. Belongs in the method document, not in the table.
        "soy_intersection",
        # field was only created for the Ode project
        "local_facility",
        # raw value is replaced by the tidied version below (the urban /
        # not-urban distinction was noted as fairly arbitrary and dependent
        # on zoom level)
        "type",
    )

    # Human-readable names for the data sources.
    source_labels = {
        "gmaps": "Google Maps",
        "sicarm": "SICARM (CONAB)",
        "cnpj": "CNPJ (Receita Federal)",
    }
    pretty_source = pl.col("source").replace(source_labels)

    # Trim surrounding whitespace and title-case the facility type.
    pretty_type = pl.col("type").str.strip_chars().str.to_titlecase()

    silos = silos.select(
        pl.exclude(*hidden_columns),
        source=pretty_source,
        type=pretty_type,
    )

    # ------------------------------------------------------------------
    # Municipality names: take the properly-cased and accented versions
    # from the IBGE metadata.
    # ------------------------------------------------------------------
    municipality_relation = dbt.source(
        "source_brazil", "brazil_metadata_municipalities_ibge"
    )
    municipality_lookup = (
        municipality_relation.select("id", "nome")
        .pl()
        .rename({"id": "municipality_ibge_code", "nome": "municipality_name"})
    )
    silos = silos.join(
        municipality_lookup,
        on="municipality_ibge_code",
        how="left",
        validate="m:1",
    )

    # ------------------------------------------------------------------
    # State information.
    # ------------------------------------------------------------------
    state_relation = dbt.source("source_brazil", "brazil_metadata_states_ibge")
    state_lookup = (
        state_relation.select("id", "nome", "sigla")
        .pl()
        .rename(
            {
                "id": "state_ibge_code",
                "nome": "state_name",
                "sigla": "state_abbreviation",
            }
        )
    )

    # The state code is taken as the first two characters of the municipality
    # code's string form, converted back to an integer.
    state_code = (
        pl.col("municipality_ibge_code").cast(str).str.slice(0, 2).cast(int)
    )
    silos = silos.with_columns(state_ibge_code=state_code)
    silos = silos.join(
        state_lookup, on="state_ibge_code", how="left", validate="m:1"
    )

    # ------------------------------------------------------------------
    # Primary key: a zero-based running row number.
    # ------------------------------------------------------------------
    n_rows = len(silos)
    return silos.with_columns(row_index=pl.arange(0, n_rows))