39 - Athena Iceberg¶
Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue catalog for their metastore. For more information, see the Athena User Guide.
Create Iceberg table¶
[50]:
import getpass
bucket_name = getpass.getpass()
[2]:
import awswrangler as wr
glue_database = "aws_sdk_pandas"
glue_table = "iceberg_test"
path = f"s3://{bucket_name}/iceberg_test/"
temp_path = f"s3://{bucket_name}/iceberg_test_temp/"
# Clean up: delete the table if it is left over from a previous run
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
[2]:
True
Create table & insert data¶
You can insert a pandas DataFrame into an Iceberg table with wr.athena.to_iceberg. If the table does not exist, it is created:
[ ]:
import pandas as pd
df = pd.DataFrame({"id": [1, 2, 3], "name": ["John", "Lily", "Richard"]})
wr.athena.to_iceberg(
df=df,
database=glue_database,
table=glue_table,
table_location=path,
temp_path=temp_path,
)
Alternatively, you can insert rows directly by running INSERT INTO ... VALUES:
[53]:
wr.athena.start_query_execution(
sql=f"INSERT INTO {glue_table} VALUES (1,'John'), (2, 'Lily'), (3, 'Richard')",
database=glue_database,
wait=True,
)
[53]:
{'QueryExecutionId': 'e339fcd2-9db1-43ac-bb9e-9730e6395b51',
'Query': "INSERT INTO iceberg_test VALUES (1,'John'), (2, 'Lily'), (3, 'Richard')",
'StatementType': 'DML',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/e339fcd2-9db1-43ac-bb9e-9730e6395b51'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 8, 612000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 11, 143000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2242,
'DataScannedInBytes': 0,
'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/e339fcd2-9db1-43ac-bb9e-9730e6395b51-manifest.csv',
'TotalExecutionTimeInMillis': 2531,
'QueryQueueTimeInMillis': 241,
'QueryPlanningTimeInMillis': 179,
'ServiceProcessingTimeInMillis': 48,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}
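With wait=True, the call returns the query-execution dictionary shown above (the same shape as the QueryExecution object in Athena's GetQueryExecution API response). As a minimal sketch, the hypothetical helper summarize_execution below picks out the fields usually worth checking. It uses .get() throughout because not every statement reports every statistic; the VACUUM output further down, for instance, has no DataManifestLocation.

```python
from typing import Any, Dict


def summarize_execution(execution: Dict[str, Any]) -> Dict[str, Any]:
    """Extract a compact summary from a query-execution dict (hypothetical helper)."""
    status = execution.get("Status", {})
    stats = execution.get("Statistics", {})
    return {
        "id": execution.get("QueryExecutionId"),
        "type": execution.get("StatementType"),
        "state": status.get("State"),
        # Time spent by the query engine itself (planning is a subset of this).
        "engine_ms": stats.get("EngineExecutionTimeInMillis"),
        # Wall-clock time including queueing and service processing.
        "total_ms": stats.get("TotalExecutionTimeInMillis"),
        "scanned_bytes": stats.get("DataScannedInBytes"),
    }
```

For example: summarize_execution(wr.athena.start_query_execution(sql=..., database=glue_database, wait=True)). In the outputs in this tutorial, TotalExecutionTimeInMillis equals QueryQueueTimeInMillis + EngineExecutionTimeInMillis + ServiceProcessingTimeInMillis (241 + 2242 + 48 = 2531 for the cell above).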
[54]:
wr.athena.start_query_execution(
sql=f"INSERT INTO {glue_table} VALUES (4,'Anne'), (5, 'Jacob'), (6, 'Leon')",
database=glue_database,
wait=True,
)
[54]:
{'QueryExecutionId': '922c8f02-4c00-4050-b4a7-7016809efa2b',
'Query': "INSERT INTO iceberg_test VALUES (4,'Anne'), (5, 'Jacob'), (6, 'Leon')",
'StatementType': 'DML',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/922c8f02-4c00-4050-b4a7-7016809efa2b'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 24, 582000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 27, 352000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2414,
'DataScannedInBytes': 0,
'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/922c8f02-4c00-4050-b4a7-7016809efa2b-manifest.csv',
'TotalExecutionTimeInMillis': 2770,
'QueryQueueTimeInMillis': 329,
'QueryPlanningTimeInMillis': 189,
'ServiceProcessingTimeInMillis': 27,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}
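Writing VALUES lists by hand becomes error-prone once the data contains quotes. A minimal sketch, assuming only integers, booleans, strings and None need to be rendered: the hypothetical helpers sql_literal and build_insert_sql below produce the same kind of statement as the two cells above, escaping single quotes by doubling them as standard SQL requires. For real data, wr.athena.to_iceberg avoids building SQL strings altogether.

```python
from typing import Iterable, Sequence, Union

Scalar = Union[bool, int, str, None]


def sql_literal(value: Scalar) -> str:
    """Render a Python scalar as a SQL literal (hypothetical helper)."""
    if value is None:
        return "NULL"
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"
    if isinstance(value, int):
        return str(value)
    # Escape embedded single quotes by doubling them.
    return "'" + str(value).replace("'", "''") + "'"


def build_insert_sql(table: str, rows: Iterable[Sequence[Scalar]]) -> str:
    """Build an INSERT INTO ... VALUES statement (hypothetical helper)."""
    tuples = ["(" + ", ".join(sql_literal(v) for v in row) + ")" for row in rows]
    if not tuples:
        raise ValueError("at least one row is required")
    return f"INSERT INTO {table} VALUES " + ", ".join(tuples)
```

Usage: wr.athena.start_query_execution(sql=build_insert_sql(glue_table, [(7, "O'Brien")]), database=glue_database, wait=True).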
Query¶
[65]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[65]:
| | id | name |
|---|---|---|
| 0 | 1 | John |
| 1 | 4 | Anne |
| 2 | 2 | Lily |
| 3 | 3 | Richard |
| 4 | 5 | Jacob |
| 5 | 6 | Leon |
Read query metadata¶
In a SELECT query, you can append one of the following suffixes to the table name to query Iceberg table metadata. Because of the $ character, enclose the combined name in double quotes, as in the examples below:
$files: shows a table’s current data files
$manifests: shows a table’s current file manifests
$history: shows a table’s history
$partitions: shows a table’s current partitions
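Since the metadata tables differ only in their suffix, a small hypothetical helper can build these queries and reject typos before anything is sent to Athena. This sketch is restricted to the four suffixes listed above and assumes the table name itself needs no extra quoting.

```python
METADATA_SUFFIXES = ("files", "manifests", "history", "partitions")


def metadata_query(table: str, suffix: str) -> str:
    """Return a SELECT over an Iceberg metadata table (hypothetical helper)."""
    if suffix not in METADATA_SUFFIXES:
        raise ValueError(f"unsupported metadata table: {suffix!r}")
    # The '$' is not valid in an unquoted identifier, so quote the whole name.
    return f'SELECT * FROM "{table}${suffix}"'
```

Usage: wr.athena.read_sql_query(sql=metadata_query(glue_table, "files"), database=glue_database, ctas_approach=False, unload_approach=False).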
[55]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$files"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[55]:
| | content | file_path | file_format | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | s3://.../iceberg_test/data/089a... | PARQUET | 3 | 360 | {1=48, 2=63} | {1=3, 2=3} | {1=0, 2=0} | {} | {1=1, 2=John} | {1=3, 2=Richard} | <NA> | NaN | NaN |
| 1 | 0 | s3://.../iceberg_test/data/5736... | PARQUET | 3 | 355 | {1=48, 2=61} | {1=3, 2=3} | {1=0, 2=0} | {} | {1=4, 2=Anne} | {1=6, 2=Leon} | <NA> | NaN | NaN |
[56]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$manifests"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[56]:
| | path | length | partition_spec_id | added_snapshot_id | added_data_files_count | added_rows_count | existing_data_files_count | existing_rows_count | deleted_data_files_count | deleted_rows_count | partitions |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | s3://.../iceberg_test/metadata/... | 6538 | 0 | 4379263637983206651 | 1 | 3 | 0 | 0 | 0 | 0 | [] |
| 1 | s3://.../iceberg_test/metadata/... | 6548 | 0 | 2934717851675145063 | 1 | 3 | 0 | 0 | 0 | 0 | [] |
[58]:
df = wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$history" ORDER BY made_current_at',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
# Save the ID of the oldest snapshot (first row after ordering) for version travel
snapshot_id = df.snapshot_id[0]
df
[58]:
| | made_current_at | snapshot_id | parent_id | is_current_ancestor |
|---|---|---|---|---|
| 0 | 2023-03-16 09:40:10.438000+00:00 | 2934717851675145063 | <NA> | True |
| 1 | 2023-03-16 09:40:26.754000+00:00 | 4379263637983206651 | 2934717851675144704 | True |
[59]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$partitions"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[59]:
| | record_count | file_count | total_size | data |
|---|---|---|---|---|
| 0 | 6 | 2 | 715 | {id={min=1, max=6, null_count=0, nan_count=nul... |
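For an unpartitioned table like this one, $partitions returns a single row whose counters line up with the per-file statistics in $files: 3 + 3 = 6 records, 2 files, and 360 + 355 = 715 bytes. The sketch below reproduces that arithmetic in plain Python from the values shown above; it is only a consistency check, not how Athena computes the metadata table, and the helper name is hypothetical.

```python
from typing import Dict, List

# Values copied from the $files output above (one dict per data file).
files: List[Dict[str, int]] = [
    {"record_count": 3, "file_size_in_bytes": 360},
    {"record_count": 3, "file_size_in_bytes": 355},
]


def partition_totals(data_files: List[Dict[str, int]]) -> Dict[str, int]:
    """Aggregate per-file statistics into one summary row (unpartitioned table)."""
    return {
        "record_count": sum(f["record_count"] for f in data_files),
        "file_count": len(data_files),
        "total_size": sum(f["file_size_in_bytes"] for f in data_files),
    }


print(partition_totals(files))  # {'record_count': 6, 'file_count': 2, 'total_size': 715}
```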
Time travel¶
[60]:
wr.athena.read_sql_query(
sql=f"SELECT * FROM {glue_table} FOR TIMESTAMP AS OF (current_timestamp - interval '5' second)",
database=glue_database,
)
[60]:
| | id | name |
|---|---|---|
| 0 | 1 | John |
| 1 | 4 | Anne |
| 2 | 2 | Lily |
| 3 | 3 | Richard |
| 4 | 5 | Jacob |
| 5 | 6 | Leon |
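Time travel by timestamp reads the snapshot that was current at the given instant, that is, the most recent snapshot whose made_current_at is not later than that time. The query above returned all six rows, so the second snapshot was already current five seconds before it ran. The sketch below applies the same rule to the two $history rows shown earlier; it only illustrates the semantics (Athena resolves the snapshot itself), and snapshot_as_of is a hypothetical name.

```python
from datetime import datetime, timezone
from typing import List, Optional, Tuple

# (made_current_at, snapshot_id) pairs copied from the $history output above.
HISTORY: List[Tuple[datetime, int]] = [
    (datetime(2023, 3, 16, 9, 40, 10, 438000, tzinfo=timezone.utc), 2934717851675145063),
    (datetime(2023, 3, 16, 9, 40, 26, 754000, tzinfo=timezone.utc), 4379263637983206651),
]


def snapshot_as_of(history: List[Tuple[datetime, int]], ts: datetime) -> Optional[int]:
    """Return the snapshot current at `ts`, or None if `ts` predates every snapshot."""
    current: Optional[int] = None
    for made_current_at, snapshot_id in sorted(history):
        if made_current_at <= ts:
            current = snapshot_id
        else:
            break
    return current
```

A timestamp between the two commits, such as 09:40:15 UTC, resolves to the first snapshot, which holds only John, Lily and Richard.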
Version travel¶
[61]:
wr.athena.read_sql_query(
sql=f"SELECT * FROM {glue_table} FOR VERSION AS OF {snapshot_id}",
database=glue_database,
)
[61]:
| | id | name |
|---|---|---|
| 0 | 1 | John |
| 1 | 2 | Lily |
| 2 | 3 | Richard |
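Besides a relative interval, FOR TIMESTAMP AS OF can also take an absolute timestamp, and FOR VERSION AS OF takes a snapshot ID such as the one saved from $history. A minimal sketch of two hypothetical helpers that build these statements; the TIMESTAMP '... UTC' literal form is an assumption based on the Athena documentation's time-travel examples, so check it against your engine version.

```python
from datetime import datetime, timezone


def as_of_timestamp_sql(table: str, ts: datetime) -> str:
    """SELECT over `table` as of an absolute instant, rendered in UTC (hypothetical helper)."""
    if ts.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid ambiguity")
    utc = ts.astimezone(timezone.utc)
    # Millisecond precision, e.g. '2023-03-16 09:40:15.000 UTC'.
    literal = utc.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + " UTC"
    return f"SELECT * FROM {table} FOR TIMESTAMP AS OF TIMESTAMP '{literal}'"


def as_of_version_sql(table: str, snapshot_id: int) -> str:
    """SELECT over `table` as of a snapshot ID (hypothetical helper)."""
    # int() also normalizes NumPy integer scalars, such as a value read from a DataFrame column.
    return f"SELECT * FROM {table} FOR VERSION AS OF {int(snapshot_id)}"
```

Usage: wr.athena.read_sql_query(sql=as_of_version_sql(glue_table, snapshot_id), database=glue_database).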
Optimize¶
The OPTIMIZE table REWRITE DATA compaction action rewrites data files into a more optimized layout based on their size and number of associated delete files. For syntax and table property details, see OPTIMIZE.
[62]:
wr.athena.start_query_execution(
sql=f"OPTIMIZE {glue_table} REWRITE DATA USING BIN_PACK",
database=glue_database,
wait=True,
)
[62]:
{'QueryExecutionId': '94666790-03ae-42d7-850a-fae99fa79a68',
'Query': 'OPTIMIZE iceberg_test REWRITE DATA USING BIN_PACK',
'StatementType': 'DDL',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/tables/94666790-03ae-42d7-850a-fae99fa79a68'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 49, 42, 857000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 49, 45, 655000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2622,
'DataScannedInBytes': 220,
'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/tables/94666790-03ae-42d7-850a-fae99fa79a68-manifest.csv',
'TotalExecutionTimeInMillis': 2798,
'QueryQueueTimeInMillis': 124,
'QueryPlanningTimeInMillis': 252,
'ServiceProcessingTimeInMillis': 52,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}
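BIN_PACK names the general bin-packing strategy: small files are grouped so that each rewritten file approaches a target size. The sketch below illustrates that idea with a first-fit-decreasing heuristic. It is not Athena's implementation; the function name and the 512 MiB target used in the example are assumptions for illustration only.

```python
from typing import List


def first_fit_decreasing(sizes: List[int], target: int) -> List[List[int]]:
    """Group file sizes into bins whose totals do not exceed `target` (illustrative only)."""
    bins: List[List[int]] = []
    totals: List[int] = []
    for size in sorted(sizes, reverse=True):
        for i, total in enumerate(totals):
            if total + size <= target:
                bins[i].append(size)
                totals[i] += size
                break
        else:
            # No existing bin has room (or the file alone exceeds target): open a new bin.
            bins.append([size])
            totals.append(size)
    return bins


print(first_fit_decreasing([360, 355], 512 * 1024 * 1024))  # [[360, 355]]
```

With the two small data files from $files, both would fit into a single group under such a target.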
Vacuum¶
VACUUM performs snapshot expiration and orphan file removal. These actions reduce metadata size and remove files not in the current table state that are also older than the retention period specified for the table. For syntax details, see VACUUM.
[64]:
wr.athena.start_query_execution(
sql=f"VACUUM {glue_table}",
database=glue_database,
wait=True,
)
[64]:
{'QueryExecutionId': '717a7de6-b873-49c7-b744-1b0b402f24c9',
'Query': 'VACUUM iceberg_test',
'StatementType': 'DML',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/717a7de6-b873-49c7-b744-1b0b402f24c9.csv'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 50, 41, 14000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 50, 43, 441000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2229,
'DataScannedInBytes': 0,
'TotalExecutionTimeInMillis': 2427,
'QueryQueueTimeInMillis': 153,
'QueryPlanningTimeInMillis': 30,
'ServiceProcessingTimeInMillis': 45,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}
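Snapshot expiration is governed by two settings: a maximum snapshot age and a minimum number of snapshots to keep. In Athena these are, to our understanding, the table properties vacuum_max_snapshot_age_seconds and vacuum_min_snapshots_to_retain; see the VACUUM documentation for their defaults. The sketch below applies that rule to the two snapshots listed in $history earlier. It is a simplified illustration that ignores orphan-file removal and the snapshot added by OPTIMIZE, and snapshots_to_expire is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

# (made_current_at, snapshot_id) pairs copied from the $history output above.
HISTORY: List[Tuple[datetime, int]] = [
    (datetime(2023, 3, 16, 9, 40, 10, 438000, tzinfo=timezone.utc), 2934717851675145063),
    (datetime(2023, 3, 16, 9, 40, 26, 754000, tzinfo=timezone.utc), 4379263637983206651),
]


def snapshots_to_expire(
    history: List[Tuple[datetime, int]],
    now: datetime,
    max_age: timedelta,
    min_retain: int = 1,
) -> List[int]:
    """Return IDs of snapshots older than max_age, always keeping the newest min_retain."""
    newest_first = sorted(history, reverse=True)
    # The newest min_retain snapshots (including the current one) are never expired.
    return [
        snapshot_id
        for made_current_at, snapshot_id in newest_first[min_retain:]
        if now - made_current_at > max_age
    ]


# VACUUM submission time converted to UTC (local times above appear to run one hour ahead).
vacuum_time = datetime(2023, 3, 16, 9, 50, 41, tzinfo=timezone.utc)
print(snapshots_to_expire(HISTORY, vacuum_time, timedelta(days=5)))  # []
```

With a five-day maximum age nothing is old enough to expire; shrinking it to one minute would expire only the first snapshot, since the newest one is always retained.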
