AWS SDK for pandas

19 - Amazon Athena Cache

awswrangler has a cache strategy that is disabled by default and can be enabled by passing max_cache_seconds bigger than 0 as part of the athena_cache_settings parameter. This cache strategy for Amazon Athena can help you to decrease query times and costs.

When calling read_sql_query, instead of just running the query, we now can verify if the query has been run before. If so, and this last run was within max_cache_seconds (a new parameter to read_sql_query), we return the same results as last time if they are still available in S3. We have seen this increase performance more than 100x, but the potential is pretty much infinite.

The detailed approach is: - When read_sql_query is called with max_cache_seconds > 0 (it defaults to 0), we check for the last queries run by the same workgroup (the most we can get without pagination). - By default it will check the last 50 queries, but you can customize it through the max_cache_query_inspections argument. - We then sort those queries based on CompletionDateTime, descending - For each of those queries, we check if their CompletionDateTime is still within the max_cache_seconds window. If so, we check if the query string is the same as now (with some smart heuristics to guarantee coverage over both ctas_approaches). If they are the same, we check if the last one’s results are still on S3, and then return them instead of re-running the query. - During the whole cache resolution phase, if there is anything wrong, the logic falls back to the usual read_sql_query path.

P.S. The ``cache scope is bounded for the current workgroup``, so you will be able to reuse queries results from others colleagues running in the same environment.

[18]:
import awswrangler as wr

Enter your bucket name:

[19]:
import getpass

bucket = getpass.getpass()
path = f"s3://{bucket}/data/"

Checking/Creating Glue Catalog Databases

[20]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")

Creating a Parquet Table from the NOAA’s CSV files

Reference

[21]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(path="s3://noaa-ghcn-pds/csv/by_year/1865.csv", names=cols, parse_dates=["dt", "obs_time"])

df
[21]:
id dt element value m_flag q_flag s_flag obs_time
0 ID DATE ELEMENT DATA_VALUE M_FLAG Q_FLAG S_FLAG OBS_TIME
1 AGE00135039 18650101 PRCP 0 NaN NaN E NaN
2 ASN00019036 18650101 PRCP 0 NaN NaN a NaN
3 ASN00021001 18650101 PRCP 0 NaN NaN a NaN
4 ASN00021010 18650101 PRCP 0 NaN NaN a NaN
... ... ... ... ... ... ... ... ...
37918 USC00288878 18651231 TMIN -44 NaN NaN 6 NaN
37919 USC00288878 18651231 PRCP 0 P NaN 6 NaN
37920 USC00288878 18651231 SNOW 0 P NaN 6 NaN
37921 USC00361920 18651231 PRCP 0 NaN NaN F NaN
37922 USP00CA0001 18651231 PRCP 0 NaN NaN F NaN

37923 rows × 8 columns

[ ]:
wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite", database="awswrangler_test", table="noaa")
[23]:
wr.catalog.table(database="awswrangler_test", table="noaa")
[23]:
Column Name Type Partition Comment
0 id string False
1 dt string False
2 element string False
3 value string False
4 m_flag string False
5 q_flag string False
6 s_flag string False
7 obs_time string False

The test query

The more computational resources the query needs, the more the cache will help you. That’s why we’re doing it using this long running query.

[24]:
query = """
SELECT
    n1.element,
    count(1) as cnt
FROM
    noaa n1
JOIN
    noaa n2
ON
    n1.id = n2.id
GROUP BY
    n1.element
"""

First execution…

[25]:
%%time

wr.athena.read_sql_query(query, database="awswrangler_test")
CPU times: user 1.59 s, sys: 166 ms, total: 1.75 s
Wall time: 5.62 s
[25]:
element cnt
0 PRCP 12044499
1 MDTX 1460
2 DATX 1460
3 ELEMENT 1
4 WT01 22260
5 WT03 840
6 DATN 1460
7 DWPR 490
8 TMIN 7012479
9 MDTN 1460
10 MDPR 2683
11 SNOW 1086762
12 DAPR 1330
13 SNWD 783532
14 TMAX 6533103

Second execution with CACHE (400x faster)

[26]:
%%time

wr.athena.read_sql_query(query, database="awswrangler_test", athena_cache_settings={"max_cache_seconds": 900})
CPU times: user 689 ms, sys: 68.1 ms, total: 757 ms
Wall time: 1.11 s
[26]:
element cnt
0 PRCP 12044499
1 MDTX 1460
2 DATX 1460
3 ELEMENT 1
4 WT01 22260
5 WT03 840
6 DATN 1460
7 DWPR 490
8 TMIN 7012479
9 MDTN 1460
10 MDPR 2683
11 SNOW 1086762
12 DAPR 1330
13 SNWD 783532
14 TMAX 6533103

Allowing awswrangler to inspect up to 500 historical queries to find same result to reuse.

[27]:
%%time

wr.athena.read_sql_query(
    query,
    database="awswrangler_test",
    athena_cache_settings={"max_cache_seconds": 900, "max_cache_query_inspections": 500},
)
CPU times: user 715 ms, sys: 44.9 ms, total: 760 ms
Wall time: 1.03 s
[27]:
element cnt
0 PRCP 12044499
1 MDTX 1460
2 DATX 1460
3 ELEMENT 1
4 WT01 22260
5 WT03 840
6 DATN 1460
7 DWPR 490
8 TMIN 7012479
9 MDTN 1460
10 MDPR 2683
11 SNOW 1086762
12 DAPR 1330
13 SNWD 783532
14 TMAX 6533103

Cleaning Up S3

[28]:
wr.s3.delete_objects(path)

Delete table

[29]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa")
[29]:
True

Delete Database

[30]:
wr.catalog.delete_database("awswrangler_test")