39 - Athena Iceberg¶
Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue catalog as their metastore. For more information, see the Athena User Guide.
Create Iceberg table¶
[50]:
import getpass
bucket_name = getpass.getpass()
[2]:
import awswrangler as wr
glue_database = "aws_sdk_pandas"
glue_table = "iceberg_test"
path = f"s3://{bucket_name}/iceberg_test/"
temp_path = f"s3://{bucket_name}/iceberg_test_temp/"
# Clean up the table before creating it
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
[2]:
True
Create table & insert data¶
It is possible to insert a Pandas DataFrame into an Iceberg table using wr.athena.to_iceberg. If the table does not exist, it is created first:
[ ]:
import pandas as pd
df = pd.DataFrame({"id": [1, 2, 3], "name": ["John", "Lily", "Richard"]})
wr.athena.to_iceberg(
df=df,
database=glue_database,
table=glue_table,
table_location=path,
temp_path=temp_path,
)
Alternatively, you can insert rows by running INSERT INTO ... VALUES directly:
[53]:
wr.athena.start_query_execution(
sql=f"INSERT INTO {glue_table} VALUES (1,'John'), (2, 'Lily'), (3, 'Richard')",
database=glue_database,
wait=True,
)
[53]:
{'QueryExecutionId': 'e339fcd2-9db1-43ac-bb9e-9730e6395b51',
'Query': "INSERT INTO iceberg_test VALUES (1,'John'), (2, 'Lily'), (3, 'Richard')",
'StatementType': 'DML',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/e339fcd2-9db1-43ac-bb9e-9730e6395b51'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 8, 612000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 11, 143000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2242,
'DataScannedInBytes': 0,
'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/e339fcd2-9db1-43ac-bb9e-9730e6395b51-manifest.csv',
'TotalExecutionTimeInMillis': 2531,
'QueryQueueTimeInMillis': 241,
'QueryPlanningTimeInMillis': 179,
'ServiceProcessingTimeInMillis': 48,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}
[54]:
wr.athena.start_query_execution(
sql=f"INSERT INTO {glue_table} VALUES (4,'Anne'), (5, 'Jacob'), (6, 'Leon')",
database=glue_database,
wait=True,
)
[54]:
{'QueryExecutionId': '922c8f02-4c00-4050-b4a7-7016809efa2b',
'Query': "INSERT INTO iceberg_test VALUES (4,'Anne'), (5, 'Jacob'), (6, 'Leon')",
'StatementType': 'DML',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/922c8f02-4c00-4050-b4a7-7016809efa2b'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 24, 582000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 27, 352000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2414,
'DataScannedInBytes': 0,
'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/922c8f02-4c00-4050-b4a7-7016809efa2b-manifest.csv',
'TotalExecutionTimeInMillis': 2770,
'QueryQueueTimeInMillis': 329,
'QueryPlanningTimeInMillis': 189,
'ServiceProcessingTimeInMillis': 27,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}
Query¶
[65]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[65]:
| | id | name |
|---|---|---|
| 0 | 1 | John |
| 1 | 4 | Anne |
| 2 | 2 | Lily |
| 3 | 3 | Richard |
| 4 | 5 | Jacob |
| 5 | 6 | Leon |
Read query metadata¶
In a SELECT query, you can use the following properties after table_name to query Iceberg table metadata:

- $files: shows a table’s current data files
- $manifests: shows a table’s current file manifests
- $history: shows a table’s history
- $partitions: shows a table’s current partitions
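The four metadata views share the same query shape, so the SQL for each can be built with plain string formatting. A minimal sketch (the helper dictionary below is illustrative, not part of awswrangler):

```python
# Build metadata-view queries for an Iceberg table (illustrative helper;
# the four view suffixes come from the list above)
glue_table = "iceberg_test"

metadata_views = ["$files", "$manifests", "$history", "$partitions"]
queries = {view: f'SELECT * FROM "{glue_table}{view}"' for view in metadata_views}

print(queries["$files"])  # SELECT * FROM "iceberg_test$files"
```

Note that the quoted identifier ("table$view") is required, since $ is not a plain identifier character.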
[55]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$files"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[55]:
| | content | file_path | file_format | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | s3://.../iceberg_test/data/089a... | PARQUET | 3 | 360 | {1=48, 2=63} | {1=3, 2=3} | {1=0, 2=0} | {} | {1=1, 2=John} | {1=3, 2=Richard} | &lt;NA&gt; | NaN | NaN |
| 1 | 0 | s3://.../iceberg_test/data/5736... | PARQUET | 3 | 355 | {1=48, 2=61} | {1=3, 2=3} | {1=0, 2=0} | {} | {1=4, 2=Anne} | {1=6, 2=Leon} | &lt;NA&gt; | NaN | NaN |
[56]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$manifests"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[56]:
| | path | length | partition_spec_id | added_snapshot_id | added_data_files_count | added_rows_count | existing_data_files_count | existing_rows_count | deleted_data_files_count | deleted_rows_count | partitions |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | s3://.../iceberg_test/metadata/... | 6538 | 0 | 4379263637983206651 | 1 | 3 | 0 | 0 | 0 | 0 | [] |
| 1 | s3://.../iceberg_test/metadata/... | 6548 | 0 | 2934717851675145063 | 1 | 3 | 0 | 0 | 0 | 0 | [] |
[58]:
df = wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$history"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
# Save snapshot id
snapshot_id = df.snapshot_id[0]
df
[58]:
| | made_current_at | snapshot_id | parent_id | is_current_ancestor |
|---|---|---|---|---|
| 0 | 2023-03-16 09:40:10.438000+00:00 | 2934717851675145063 | &lt;NA&gt; | True |
| 1 | 2023-03-16 09:40:26.754000+00:00 | 4379263637983206651 | 2934717851675144704 | True |
[59]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$partitions"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[59]:
| | record_count | file_count | total_size | data |
|---|---|---|---|---|
| 0 | 6 | 2 | 715 | {id={min=1, max=6, null_count=0, nan_count=nul... |
Time travel¶
[60]:
wr.athena.read_sql_query(
sql=f"SELECT * FROM {glue_table} FOR TIMESTAMP AS OF (current_timestamp - interval '5' second)",
database=glue_database,
)
[60]:
| | id | name |
|---|---|---|
| 0 | 1 | John |
| 1 | 4 | Anne |
| 2 | 2 | Lily |
| 3 | 3 | Richard |
| 4 | 5 | Jacob |
| 5 | 6 | Leon |
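Besides a relative interval, FOR TIMESTAMP AS OF also accepts an absolute timestamp literal. A small sketch of formatting a Python datetime into one (the timestamp value is illustrative; in practice you might take made_current_at from the $history view):

```python
import datetime

# Illustrative point in time to travel back to
ts = datetime.datetime(2023, 3, 16, 9, 40, 15, tzinfo=datetime.timezone.utc)

# Format as an Athena TIMESTAMP literal with millisecond precision:
# strftime produces microseconds, so trim the last three digits
literal = f"TIMESTAMP '{ts.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]} UTC'"
sql = f"SELECT * FROM iceberg_test FOR TIMESTAMP AS OF {literal}"

print(sql)
# SELECT * FROM iceberg_test FOR TIMESTAMP AS OF TIMESTAMP '2023-03-16 09:40:15.000 UTC'
```

The resulting SQL string can be passed to wr.athena.read_sql_query exactly like the interval-based query above.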
Version travel¶
[61]:
wr.athena.read_sql_query(
sql=f"SELECT * FROM {glue_table} FOR VERSION AS OF {snapshot_id}",
database=glue_database,
)
[61]:
| | id | name |
|---|---|---|
| 0 | 1 | John |
| 1 | 2 | Lily |
| 2 | 3 | Richard |
Optimize¶
The OPTIMIZE table_name REWRITE DATA compaction action rewrites data files into a more optimized layout based on their size and the number of associated delete files. For syntax and table property details, see OPTIMIZE.
[62]:
wr.athena.start_query_execution(
sql=f"OPTIMIZE {glue_table} REWRITE DATA USING BIN_PACK",
database=glue_database,
wait=True,
)
[62]:
{'QueryExecutionId': '94666790-03ae-42d7-850a-fae99fa79a68',
'Query': 'OPTIMIZE iceberg_test REWRITE DATA USING BIN_PACK',
'StatementType': 'DDL',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/tables/94666790-03ae-42d7-850a-fae99fa79a68'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 49, 42, 857000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 49, 45, 655000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2622,
'DataScannedInBytes': 220,
'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/tables/94666790-03ae-42d7-850a-fae99fa79a68-manifest.csv',
'TotalExecutionTimeInMillis': 2798,
'QueryQueueTimeInMillis': 124,
'QueryPlanningTimeInMillis': 252,
'ServiceProcessingTimeInMillis': 52,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}
Vacuum¶
VACUUM performs snapshot expiration and orphan file removal. These actions reduce metadata size and remove files that are not in the current table state and are older than the retention period specified for the table. For syntax details, see VACUUM.
[64]:
wr.athena.start_query_execution(
sql=f"VACUUM {glue_table}",
database=glue_database,
wait=True,
)
[64]:
{'QueryExecutionId': '717a7de6-b873-49c7-b744-1b0b402f24c9',
'Query': 'VACUUM iceberg_test',
'StatementType': 'DML',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/717a7de6-b873-49c7-b744-1b0b402f24c9.csv'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 50, 41, 14000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 50, 43, 441000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2229,
'DataScannedInBytes': 0,
'TotalExecutionTimeInMillis': 2427,
'QueryQueueTimeInMillis': 153,
'QueryPlanningTimeInMillis': 30,
'ServiceProcessingTimeInMillis': 45,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}