6 - Amazon Athena¶
awswrangler has three ways to run queries on Athena and fetch the result as a DataFrame:
ctas_approach=True (Default)
Wraps the query with a CTAS and then reads the table data as Parquet directly from S3.
PROS:
Faster for mid and big result sizes.
Can handle some level of nested types.
CONS:
Requires create/delete table permissions on Glue.
Does not support timestamp with time zone.
Does not support columns with repeated names.
Does not support columns with undefined data types.
A temporary table will be created and then deleted immediately.
Does not support custom data_source/catalog_id.
unload_approach=True and ctas_approach=False
Runs an UNLOAD query on Athena and parses the Parquet result on S3.
PROS:
Faster for mid and big result sizes.
Can handle some level of nested types.
Does not modify the Glue Data Catalog.
CONS:
Output S3 path must be empty.
Does not support timestamp with time zone.
Does not support columns with repeated names.
Does not support columns with undefined data types.
ctas_approach=False
Runs a regular query on Athena and parses the regular CSV result on S3.
PROS:
Faster for small result sizes (less latency).
Does not require create/delete table permissions on Glue.
Supports timestamp with time zone.
Supports custom data_source/catalog_id.
CONS:
Slower for mid and big result sizes (but still faster than other libraries that use the regular Athena API).
Does not handle nested types at all.
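The trade-offs above can be condensed into a small decision helper. This is only a sketch: `choose_read_kwargs` and its flag names are hypothetical and not part of awswrangler. It encodes only the pros and cons listed above and returns keyword arguments that could be passed on to `wr.athena.read_sql_query`.

```python
def choose_read_kwargs(
    needs_timestamp_tz=False,
    needs_custom_catalog=False,
    can_modify_glue=True,
    has_empty_s3_prefix=False,
):
    """Pick read_sql_query keyword arguments (hypothetical helper, not an awswrangler API)."""
    # Only the regular CSV path is listed as supporting timestamp with time zone
    # and a custom data_source/catalog_id.
    if needs_timestamp_tz or needs_custom_catalog:
        return {"ctas_approach": False}
    # CTAS is the default, but it needs create/delete table permissions on Glue.
    if can_modify_glue:
        return {"ctas_approach": True}
    # UNLOAD leaves the Glue Data Catalog untouched, but needs an empty S3 output path.
    if has_empty_s3_prefix:
        return {"ctas_approach": False, "unload_approach": True}
    # Fall back to the regular query when neither Parquet-based path is usable.
    return {"ctas_approach": False}
```

For instance, `choose_read_kwargs(can_modify_glue=False, has_empty_s3_prefix=True)` selects the UNLOAD path, while the call with no arguments keeps the CTAS default.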
[1]:
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
Checking/Creating Glue Catalog Databases¶
[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")
Creating a Parquet Table from the NOAA’s CSV files¶
[ ]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/by_year/189",
    names=cols,
    parse_dates=["dt", "obs_time"],
)  # Read 10 files from the 1890 decade (~1GB)
df
[ ]:
wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite", database="awswrangler_test", table="noaa")
[ ]:
wr.catalog.table(database="awswrangler_test", table="noaa")
Reading with ctas_approach=False¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)
Reading with ctas_approach=True (default) - 13x faster¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
Using categories to speed up and save memory - 24x faster¶
[ ]:
%%time
wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"],
)
Reading with unload_approach=True¶
[ ]:
%%time
wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    ctas_approach=False,
    unload_approach=True,
    s3_output=f"s3://{bucket}/unload/",
)
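Because the UNLOAD approach requires an empty output S3 path, re-running the cell above against the same prefix may fail once results from a previous run are there. One possible workaround, sketched below, is to generate a fresh prefix per query. The helper name `unique_unload_path` and the `unload` prefix layout are assumptions made for illustration, not awswrangler features.

```python
import uuid


def unique_unload_path(bucket, prefix="unload"):
    """Build an S3 path that is very unlikely to exist yet (hypothetical helper)."""
    # A random UUID per call keeps each UNLOAD result under its own, initially empty, prefix.
    return f"s3://{bucket}/{prefix.strip('/')}/{uuid.uuid4().hex}/"
```

The returned value could then be passed as `s3_output` in place of the fixed `f"s3://{bucket}/unload/"` path. The generated prefixes accumulate over time, so they would need to be cleaned up separately.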
Batching (Good for restricted memory environments)¶
[ ]:
%%time
dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=True,  # Chunksize calculated automatically for ctas_approach.
)
for df in dfs:  # Batching
    print(len(df.index))
[ ]:
%%time
dfs = wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", chunksize=100_000_000)
for df in dfs:  # Batching
    print(len(df.index))
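When batching, the point is to fold each batch into a running result rather than collect every DataFrame in a list. The sketch below shows that pattern with a hypothetical helper, `summarize_batches`, which relies only on `len()` and so works on any iterable of sized objects, including the iterator of DataFrames returned above.

```python
def summarize_batches(batches):
    """Count batches and rows while holding one batch in memory at a time (hypothetical helper)."""
    batch_count = 0
    total_rows = 0
    for batch in batches:
        # len() of a pandas DataFrame is its row count; nothing is kept after this iteration.
        total_rows += len(batch)
        batch_count += 1
    return batch_count, total_rows
```

Calling `summarize_batches(dfs)` on the iterator from either cell above would consume it once and return the number of batches and the total row count.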
Parameterized queries¶
Client-side parameter resolution¶
The params parameter allows client-side resolution of parameters, which are specified with :col_name, when paramstyle is set to named. Additionally, Python types will map to the appropriate Athena definitions. For example, the value dt.date(2023, 1, 1) will resolve to DATE '2023-01-01'.
For the example below, the following query will be sent to Athena:
SELECT * FROM noaa WHERE S_FLAG = 'E'
[ ]:
%%time
wr.athena.read_sql_query(
    "SELECT * FROM noaa WHERE S_FLAG = :flag_value",
    database="awswrangler_test",
    params={
        "flag_value": "E",
    },
)
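To make the client-side resolution step more concrete, here is a deliberately naive sketch of how `:name` placeholders could be replaced with SQL literals before the query is sent. It is not awswrangler's actual implementation. The functions `format_literal` and `resolve_named` are hypothetical, only a handful of Python types are covered, and placeholders inside string literals are not handled.

```python
import datetime as dt
import re


def format_literal(value):
    """Render a Python value as a SQL literal (illustrative subset of types only)."""
    if isinstance(value, bool):  # checked before int, since bool is a subclass of int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return str(value)
    if type(value) is dt.date:  # exact type check so datetime objects are not rendered as DATE
        return f"DATE '{value.isoformat()}'"
    if isinstance(value, str):
        # Double any single quotes so the value stays inside one string literal.
        return "'" + value.replace("'", "''") + "'"
    raise TypeError(f"Unsupported parameter type: {type(value).__name__}")


def resolve_named(sql, params):
    """Replace each :name placeholder with the formatted literal from params."""
    # The lookbehind skips '::' so cast-style syntax is left untouched.
    pattern = re.compile(r"(?<!:):([A-Za-z_]\w*)")
    return pattern.sub(lambda m: format_literal(params[m.group(1)]), sql)
```

With these definitions, `resolve_named("SELECT * FROM noaa WHERE S_FLAG = :flag_value", {"flag_value": "E"})` produces the same query text quoted above.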
Server-side parameter resolution¶
Alternatively, Athena supports server-side parameter resolution when paramstyle is defined as qmark. The SQL statement sent to Athena will not contain the values passed in params. Instead, they will be passed as part of a separate params parameter in boto3.
The downside of using this approach is that types aren’t automatically resolved. The values sent to params must be strings. Therefore, if one of the values is a date, the value passed in params has to be DATE 'XXXX-XX-XX'.
The upside, however, is that these parameters can be used with prepared statements.
For more information, see “Using parameterized queries”.
[ ]:
%%time
wr.athena.read_sql_query(
    "SELECT * FROM noaa WHERE S_FLAG = ?",
    database="awswrangler_test",
    params=["E"],
    paramstyle="qmark",
)
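Since qmark parameters must already be strings, it can help to convert values in one place before calling `read_sql_query`. The helper below is a minimal sketch under that assumption. `to_qmark_param` is a hypothetical name, it covers only strings, numbers and dates, and strings are passed through exactly as supplied.

```python
import datetime as dt


def to_qmark_param(value):
    """Convert a Python value to the string form expected for qmark params (hypothetical helper)."""
    if isinstance(value, str):
        return value  # already a string: forwarded unchanged
    if type(value) is dt.date:
        # Dates need the explicit DATE 'YYYY-MM-DD' form, as described above.
        return f"DATE '{value.isoformat()}'"
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return str(value)
    raise TypeError(f"Unsupported parameter type: {type(value).__name__}")
```

A call such as `params=[to_qmark_param(v) for v in ["E", dt.date(2023, 1, 1)]]` would then yield `["E", "DATE '2023-01-01'"]`.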
Prepared statements¶
[ ]:
wr.athena.create_prepared_statement(
    sql="SELECT * FROM noaa WHERE S_FLAG = ?",
    statement_name="statement",
)
# Resolve parameter using Athena execution parameters
wr.athena.read_sql_query(
    sql="EXECUTE statement",
    database="awswrangler_test",
    params=["E"],
    paramstyle="qmark",
)
# Resolve parameter using Athena execution parameters (same effect as above)
wr.athena.read_sql_query(
    sql="EXECUTE statement USING ?",
    database="awswrangler_test",
    params=["E"],
    paramstyle="qmark",
)
# Resolve parameter using client-side formatter
wr.athena.read_sql_query(
    sql="EXECUTE statement USING :flag_value",
    database="awswrangler_test",
    params={
        "flag_value": "E",
    },
    paramstyle="named",
)
[ ]:
# Clean up prepared statement
wr.athena.delete_prepared_statement(statement_name="statement")
Cleaning Up S3¶
[ ]:
wr.s3.delete_objects(path)
Delete table¶
[ ]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa")
Delete Database¶
[ ]:
wr.catalog.delete_database("awswrangler_test")