awswrangler.athena.to_iceberg

awswrangler.athena.to_iceberg(df: DataFrame, database: str, table: str, temp_path: str | None = None, index: bool = False, table_location: str | None = None, partition_cols: list[str] | None = None, merge_cols: list[str] | None = None, keep_files: bool = True, data_source: str | None = None, s3_output: str | None = None, workgroup: str = 'primary', mode: Literal['append', 'overwrite', 'overwrite_partitions'] = 'append', encryption: str | None = None, kms_key: str | None = None, boto3_session: Session | None = None, s3_additional_kwargs: dict[str, Any] | None = None, additional_table_properties: dict[str, Any] | None = None, dtype: dict[str, str] | None = None, catalog_id: str | None = None, schema_evolution: bool = False, fill_missing_columns_in_df: bool = True, glue_table_settings: GlueTableSettings | None = None) -> None

Insert into an Athena Iceberg table using INSERT INTO … SELECT. Creates the Iceberg table if it does not exist.

Creates a temporary external table, writes the staged files, and inserts the data via INSERT INTO … SELECT.
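
As an illustration only, the final step can be pictured as a single INSERT INTO … SELECT from the staging table into the target table. The sketch below renders such a statement; the helpers quote_ident and build_insert_select and the staging-table name are hypothetical, and the SQL that awswrangler actually generates may differ.

```python
def quote_ident(name: str) -> str:
    """Double-quote an identifier for Athena DML, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_insert_select(database: str, table: str, temp_table: str, columns: list[str]) -> str:
    """Hypothetical helper: copy `columns` from a staging table into the target table."""
    cols = ", ".join(quote_ident(c) for c in columns)
    return (
        f"INSERT INTO {quote_ident(database)}.{quote_ident(table)} ({cols}) "
        f"SELECT {cols} FROM {quote_ident(database)}.{quote_ident(temp_table)}"
    )


# "temp_my_table" is a made-up staging-table name used only for illustration.
print(build_insert_select("my_database", "my_table", "temp_my_table", ["col"]))
```

Listing the columns explicitly on both sides keeps the insert independent of column order in the staging table.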

Note

The following arguments are not supported in distributed mode with engine EngineEnum.RAY:

  • boto3_session

  • s3_additional_kwargs

Note

This function has arguments which can be configured globally through wr.config or environment variables:

  • catalog_id

  • database

  • workgroup

Check out the Global Configurations Tutorial for details.
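
For example, the globally configurable arguments can be supplied through environment variables. The sketch below assumes awswrangler's WR_-prefixed naming (WR_DATABASE, WR_WORKGROUP, WR_CATALOG_ID) and sets the variables before awswrangler is imported; assigning attributes such as wr.config.database at runtime is the in-code alternative. The catalog ID shown is a placeholder.

```python
import os

# Assumed variable names: WR_<ARGUMENT> maps to the matching global config entry.
# Setting them before `import awswrangler` is the safe choice.
os.environ["WR_DATABASE"] = "my_database"
os.environ["WR_WORKGROUP"] = "primary"
os.environ["WR_CATALOG_ID"] = "123456789012"  # placeholder AWS account ID

# With these set, database=, workgroup= and catalog_id= could then be omitted, e.g.:
# wr.athena.to_iceberg(df=df, table="my_table", temp_path="s3://bucket/temp/")
```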

Parameters:
  • df (pd.DataFrame) – Pandas DataFrame.

  • database (str) – AWS Glue/Athena database name. This is only the origin database from which the query is launched; you can still use and mix several databases by writing the full table name within the SQL (e.g. database.table).

  • table (str) – AWS Glue/Athena table name.

  • temp_path (str, optional) – Amazon S3 location to store temporary results. If not provided, the workgroup configuration is used.

  • index (bool) – Whether to write the DataFrame index as a column. False by default.

  • table_location (str, optional) – Amazon S3 location for the table. Will only be used to create a new table if it does not exist.

  • partition_cols (List[str], optional) –

    List of column names that will be used to create partitions, including support for transform functions (e.g. “day(ts)”).

    https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-creating-tables.html#querying-iceberg-partitioning

  • merge_cols (List[str], optional) –

    List of column names that will be used for conditional inserts and updates.

    https://docs.aws.amazon.com/athena/latest/ug/merge-into-statement.html

  • keep_files (bool) – Whether staging files produced by Athena are retained. ‘True’ by default.

  • data_source (str, optional) – Data Source / Catalog name. If None, ‘AwsDataCatalog’ will be used by default.

  • s3_output (str, optional) – Amazon S3 path used for query execution.

  • workgroup (str) – Athena workgroup. ‘primary’ by default.

  • mode (str) – append (default), overwrite, overwrite_partitions.

  • encryption (str, optional) – Valid values: [None, ‘SSE_S3’, ‘SSE_KMS’]. Notice: ‘CSE_KMS’ is not supported.

  • kms_key (str, optional) – For SSE-KMS, this is the KMS key ARN or ID.

  • boto3_session (boto3.Session(), optional) – Boto3 Session. The default boto3 session is used if boto3_session receives None.

  • s3_additional_kwargs (dict[str, Any], optional) – Forwarded to botocore requests. e.g. s3_additional_kwargs={‘RequestPayer’: ‘requester’}

  • additional_table_properties (dict[str, Any], optional) –

    Additional table properties. e.g. additional_table_properties={‘write_target_data_file_size_bytes’: ‘536870912’}

    https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-creating-tables.html#querying-iceberg-table-properties

  • dtype (dict[str, str], optional) – Dictionary of column names and the Athena/Glue types to cast them to. Useful when you have columns with undetermined or mixed data types. e.g. {‘col name’: ‘bigint’, ‘col2 name’: ‘int’}

  • catalog_id (str, optional) – The ID of the Data Catalog from which to retrieve databases. If None is provided, the AWS account ID is used by default.

  • schema_evolution (bool, optional) – If True, allows schema evolution: new columns and changes in column types. Columns that are present in the Iceberg schema but missing from the DataFrame raise an error unless fill_missing_columns_in_df is set to True. Default is False.

  • fill_missing_columns_in_df (bool, optional) – If True, fill columns that are missing from the DataFrame with NULL values. Default is True.

  • glue_table_settings (GlueTableSettings, optional) – Settings for writing to the Glue/Athena catalog table. Currently only the ‘columns_comments’ attribute is supported for this function, and column comments can only be added when this function creates a new table.

Return type:

None
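
To make the merge_cols behaviour concrete, here is a pure-Python model of a MERGE INTO keyed on those columns: target rows whose key matches an incoming row are updated, and unmatched incoming rows are inserted. It is a sketch under stated assumptions (keys are unique in the target, and SQL NULL-matching rules are ignored), not awswrangler's implementation; merge_rows is a hypothetical name.

```python
from typing import Any

Row = dict[str, Any]


def merge_rows(target: list[Row], source: list[Row], merge_cols: list[str]) -> list[Row]:
    """Model of MERGE INTO keyed on merge_cols: update matches, insert the rest.

    Assumes merge_cols values are unique in `target`; unlike SQL, None == None matches here.
    """

    def key(row: Row) -> tuple:
        return tuple(row[c] for c in merge_cols)

    merged = {key(row): dict(row) for row in target}
    for row in source:
        k = key(row)
        if k in merged:
            merged[k].update(row)  # WHEN MATCHED THEN UPDATE
        else:
            merged[k] = dict(row)  # WHEN NOT MATCHED THEN INSERT
    return list(merged.values())


target = [{"id": 1, "val": "a"}, {"id": 2, "val": "b"}]
source = [{"id": 2, "val": "B"}, {"id": 3, "val": "c"}]
print(merge_rows(target, source, ["id"]))
```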
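
The three mode values can be contrasted with a small in-memory model: append keeps every existing row, overwrite replaces them all, and overwrite_partitions (as we understand it) replaces only the partitions that appear in the new data. The sketch assumes identity partition columns (no transforms such as day(ts)); apply_mode is a hypothetical helper, not part of awswrangler.

```python
from typing import Any, Optional

Row = dict[str, Any]


def apply_mode(
    target: list[Row],
    new_rows: list[Row],
    mode: str,
    partition_cols: Optional[list[str]] = None,
) -> list[Row]:
    """Model the resulting table contents for each write mode."""
    if mode == "append":
        return target + new_rows  # keep everything, add new rows
    if mode == "overwrite":
        return list(new_rows)  # replace the whole table
    if mode == "overwrite_partitions":
        # This model needs partition columns to know which partitions to replace.
        if not partition_cols:
            raise ValueError("overwrite_partitions requires partition_cols")

        def part(row: Row) -> tuple:
            return tuple(row[c] for c in partition_cols)

        touched = {part(r) for r in new_rows}
        kept = [r for r in target if part(r) not in touched]
        return kept + list(new_rows)
    raise ValueError(f"unknown mode: {mode!r}")


target = [{"dt": "2024-01-01", "v": 1}, {"dt": "2024-01-02", "v": 2}]
new = [{"dt": "2024-01-02", "v": 20}, {"dt": "2024-01-03", "v": 30}]
print(apply_mode(target, new, "overwrite_partitions", ["dt"]))
```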
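
The documented interplay of schema_evolution and fill_missing_columns_in_df can be summarised as a small decision function. This is a model of the rules stated in the parameter descriptions, not the library's code; check_schema and the schemas (column name to Athena type) are hypothetical.

```python
def check_schema(
    table_schema: dict[str, str],
    df_schema: dict[str, str],
    schema_evolution: bool = False,
    fill_missing_columns_in_df: bool = True,
) -> dict[str, list[str]]:
    """Classify column differences and enforce the documented rules."""
    new = [c for c in df_schema if c not in table_schema]
    modified = [c for c in df_schema if c in table_schema and df_schema[c] != table_schema[c]]
    missing = [c for c in table_schema if c not in df_schema]

    # New columns or type changes are only accepted with schema evolution enabled.
    if (new or modified) and not schema_evolution:
        raise ValueError(f"schema_evolution=False but found new={new}, modified={modified}")
    # Table columns absent from the DataFrame are either NULL-filled or rejected.
    if missing and not fill_missing_columns_in_df:
        raise ValueError(f"columns missing from DataFrame: {missing}")
    return {"new": new, "modified": modified, "missing": missing}


table = {"id": "bigint", "name": "string", "ts": "timestamp"}
df = {"id": "bigint", "name": "string", "score": "double"}
print(check_schema(table, df, schema_evolution=True))
```

Here "score" is a new column (allowed only because schema_evolution=True) and "ts" would be written as NULL.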

Examples

Insert into an existing Iceberg table

>>> import awswrangler as wr
>>> import pandas as pd
>>> wr.athena.to_iceberg(
...     df=pd.DataFrame({'col': [1, 2, 3]}),
...     database='my_database',
...     table='my_table',
...     temp_path='s3://bucket/temp/',
... )

Create an Iceberg table and insert data (the table doesn’t exist yet, so table_location is required)

>>> import awswrangler as wr
>>> import pandas as pd
>>> wr.athena.to_iceberg(
...     df=pd.DataFrame({'col': [1, 2, 3]}),
...     database='my_database',
...     table='my_table2',
...     table_location='s3://bucket/my_table2/',
...     temp_path='s3://bucket/temp/',
... )
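
When the table does not exist yet, as in the second example, it has to be created as an Iceberg table first. The sketch below renders a CREATE TABLE statement in the general form the Athena Iceberg documentation describes, combining table_location, partition_cols and additional_table_properties. build_iceberg_ddl is hypothetical, assumes simple unquoted identifiers, and is not necessarily the DDL awswrangler issues.

```python
from typing import Optional


def build_iceberg_ddl(
    database: str,
    table: str,
    columns: dict[str, str],
    location: str,
    partition_cols: Optional[list[str]] = None,
    table_properties: Optional[dict[str, str]] = None,
) -> str:
    """Hypothetical helper: render Athena-style Iceberg DDL (identifiers left unquoted)."""
    col_defs = ",\n  ".join(f"{name} {athena_type}" for name, athena_type in columns.items())
    # table_type=ICEBERG marks the table as Iceberg; extra properties are appended.
    props = {"table_type": "ICEBERG", **(table_properties or {})}
    props_sql = ", ".join(f"'{k}'='{v}'" for k, v in props.items())
    ddl = f"CREATE TABLE IF NOT EXISTS {database}.{table} (\n  {col_defs})"
    if partition_cols:
        # Transform expressions such as day(ts) are passed through unchanged.
        ddl += f"\nPARTITIONED BY ({', '.join(partition_cols)})"
    ddl += f"\nLOCATION '{location}'\nTBLPROPERTIES ({props_sql})"
    return ddl


ddl = build_iceberg_ddl(
    "my_database",
    "my_table2",
    {"col": "bigint", "ts": "timestamp"},
    "s3://bucket/my_table2/",
    partition_cols=["day(ts)"],
    table_properties={"write_target_data_file_size_bytes": "536870912"},  # 512 MiB
)
print(ddl)
```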