awswrangler.athena.to_iceberg
- awswrangler.athena.to_iceberg(df: DataFrame, database: str, table: str, temp_path: str | None = None, index: bool = False, table_location: str | None = None, partition_cols: list[str] | None = None, merge_cols: list[str] | None = None, merge_condition: Literal['update', 'ignore'] = 'update', merge_match_nulls: bool = False, keep_files: bool = True, data_source: str | None = None, s3_output: str | None = None, workgroup: str = 'primary', mode: Literal['append', 'overwrite', 'overwrite_partitions'] = 'append', encryption: str | None = None, kms_key: str | None = None, boto3_session: Session | None = None, s3_additional_kwargs: dict[str, Any] | None = None, additional_table_properties: dict[str, Any] | None = None, dtype: dict[str, str] | None = None, catalog_id: str | None = None, schema_evolution: bool = False, fill_missing_columns_in_df: bool = True, glue_table_settings: GlueTableSettings | None = None) → None
Insert into an Athena Iceberg table using INSERT INTO … SELECT. The Iceberg table is created if it does not exist.
Internally, the function writes staged files, creates a temporary external table over them, and inserts via INSERT INTO … SELECT.
Note
The following arguments are not supported in distributed mode with engine EngineEnum.RAY:
boto3_session
s3_additional_kwargs
Note
This function has arguments which can be configured globally through wr.config or environment variables:
catalog_id
database
workgroup
Check out the Global Configurations Tutorial for details.
- Parameters:
  - df (DataFrame) – Pandas DataFrame.
  - database (str) – AWS Glue/Athena database name. It is only the origin database from which the query will be launched. You can still use and mix several databases by writing the full table name within the SQL (e.g. database.table).
  - table (str) – AWS Glue/Athena table name.
  - temp_path (str | None) – Amazon S3 location to store temporary results. The workgroup configuration is used if not provided.
  - index (bool) – Whether to write the DataFrame index as a column.
  - table_location (str | None) – Amazon S3 location for the table. Only used to create a new table if it does not exist.
  - partition_cols (list[str] | None) – List of column names used to create partitions, including support for transform functions (e.g. "day(ts)").
  - merge_cols (list[str] | None) – List of column names used for conditional inserts and updates. See https://docs.aws.amazon.com/athena/latest/ug/merge-into-statement.html
  - merge_condition (Literal['update', 'ignore']) – The condition to use in the MERGE INTO statement. Valid values: ['update', 'ignore'].
  - merge_match_nulls (bool) – Whether nulls in the merge condition match other nulls.
  - keep_files (bool) – Whether staging files produced by Athena are retained. True by default.
  - data_source (str | None) – Data Source / Catalog name. If None, 'AwsDataCatalog' is used by default.
  - s3_output (str | None) – Amazon S3 path used for query execution.
  - workgroup (str) – Athena workgroup. 'primary' by default.
  - mode (Literal['append', 'overwrite', 'overwrite_partitions']) – 'append' (default), 'overwrite', or 'overwrite_partitions'.
  - encryption (str | None) – Valid values: [None, 'SSE_S3', 'SSE_KMS']. Notice: 'CSE_KMS' is not supported.
  - kms_key (str | None) – For SSE-KMS, this is the KMS key ARN or ID.
  - boto3_session (Session | None) – The default boto3 session is used if boto3_session receives None.
  - s3_additional_kwargs (dict[str, Any] | None) – Forwarded to botocore requests. e.g. s3_additional_kwargs={'RequestPayer': 'requester'}
  - additional_table_properties (dict[str, Any] | None) – Additional table properties. e.g. additional_table_properties={'write_target_data_file_size_bytes': '536870912'}
  - dtype (dict[str, str] | None) – Dictionary of column names and Athena/Glue types to cast. Useful when you have columns with undetermined or mixed data types. e.g. {'col name': 'bigint', 'col2 name': 'int'}
  - catalog_id (str | None) – The ID of the Data Catalog from which to retrieve databases. If none is provided, the AWS account ID is used by default.
  - schema_evolution (bool) – If True, allows schema evolution for new columns or changes in column types. Columns missing from the DataFrame that are present in the Iceberg schema will throw an error unless fill_missing_columns_in_df is set to True. Default is False.
  - fill_missing_columns_in_df (bool) – If True, columns that were missing in the DataFrame are filled with NULL values. Default is True.
  - glue_table_settings (GlueTableSettings | None) – Glue/Athena catalog: settings for writing to the Glue table. Currently only the 'columns_comments' attribute is supported for this function. Column comments can only be added with this function when creating a new table.
- Return type:
None
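The interaction between schema_evolution and fill_missing_columns_in_df can be pictured locally. When the incoming DataFrame lacks a column that exists in the Iceberg schema, the default behaviour pads it with NULLs, conceptually similar to a pandas reindex. This is an illustrative sketch of the semantics with made-up column names, not the library's actual implementation:

```python
import pandas as pd

# Hypothetical Iceberg table schema: the table has three columns.
table_columns = ["id", "val", "created_at"]

# The incoming DataFrame is missing "created_at".
df = pd.DataFrame({"id": [1, 2], "val": ["a", "b"]})

# With fill_missing_columns_in_df=True (the default), to_iceberg pads the
# missing column with NULLs before writing; conceptually a reindex:
padded = df.reindex(columns=table_columns)
print(padded["created_at"].isna().all())  # the padded column is all NULL
```

With fill_missing_columns_in_df=False, the same missing column instead raises an error unless schema_evolution handles the difference.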
Examples
Insert into an existing Iceberg table
>>> import awswrangler as wr
>>> import pandas as pd
>>> wr.athena.to_iceberg(
...     df=pd.DataFrame({'col': [1, 2, 3]}),
...     database='my_database',
...     table='my_table',
...     temp_path='s3://bucket/temp/',
... )
Create Iceberg table and insert data (table doesn’t exist, requires table_location)
>>> import awswrangler as wr
>>> import pandas as pd
>>> wr.athena.to_iceberg(
...     df=pd.DataFrame({'col': [1, 2, 3]}),
...     database='my_database',
...     table='my_table2',
...     table_location='s3://bucket/my_table2/',
...     temp_path='s3://bucket/temp/',
... )
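The effect of merge_cols can be illustrated without touching AWS. When merge_cols is set, incoming rows whose key columns match an existing row either replace it (merge_condition='update') or are skipped (merge_condition='ignore'); non-matching rows are inserted. The following is a local pandas sketch of these semantics with invented data, for illustration only; the real work is done by Athena's MERGE INTO statement:

```python
import pandas as pd

# Existing Iceberg table contents (illustrative data).
target = pd.DataFrame({"id": [1, 2], "val": ["a", "b"]})
# Incoming DataFrame, as passed via df= with merge_cols=["id"].
source = pd.DataFrame({"id": [2, 3], "val": ["B", "c"]})

def merge_semantics(target, source, merge_cols, merge_condition="update"):
    """Approximate the outcome of MERGE INTO keyed on merge_cols."""
    t_keys = target[merge_cols].apply(tuple, axis=1)
    s_keys = source[merge_cols].apply(tuple, axis=1)
    if merge_condition == "update":
        # Matched target rows are replaced by the source rows; unmatched
        # target rows survive, and unmatched source rows are inserted.
        return pd.concat([target[~t_keys.isin(s_keys)], source],
                         ignore_index=True)
    # "ignore": matched source rows are skipped; only new keys are inserted.
    return pd.concat([target, source[~s_keys.isin(t_keys)]],
                     ignore_index=True)

upserted = merge_semantics(target, source, ["id"], "update")  # id 2 -> "B"
ignored = merge_semantics(target, source, ["id"], "ignore")   # id 2 stays "b"
```

In both cases the result holds ids 1, 2 and 3; they differ only in whether the matched row id=2 takes the incoming value.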