AWS SDK for pandas

17 - Partition Projection

https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html

[1]:
import getpass
from datetime import datetime

import pandas as pd

import awswrangler as wr

Enter your bucket name:

[2]:
# Read the bucket name through a masked prompt so it is not echoed into the
# saved notebook output; the explicit prompt text tells the reader what to type.
bucket = getpass.getpass("Enter your bucket name: ")
 ···········································

Integer projection

[3]:
# Sample rows as (value, year, month, day); the three date parts serve as the
# partition columns when the frame is written out.
df = pd.DataFrame(
    [
        (1, 2019, 10, 25),
        (2, 2020, 11, 26),
        (3, 2021, 12, 27),
    ],
    columns=["value", "year", "month", "day"],
)

df
[3]:
value year month day
0 1 2019 10 25
1 2 2020 11 26
2 3 2021 12 27
[4]:
# Persist the frame as a partitioned Parquet dataset under the table_integer
# prefix and declare it as default.table_integer, with partition projection
# configured for all three partition columns.
partition_columns = ["year", "month", "day"]
projection_settings = {
    # Every partition column is projected with the "integer" type.
    "projection_types": {column: "integer" for column in partition_columns},
    # "min,max" range string for each projected column.
    "projection_ranges": {"year": "2000,2025", "month": "1,12", "day": "1,31"},
}

wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/table_integer/",
    dataset=True,
    partition_cols=partition_columns,
    database="default",
    table="table_integer",
    athena_partition_projection_settings=projection_settings,
)
[5]:
# Read the whole table back through Athena; no partition-registration step was
# run between the write and this query.
wr.athena.read_sql_query(
    sql="SELECT * FROM table_integer",
    database="default",
)
[5]:
value year month day
0 3 2021 12 27
1 2 2020 11 26
2 1 2019 10 25

Enum projection

[6]:
# One row per city; "city" is the partition column for the enum example.
city_names = ["São Paulo", "Tokio", "Seattle"]
df = pd.DataFrame({"value": [1, 2, 3], "city": city_names})

df
[6]:
value city
0 1 São Paulo
1 2 Tokio
2 3 Seattle
[7]:
# Write the frame partitioned by city and declare default.table_enum with an
# "enum" projection whose value list names the three cities in the data.
enum_projection = {
    "projection_types": {"city": "enum"},
    # Comma-separated list of the values the partition column may take.
    "projection_values": {"city": "São Paulo,Tokio,Seattle"},
}

wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/table_enum/",
    dataset=True,
    partition_cols=["city"],
    database="default",
    table="table_enum",
    athena_partition_projection_settings=enum_projection,
)
[8]:
# Read every row of the enum-projected table back through Athena.
wr.athena.read_sql_query(
    sql="SELECT * FROM table_enum",
    database="default",
)
[8]:
value city
0 1 São Paulo
1 3 Seattle
2 2 Tokio

Date projection

[9]:
def ts(x):
    """Parse a ``YYYY-MM-DD HH:MM:SS`` string into a ``datetime``.

    The format carries no timezone directive, so the result is naive.
    ``datetime.strptime`` raises ``ValueError`` if ``x`` does not match.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    return datetime.strptime(x, timestamp_format)


def dt(x):
    """Parse a ``YYYY-MM-DD`` string and return it as a ``date``.

    ``datetime.strptime`` raises ``ValueError`` if ``x`` does not match.
    """
    parsed = datetime.strptime(x, "%Y-%m-%d")
    # Drop the (midnight) time component; only the calendar date is wanted.
    return parsed.date()


# Three rows with a date column (dt) and a second-resolution timestamp column
# (ts); both are used as partition columns in the date-projection example.
df = pd.DataFrame(
    {
        "value": [1, 2, 3],
        "dt": [dt(day) for day in ("2020-01-01", "2020-01-02", "2020-01-03")],
        "ts": [
            ts(moment)
            for moment in (
                "2020-01-01 00:00:00",
                "2020-01-01 00:00:01",
                "2020-01-01 00:00:02",
            )
        ],
    }
)

df
[9]:
value dt ts
0 1 2020-01-01 2020-01-01 00:00:00
1 2 2020-01-02 2020-01-01 00:00:01
2 3 2020-01-03 2020-01-01 00:00:02
[10]:
# Write the frame partitioned by dt and ts and declare default.table_date with
# both columns projected as "date", bounded by the first and last values
# present in the sample data.
# NOTE(review): the Athena docs list `projection.<col>.format` as required for
# date projections, and an interval as required when the range has sub-day
# precision; neither is supplied here — confirm the defaults that end up on
# the table are the intended ones.
date_projection = {
    "projection_types": {"dt": "date", "ts": "date"},
    "projection_ranges": {
        "dt": "2020-01-01,2020-01-03",
        "ts": "2020-01-01 00:00:00,2020-01-01 00:00:02",
    },
}

wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/table_date/",
    dataset=True,
    partition_cols=["dt", "ts"],
    database="default",
    table="table_date",
    athena_partition_projection_settings=date_projection,
)
[11]:
# Read every row of the date-projected table back through Athena.
wr.athena.read_sql_query(
    sql="SELECT * FROM table_date",
    database="default",
)
[11]:
value dt ts
0 1 2020-01-01 2020-01-01 00:00:00
1 2 2020-01-02 2020-01-01 00:00:01
2 3 2020-01-03 2020-01-01 00:00:02

Injected projection

[12]:
# One row per UUID string; "uuid" is the partition column for the injected
# projection example.
uuid_values = [
    "761e2488-a078-11ea-bb37-0242ac130002",
    "b89ed095-8179-4635-9537-88592c0f6bc3",
    "87adc586-ce88-4f0a-b1c8-bf8e00d32249",
]
df = pd.DataFrame({"value": [1, 2, 3], "uuid": uuid_values})

df
[12]:
value uuid
0 1 761e2488-a078-11ea-bb37-0242ac130002
1 2 b89ed095-8179-4635-9537-88592c0f6bc3
2 3 87adc586-ce88-4f0a-b1c8-bf8e00d32249
[13]:
# Write the frame partitioned by uuid and declare default.table_injected with
# an "injected" projection; no range or value list is given for the column.
injected_projection = {
    "projection_types": {"uuid": "injected"},
}

wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/table_injected/",
    dataset=True,
    partition_cols=["uuid"],
    database="default",
    table="table_injected",
    athena_partition_projection_settings=injected_projection,
)
[14]:
# Fetch a single row by filtering on one specific uuid partition value.
injected_query = "SELECT * FROM table_injected WHERE uuid='b89ed095-8179-4635-9537-88592c0f6bc3'"
wr.athena.read_sql_query(sql=injected_query, database="default")
[14]:
value uuid
0 2 b89ed095-8179-4635-9537-88592c0f6bc3

Cleaning Up

[15]:
# Delete the objects under each table prefix this notebook wrote to, in the
# same order the tables were created.
for table_name in ("table_integer", "table_enum", "table_date", "table_injected"):
    wr.s3.delete_objects(f"s3://{bucket}/{table_name}/")
[16]:
# Remove the four tables this notebook created in the "default" database.
# Kept as individual statements so the cell's final expression is unchanged.
wr.catalog.delete_table_if_exists(database="default", table="table_integer")
wr.catalog.delete_table_if_exists(database="default", table="table_enum")
wr.catalog.delete_table_if_exists(database="default", table="table_date")
wr.catalog.delete_table_if_exists(database="default", table="table_injected")
[ ]: