AWS SDK for pandas

39 - Athena Iceberg

Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that store their data in the Apache Parquet format and use the AWS Glue catalog as their metastore. See the Amazon Athena User Guide for details.

Create Iceberg table

[50]:
import getpass

# Prompt for the S3 bucket name without echoing it in the notebook output
bucket_name = getpass.getpass()
[2]:
import awswrangler as wr

glue_database = "aws_sdk_pandas"
glue_table = "iceberg_test"
path = f"s3://{bucket_name}/iceberg_test/"
temp_path = f"s3://{bucket_name}/iceberg_test_temp/"

# Delete the table if it already exists so the example starts from a clean state
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
[2]:
True

Create table & insert data

A pandas DataFrame can be inserted into an Iceberg table with wr.athena.to_iceberg. If the table does not exist, it is created:

[ ]:
import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3], "name": ["John", "Lily", "Richard"]})

wr.athena.to_iceberg(
    df=df,
    database=glue_database,
    table=glue_table,
    table_location=path,
    temp_path=temp_path,
)
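For readers who prefer to create the table explicitly before loading data, the following is a minimal sketch of an Athena CREATE TABLE statement for an Iceberg table. The helper name build_create_iceberg_ddl, the column types, and the bucket name are hypothetical and not part of awswrangler. The statement follows the CREATE TABLE ... TBLPROPERTIES ('table_type'='ICEBERG') form that Athena uses for Iceberg tables, and it is not necessarily what wr.athena.to_iceberg runs internally.

```python
from typing import Mapping


def build_create_iceberg_ddl(table: str, columns: Mapping[str, str], location: str) -> str:
    """Build an Athena CREATE TABLE statement for an Iceberg table.

    `columns` maps column names to Athena types, e.g. {"id": "bigint"}.
    Illustrative helper only; it is not part of awswrangler.
    """
    cols = ", ".join(f"{name} {dtype}" for name, dtype in columns.items())
    return (
        f"CREATE TABLE {table} ({cols}) "
        f"LOCATION '{location}' "
        "TBLPROPERTIES ('table_type'='ICEBERG')"
    )


ddl = build_create_iceberg_ddl(
    table="iceberg_test",
    columns={"id": "bigint", "name": "string"},
    location="s3://my-bucket/iceberg_test/",  # placeholder bucket name
)
print(ddl)
```

The resulting string could be passed to wr.athena.start_query_execution(sql=ddl, database=glue_database, wait=True), the same call this tutorial uses for its other statements.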

Alternatively, rows can be inserted directly by running an INSERT INTO ... VALUES statement (the table must already exist):

[53]:
wr.athena.start_query_execution(
    sql=f"INSERT INTO {glue_table} VALUES (1,'John'), (2, 'Lily'), (3, 'Richard')",
    database=glue_database,
    wait=True,
)
[53]:
{'QueryExecutionId': 'e339fcd2-9db1-43ac-bb9e-9730e6395b51',
 'Query': "INSERT INTO iceberg_test VALUES (1,'John'), (2, 'Lily'), (3, 'Richard')",
 'StatementType': 'DML',
 'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/e339fcd2-9db1-43ac-bb9e-9730e6395b51'},
 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
 'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
 'Status': {'State': 'SUCCEEDED',
  'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 8, 612000, tzinfo=tzlocal()),
  'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 11, 143000, tzinfo=tzlocal())},
 'Statistics': {'EngineExecutionTimeInMillis': 2242,
  'DataScannedInBytes': 0,
  'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/e339fcd2-9db1-43ac-bb9e-9730e6395b51-manifest.csv',
  'TotalExecutionTimeInMillis': 2531,
  'QueryQueueTimeInMillis': 241,
  'QueryPlanningTimeInMillis': 179,
  'ServiceProcessingTimeInMillis': 48,
  'ResultReuseInformation': {'ReusedPreviousResult': False}},
 'WorkGroup': 'primary',
 'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
  'EffectiveEngineVersion': 'Athena engine version 3'}}
[54]:
wr.athena.start_query_execution(
    sql=f"INSERT INTO {glue_table} VALUES (4,'Anne'), (5, 'Jacob'), (6, 'Leon')",
    database=glue_database,
    wait=True,
)
[54]:
{'QueryExecutionId': '922c8f02-4c00-4050-b4a7-7016809efa2b',
 'Query': "INSERT INTO iceberg_test VALUES (4,'Anne'), (5, 'Jacob'), (6, 'Leon')",
 'StatementType': 'DML',
 'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/922c8f02-4c00-4050-b4a7-7016809efa2b'},
 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
 'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
 'Status': {'State': 'SUCCEEDED',
  'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 24, 582000, tzinfo=tzlocal()),
  'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 27, 352000, tzinfo=tzlocal())},
 'Statistics': {'EngineExecutionTimeInMillis': 2414,
  'DataScannedInBytes': 0,
  'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/922c8f02-4c00-4050-b4a7-7016809efa2b-manifest.csv',
  'TotalExecutionTimeInMillis': 2770,
  'QueryQueueTimeInMillis': 329,
  'QueryPlanningTimeInMillis': 189,
  'ServiceProcessingTimeInMillis': 27,
  'ResultReuseInformation': {'ReusedPreviousResult': False}},
 'WorkGroup': 'primary',
 'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
  'EffectiveEngineVersion': 'Athena engine version 3'}}
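The INSERT statements above are written by hand. When the rows come from Python values, a small helper can build the same kind of statement and escape string literals along the way. This is a sketch under stated assumptions: sql_literal and build_insert_values are hypothetical helper names, the sample rows are made up, and only integers, floats, strings, and None are handled.

```python
from typing import Iterable, Sequence, Union

Value = Union[int, float, str, None]


def sql_literal(value: Value) -> str:
    """Render a Python value as a SQL literal (strings are single-quoted)."""
    if value is None:
        return "NULL"
    if isinstance(value, str):
        # Double any embedded single quote, the standard SQL escape.
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def build_insert_values(table: str, rows: Iterable[Sequence[Value]]) -> str:
    """Build an INSERT INTO ... VALUES statement for the given rows."""
    tuples = ", ".join(
        "(" + ", ".join(sql_literal(v) for v in row) + ")" for row in rows
    )
    return f"INSERT INTO {table} VALUES {tuples}"


# Made-up sample rows; the second name contains a quote that must be escaped.
sql = build_insert_values("iceberg_test", [(7, "Maria"), (8, "O'Neil")])
print(sql)
```

For anything beyond a handful of rows, wr.athena.to_iceberg is likely the more practical route, since it takes a DataFrame directly.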

Query

[65]:
wr.athena.read_sql_query(
    sql=f'SELECT * FROM "{glue_table}"',
    database=glue_database,
    ctas_approach=False,
    unload_approach=False,
)
[65]:
   id     name
0   1     John
1   4     Anne
2   2     Lily
3   3  Richard
4   5    Jacob
5   6     Leon

Read query metadata

In a SELECT query, you can append one of the following suffixes to the table name to query Iceberg table metadata:

  • $files: shows a table’s current data files

  • $manifests: shows a table’s current file manifests

  • $history: shows a table’s history

  • $partitions: shows a table’s current partitions

[55]:
wr.athena.read_sql_query(
    sql=f'SELECT * FROM "{glue_table}$files"',
    database=glue_database,
    ctas_approach=False,
    unload_approach=False,
)
[55]:
content file_path file_format record_count file_size_in_bytes column_sizes value_counts null_value_counts nan_value_counts lower_bounds upper_bounds key_metadata split_offsets equality_ids
0 0 s3://.../iceberg_test/data/089a... PARQUET 3 360 {1=48, 2=63} {1=3, 2=3} {1=0, 2=0} {} {1=1, 2=John} {1=3, 2=Richard} <NA> NaN NaN
1 0 s3://.../iceberg_test/data/5736... PARQUET 3 355 {1=48, 2=61} {1=3, 2=3} {1=0, 2=0} {} {1=4, 2=Anne} {1=6, 2=Leon} <NA> NaN NaN
[56]:
wr.athena.read_sql_query(
    sql=f'SELECT * FROM "{glue_table}$manifests"',
    database=glue_database,
    ctas_approach=False,
    unload_approach=False,
)
[56]:
path length partition_spec_id added_snapshot_id added_data_files_count added_rows_count existing_data_files_count existing_rows_count deleted_data_files_count deleted_rows_count partitions
0 s3://.../iceberg_test/metadata/... 6538 0 4379263637983206651 1 3 0 0 0 0 []
1 s3://.../iceberg_test/metadata/... 6548 0 2934717851675145063 1 3 0 0 0 0 []
[58]:
df = wr.athena.read_sql_query(
    sql=f'SELECT * FROM "{glue_table}$history"',
    database=glue_database,
    ctas_approach=False,
    unload_approach=False,
)

# Save the snapshot ID from the first history row (here, the snapshot created by the first insert)
snapshot_id = df.snapshot_id[0]

df
[58]:
                   made_current_at          snapshot_id            parent_id  is_current_ancestor
0 2023-03-16 09:40:10.438000+00:00  2934717851675145063                 <NA>                 True
1 2023-03-16 09:40:26.754000+00:00  4379263637983206651  2934717851675144704                 True
[59]:
wr.athena.read_sql_query(
    sql=f'SELECT * FROM "{glue_table}$partitions"',
    database=glue_database,
    ctas_approach=False,
    unload_approach=False,
)
[59]:
   record_count  file_count  total_size                                               data
0             6           2         715  {id={min=1, max=6, null_count=0, nan_count=nul...
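The four metadata queries above differ only in the suffix appended to the table name, so they can be generated from one place. The sketch below assumes a hypothetical helper named metadata_query. It only builds the SQL strings and does not contact Athena.

```python
# Suffixes listed in this section for Iceberg metadata tables.
METADATA_SUFFIXES = ("files", "manifests", "history", "partitions")


def metadata_query(table: str, suffix: str) -> str:
    """Build a SELECT over the `table$suffix` metadata table."""
    if suffix not in METADATA_SUFFIXES:
        raise ValueError(f"unknown metadata suffix: {suffix!r}")
    # The double quotes are needed because the name contains a `$`.
    return f'SELECT * FROM "{table}${suffix}"'


queries = {s: metadata_query("iceberg_test", s) for s in METADATA_SUFFIXES}
for name, sql in queries.items():
    print(f"{name}: {sql}")
```

Each string can then be passed to wr.athena.read_sql_query with ctas_approach=False and unload_approach=False, as in the cells above.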

Time travel

[60]:
wr.athena.read_sql_query(
    sql=f"SELECT * FROM {glue_table} FOR TIMESTAMP AS OF (current_timestamp - interval '5' second)",
    database=glue_database,
)
[60]:
   id     name
0   1     John
1   4     Anne
2   2     Lily
3   3  Richard
4   5    Jacob
5   6     Leon
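The query above uses a timestamp relative to the current time. Athena also accepts an absolute timestamp literal in FOR TIMESTAMP AS OF, which is handy for reading the table as of a known instant. The following sketch assumes a hypothetical helper named time_travel_sql. The example timestamp was picked to fall between the two made_current_at values in the $history output, so it would be expected to return only the rows from the first insert.

```python
from datetime import datetime, timezone


def time_travel_sql(table: str, as_of: datetime) -> str:
    """Build a SELECT that reads `table` as it was at the instant `as_of`."""
    if as_of.tzinfo is None:
        raise ValueError("as_of must be timezone-aware")
    # Normalize to UTC and render a timestamp literal with an explicit zone.
    stamp = as_of.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    return f"SELECT * FROM {table} FOR TIMESTAMP AS OF TIMESTAMP '{stamp} UTC'"


# Illustrative instant between the two snapshots listed in $history.
as_of = datetime(2023, 3, 16, 9, 40, 20, tzinfo=timezone.utc)
sql = time_travel_sql("iceberg_test", as_of)
print(sql)
```

Requiring a timezone-aware datetime is a design choice of this sketch: it avoids ambiguity about whether a naive value is local time or UTC.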

Version travel

[61]:
wr.athena.read_sql_query(
    sql=f"SELECT * FROM {glue_table} FOR VERSION AS OF {snapshot_id}",
    database=glue_database,
)
[61]:
   id     name
0   1     John
1   2     Lily
2   3  Richard
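Snapshot IDs are 19-digit integers, which is more precision than a 64-bit float can hold, so it is worth making sure the value interpolated into the query is a true integer. The sketch below assumes a hypothetical helper named version_travel_sql and uses the first snapshot ID from the $history output above.

```python
import operator


def version_travel_sql(table: str, snapshot_id) -> str:
    """Build a SELECT that reads `table` at the given Iceberg snapshot."""
    # operator.index accepts int-like objects (including NumPy integers)
    # and raises TypeError for floats, which cannot hold all 19 digits.
    sid = operator.index(snapshot_id)
    return f"SELECT * FROM {table} FOR VERSION AS OF {sid}"


sql = version_travel_sql("iceberg_test", 2934717851675145063)
print(sql)
```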

Optimize

The OPTIMIZE table REWRITE DATA compaction action rewrites data files into a more optimized layout based on their size and number of associated delete files. For syntax and table property details, see the OPTIMIZE statement in the Athena documentation.

[62]:
wr.athena.start_query_execution(
    sql=f"OPTIMIZE {glue_table} REWRITE DATA USING BIN_PACK",
    database=glue_database,
    wait=True,
)
[62]:
{'QueryExecutionId': '94666790-03ae-42d7-850a-fae99fa79a68',
 'Query': 'OPTIMIZE iceberg_test REWRITE DATA USING BIN_PACK',
 'StatementType': 'DDL',
 'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/tables/94666790-03ae-42d7-850a-fae99fa79a68'},
 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
 'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
 'Status': {'State': 'SUCCEEDED',
  'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 49, 42, 857000, tzinfo=tzlocal()),
  'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 49, 45, 655000, tzinfo=tzlocal())},
 'Statistics': {'EngineExecutionTimeInMillis': 2622,
  'DataScannedInBytes': 220,
  'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/tables/94666790-03ae-42d7-850a-fae99fa79a68-manifest.csv',
  'TotalExecutionTimeInMillis': 2798,
  'QueryQueueTimeInMillis': 124,
  'QueryPlanningTimeInMillis': 252,
  'ServiceProcessingTimeInMillis': 52,
  'ResultReuseInformation': {'ReusedPreviousResult': False}},
 'WorkGroup': 'primary',
 'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
  'EffectiveEngineVersion': 'Athena engine version 3'}}
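The OPTIMIZE statement also accepts an optional WHERE predicate to limit which data is rewritten. The sketch below assumes a hypothetical helper named optimize_sql, and the predicate shown is only a placeholder for this tutorial's table.

```python
from typing import Optional


def optimize_sql(table: str, where: Optional[str] = None) -> str:
    """Build an OPTIMIZE ... REWRITE DATA statement, optionally filtered."""
    sql = f"OPTIMIZE {table} REWRITE DATA USING BIN_PACK"
    if where:
        # Restrict compaction to the rows matching the predicate.
        sql += f" WHERE {where}"
    return sql


print(optimize_sql("iceberg_test"))
print(optimize_sql("iceberg_test", where="id <= 3"))  # placeholder predicate
```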

Vacuum

VACUUM performs snapshot expiration and orphan file removal. These actions reduce metadata size and remove files that are not in the current table state and are older than the retention period specified for the table. For syntax details, see the VACUUM statement in the Athena documentation.

[64]:
wr.athena.start_query_execution(
    sql=f"VACUUM {glue_table}",
    database=glue_database,
    wait=True,
)
[64]:
{'QueryExecutionId': '717a7de6-b873-49c7-b744-1b0b402f24c9',
 'Query': 'VACUUM iceberg_test',
 'StatementType': 'DML',
 'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/717a7de6-b873-49c7-b744-1b0b402f24c9.csv'},
 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
 'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
 'Status': {'State': 'SUCCEEDED',
  'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 50, 41, 14000, tzinfo=tzlocal()),
  'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 50, 43, 441000, tzinfo=tzlocal())},
 'Statistics': {'EngineExecutionTimeInMillis': 2229,
  'DataScannedInBytes': 0,
  'TotalExecutionTimeInMillis': 2427,
  'QueryQueueTimeInMillis': 153,
  'QueryPlanningTimeInMillis': 30,
  'ServiceProcessingTimeInMillis': 45,
  'ResultReuseInformation': {'ReusedPreviousResult': False}},
 'WorkGroup': 'primary',
 'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
  'EffectiveEngineVersion': 'Athena engine version 3'}}
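The retention period that VACUUM honors is set through table properties. As a hedged sketch, the helper below builds an ALTER TABLE ... SET TBLPROPERTIES statement for two of them. The helper name vacuum_properties_sql and the example values are hypothetical, and the property names vacuum_max_snapshot_age_seconds and vacuum_min_snapshots_to_keep should be checked against the current Athena Iceberg table property list before use.

```python
def vacuum_properties_sql(
    table: str, max_snapshot_age_seconds: int, min_snapshots_to_keep: int
) -> str:
    """Build an ALTER TABLE statement that sets VACUUM retention properties."""
    # Property names are assumed from the Athena Iceberg documentation.
    props = {
        "vacuum_max_snapshot_age_seconds": max_snapshot_age_seconds,
        "vacuum_min_snapshots_to_keep": min_snapshots_to_keep,
    }
    rendered = ", ".join(f"'{key}'='{value}'" for key, value in props.items())
    return f"ALTER TABLE {table} SET TBLPROPERTIES ({rendered})"


# Example values only: keep snapshots for one day, and at least two of them.
sql = vacuum_properties_sql("iceberg_test", 86400, 2)
print(sql)
```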