awswrangler.s3.read_parquet_table

awswrangler.s3.read_parquet_table(table: str, database: str, filename_suffix: str | list[str] | None = None, filename_ignore_suffix: str | list[str] | None = None, catalog_id: str | None = None, partition_filter: Callable[[dict[str, str]], bool] | None = None, columns: list[str] | None = None, validate_schema: bool = True, coerce_int96_timestamp_unit: str | None = None, dtype_backend: Literal['numpy_nullable', 'pyarrow'] = 'numpy_nullable', chunked: bool | int = False, use_threads: bool | int = True, ray_args: RayReadParquetSettings | None = None, boto3_session: Session | None = None, s3_additional_kwargs: dict[str, Any] | None = None, pyarrow_additional_kwargs: dict[str, Any] | None = None, decryption_configuration: ArrowDecryptionConfiguration | None = None) DataFrame | Iterator[DataFrame]

Read an Apache Parquet table registered in the AWS Glue Catalog.

Note

Batching (chunked argument) (Memory Friendly):

Used to return an Iterable of DataFrames instead of a regular DataFrame.

Two batching strategies are available:

  • If chunked=True, depending on the size of the data, one or more data frames are returned per file in the path/dataset. Unlike chunked=INTEGER, rows from different files will not be mixed in the resulting data frames.

  • If chunked=INTEGER, awswrangler iterates over the data in batches with a number of rows equal to the received INTEGER.

In short, chunked=True is faster and uses less memory, while chunked=INTEGER gives precise control over the number of rows in each DataFrame.
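For example, a minimal sketch (placeholder database and table names) that iterates in chunks of at most 100,000 rows:

>>> import awswrangler as wr
>>> dfs = wr.s3.read_parquet_table(database='...', table='...', chunked=100_000)
>>> for df in dfs:
...     print(len(df))  # each DataFrame holds at most 100,000 rows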

Note

If use_threads=True, the number of threads is obtained from os.cpu_count().
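For example, to cap the thread count instead of relying on os.cpu_count() (placeholder database and table names):

>>> import awswrangler as wr
>>> df = wr.s3.read_parquet_table(database='...', table='...', use_threads=4)  # at most 4 threads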

Note

This function has arguments which can be configured globally through wr.config or environment variables:

  • catalog_id

  • database

  • dtype_backend

Check out the Global Configurations Tutorial for details.
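For example, a minimal sketch setting these values once through wr.config so that later calls can omit them (placeholder names; the tutorial also covers environment-variable equivalents such as WR_DATABASE):

>>> import awswrangler as wr
>>> wr.config.database = "my_database"        # picked up when database= is not passed
>>> wr.config.dtype_backend = "numpy_nullable"
>>> df = wr.s3.read_parquet_table(table="my_table")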

Note

The following arguments are not supported in distributed mode with engine EngineEnum.RAY:

  • boto3_session

  • s3_additional_kwargs

  • dtype_backend

Parameters:
  • table (str) – AWS Glue Catalog table name.

  • database (str) – AWS Glue Catalog database name.

  • filename_suffix (Union[str, List[str], None]) – Suffix or list of suffixes to be read (e.g. [".gz.parquet", ".snappy.parquet"]). If None (default), read all files.

  • filename_ignore_suffix (Union[str, List[str], None]) – Suffix or list of suffixes of S3 keys to be ignored (e.g. [".csv", "_SUCCESS"]). If None (default), read all files.

  • catalog_id (str, optional) – The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default.

  • partition_filter (Optional[Callable[[Dict[str, str]], bool]]) – Callback function used to filter PARTITION columns (PUSH-DOWN filter). The function receives a single argument (Dict[str, str]) whose keys are partition names and whose values are partition values (always strings), and it must return a bool: True to read the partition, False to ignore it. E.g. lambda x: x["year"] == "2020" and x["month"] == "1". See https://aws-sdk-pandas.readthedocs.io/en/3.7.2/tutorials/023%20-%20Flexible%20Partitions%20Filter.html

  • columns (List[str], optional) – List of columns to read from the file(s).

  • validate_schema (bool, default True) – Check that the schema is consistent across individual files.

  • coerce_int96_timestamp_unit (str, optional) – Cast timestamps that are stored in INT96 format to a particular resolution (e.g. "ms"). Setting to None is equivalent to "ns", so INT96 timestamps are inferred as nanoseconds.

  • dtype_backend (str, optional) –

    Which dtype backend to use: when "numpy_nullable" is set, nullable dtypes are used for all dtypes that have a nullable implementation; when "pyarrow" is set, pyarrow dtypes are used for all columns.

    The dtype backends are still experimental. The "pyarrow" backend is only supported with Pandas 2.0 or above. See the pyarrow dtype backend example under Examples below.

  • chunked (Union[int, bool]) – If passed, the data is split into an iterable of DataFrames (Memory friendly). If True an iterable of DataFrames is returned without guarantee of chunksize. If an INTEGER is passed, an iterable of DataFrames is returned with maximum rows equal to the received INTEGER.

  • use_threads (Union[bool, int], default True) – True to enable concurrent requests, False to disable multiple threads. If enabled, os.cpu_count() is used as the max number of threads. If integer is provided, specified number is used.

  • ray_args (RayReadParquetSettings, optional) – Parameters for the Ray/Modin backend. Only used when distributed computing is enabled with Ray and Modin installed.

  • boto3_session (boto3.Session(), optional) – Boto3 Session. The default boto3 session is used if None is received.

  • s3_additional_kwargs (dict[str, Any], optional) – Forward to S3 botocore requests.

  • pyarrow_additional_kwargs (Dict[str, Any], optional) – Forwarded to the to_pandas method when converting from PyArrow tables to a Pandas DataFrame. Valid values include "split_blocks", "self_destruct", "ignore_metadata". E.g. pyarrow_additional_kwargs={'split_blocks': True}.

  • decryption_configuration (ArrowDecryptionConfiguration, optional) – Dict holding pyarrow.parquet.encryption.CryptoFactory and pyarrow.parquet.encryption.KmsConnectionConfig objects, used to create the PyArrow CryptoFactory.file_decryption_properties object forwarded to the PyArrow reader. Client-side decryption is not supported in distributed mode. See the decryption example under Examples below.

Returns:

Pandas DataFrame, or a Generator of DataFrames when chunked is used.

Return type:

Union[pandas.DataFrame, Generator[pandas.DataFrame, None, None]]

Examples

Reading Parquet Table

>>> import awswrangler as wr
>>> df = wr.s3.read_parquet_table(database='...', table='...')

Reading Parquet Table in chunks (Chunk by file)

>>> import awswrangler as wr
>>> dfs = wr.s3.read_parquet_table(database='...', table='...', chunked=True)
>>> for df in dfs:
...     print(df)  # Smaller Pandas DataFrame

Reading Parquet Table with PUSH-DOWN filter over partitions

>>> import awswrangler as wr
>>> my_filter = lambda x: x["city"].startswith("new")
>>> df = wr.s3.read_parquet_table(database='...', table='...', partition_filter=my_filter)
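
Reading Parquet Table with pyarrow-backed dtypes

A minimal sketch (placeholder database and table names) requesting the experimental "pyarrow" dtype backend, which requires Pandas 2.0 or above.

>>> import awswrangler as wr
>>> df = wr.s3.read_parquet_table(database='...', table='...', dtype_backend='pyarrow')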
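
Reading an encrypted Parquet Table (client-side decryption)

A minimal sketch, assuming decryption_configuration accepts a dict with "crypto_factory" and "kms_connection_config" keys holding the PyArrow objects described above. The ToyKmsClient below is a hypothetical stand-in for a real KMS integration and must not be used in production.

>>> import base64
>>> import awswrangler as wr
>>> import pyarrow.parquet.encryption as pqe
>>> class ToyKmsClient(pqe.KmsClient):  # hypothetical: replace with your real KMS client
...     def __init__(self, master_key: bytes):
...         super().__init__()
...         self._master_key = master_key
...     def wrap_key(self, key_bytes, master_key_identifier):
...         # Toy wrapping: prepend the master key and base64-encode
...         return base64.b64encode(self._master_key + key_bytes)
...     def unwrap_key(self, wrapped_key, master_key_identifier):
...         # Toy unwrapping: strip the master key prefix after decoding
...         return base64.b64decode(wrapped_key)[len(self._master_key):]
...
>>> crypto_factory = pqe.CryptoFactory(lambda kms_config: ToyKmsClient(b"0123456789012345"))
>>> kms_config = pqe.KmsConnectionConfig()
>>> df = wr.s3.read_parquet_table(
...     database='...',
...     table='...',
...     decryption_configuration={
...         "crypto_factory": crypto_factory,
...         "kms_connection_config": kms_config,
...     },
... )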