awswrangler.s3.read_parquet

awswrangler.s3.read_parquet(path: str | list[str], path_root: str | None = None, dataset: bool = False, path_suffix: str | list[str] | None = None, path_ignore_suffix: str | list[str] | None = None, ignore_empty: bool = True, partition_filter: Callable[[dict[str, str]], bool] | None = None, columns: list[str] | None = None, validate_schema: bool = False, coerce_int96_timestamp_unit: str | None = None, schema: Schema | None = None, last_modified_begin: datetime | None = None, last_modified_end: datetime | None = None, version_id: str | dict[str, str] | None = None, dtype_backend: Literal['numpy_nullable', 'pyarrow'] = 'numpy_nullable', chunked: bool | int = False, use_threads: bool | int = True, ray_args: RayReadParquetSettings | None = None, boto3_session: Session | None = None, s3_additional_kwargs: dict[str, Any] | None = None, pyarrow_additional_kwargs: dict[str, Any] | None = None, decryption_configuration: ArrowDecryptionConfiguration | None = None) → DataFrame | Iterator[DataFrame]

Read Parquet file(s) from an S3 prefix or a list of S3 object paths.

The concept of dataset enables more complex features like partitioning and catalog integration (AWS Glue Catalog).

This function accepts Unix shell-style wildcards in the path argument: * (matches everything), ? (matches any single character), [seq] (matches any character in seq), and [!seq] (matches any character not in seq). To use a path that contains literal wildcard characters (*, ?, []), escape it with glob.escape(path) before passing it to this function.
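As a rough illustration of the wildcard syntax described above, the sketch below uses the standard-library fnmatch module, which implements the same shell-style patterns, together with glob.escape. The bucket and key names are hypothetical, and fnmatch is used here only to demonstrate the syntax; this page does not state how awswrangler matches keys internally.

```python
import fnmatch
import glob

# Hypothetical object paths, for illustration only.
keys = [
    "s3://bucket/prefix/part-0.parquet",
    "s3://bucket/prefix/part-1.parquet",
    "s3://bucket/prefix/part-10.parquet",
    "s3://bucket/prefix/data[v1].parquet",
]


def match(pattern):
    """Return the keys that match a shell-style pattern (case-sensitive)."""
    return [key for key in keys if fnmatch.fnmatchcase(key, pattern)]


# "?" matches exactly one character, so "part-10" is not selected.
single_char = match("s3://bucket/prefix/part-?.parquet")

# "[!seq]" matches any single character that is not in seq.
not_zero = match("s3://bucket/prefix/part-[!0].parquet")

# A literal path containing "[" must be escaped, otherwise "[v1]" would be
# read as a character class rather than as literal text.
literal = "s3://bucket/prefix/data[v1].parquet"
escaped = glob.escape(literal)
literal_match = match(escaped)
unescaped_match = match(literal)
```

The escaped string is what would be passed as path when the object key itself contains wildcard characters.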

Note

Batching (chunked argument) (Memory Friendly):

When chunked is set, the function returns an iterable of DataFrames instead of a single DataFrame.

Two batching strategies are available:

  • If chunked=True, depending on the size of the data, one or more data frames are returned per file in the path/dataset. Unlike chunked=INTEGER, rows from different files are not mixed in the resulting data frames.

  • If chunked=INTEGER, awswrangler iterates over the data in chunks of INTEGER rows.

P.S. chunked=True is faster and uses less memory while chunked=INTEGER is more precise in the number of rows.
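The difference between the two strategies can be sketched with plain Python lists standing in for files and rows. This is a conceptual model only, not awswrangler's implementation: the helper names chunks_per_file and chunks_by_row_count are hypothetical, and the real chunked=True mode may return more than one DataFrame per file.

```python
from typing import Iterator, List

# Each inner list stands in for the rows of one Parquet file (hypothetical data).
files = [[1, 2, 3], [4, 5]]


def chunks_per_file(files: List[list]) -> Iterator[list]:
    """Model of chunked=True: chunks never mix rows from different files."""
    for rows in files:
        yield list(rows)


def chunks_by_row_count(files: List[list], n: int) -> Iterator[list]:
    """Model of chunked=INTEGER: chunks of n rows, possibly spanning files."""
    buffer: list = []
    for rows in files:
        buffer.extend(rows)
        # Emit full chunks as soon as enough rows are buffered.
        while len(buffer) >= n:
            yield buffer[:n]
            buffer = buffer[n:]
    # The final chunk may hold fewer than n rows.
    if buffer:
        yield buffer


per_file = list(chunks_per_file(files))
by_rows = list(chunks_by_row_count(files, 2))
```

In this model the row-count strategy has to buffer rows across file boundaries, which is consistent with the note that chunked=True is the cheaper option.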

Note

If use_threads=True, the number of threads is obtained from os.cpu_count().
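A minimal sketch of how a use_threads value could be turned into a thread count, following the description above. The helper name resolve_thread_count is hypothetical, and two details are assumptions not stated on this page: falling back to 1 when os.cpu_count() returns None, and clamping integers below 1 to 1.

```python
import os
from typing import Union


def resolve_thread_count(use_threads: Union[bool, int]) -> int:
    """Map a use_threads argument to a number of threads (illustrative only)."""
    # bool is a subclass of int, so it has to be checked first.
    if isinstance(use_threads, bool):
        if use_threads:
            # os.cpu_count() may return None; falling back to 1 is an assumption.
            return os.cpu_count() or 1
        return 1
    # Clamping values below 1 is an assumption.
    return max(1, int(use_threads))
```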

Note

Filtering by last_modified_begin and last_modified_end is applied only after all S3 objects have been listed.
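Because the filter runs on the client after listing, it behaves like a plain comparison over the listed objects. The sketch below models that with a hypothetical listing dictionary and a hypothetical helper filter_by_last_modified; treating both bounds as inclusive is an assumption. S3 reports last-modified times as timezone-aware datetimes, so the bounds here are timezone-aware too (Python raises TypeError when comparing naive and aware datetimes).

```python
from datetime import datetime, timezone

# Hypothetical listing result: object path -> last-modified timestamp.
listing = {
    "s3://bucket/prefix/a.parquet": datetime(2024, 1, 10, tzinfo=timezone.utc),
    "s3://bucket/prefix/b.parquet": datetime(2024, 2, 15, tzinfo=timezone.utc),
    "s3://bucket/prefix/c.parquet": datetime(2024, 3, 20, tzinfo=timezone.utc),
}


def filter_by_last_modified(listing, begin=None, end=None):
    """Keep paths whose timestamp lies within [begin, end] (inclusive by assumption)."""
    return [
        path
        for path, modified in listing.items()
        if (begin is None or modified >= begin) and (end is None or modified <= end)
    ]


begin = datetime(2024, 2, 1, tzinfo=timezone.utc)
end = datetime(2024, 3, 1, tzinfo=timezone.utc)
selected = filter_by_last_modified(listing, begin=begin, end=end)

# The same bounds would be passed to the reader as, for example:
# wr.s3.read_parquet(path="s3://bucket/prefix/", last_modified_begin=begin, last_modified_end=end)
```

Since every object under the prefix is listed before the filter is applied, a narrow date range does not reduce the cost of the listing step itself.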

Note

This function has arguments which can be configured globally through wr.config or environment variables:

  • dtype_backend

Check out the Global Configurations Tutorial for details.

Note

The following arguments are not supported in distributed mode with engine EngineEnum.RAY:

  • boto3_session

  • version_id

  • s3_additional_kwargs

  • dtype_backend

Parameters:
  • path (Union[str, List[str]]) – S3 prefix (accepts Unix shell-style wildcards) (e.g. s3://bucket/prefix) or list of S3 object paths (e.g. [s3://bucket/key0, s3://bucket/key1]).

  • path_root (str, optional) – Root path of the dataset. If dataset=True, it is used as a starting point to load partition columns.

  • dataset (bool, default False) – If True, read a parquet dataset instead of individual file(s), loading all related partitions as columns.

  • path_suffix (Union[str, List[str], None]) – Suffix or List of suffixes to be read (e.g. [“.gz.parquet”, “.snappy.parquet”]). If None, reads all files. (default)

  • path_ignore_suffix (Union[str, List[str], None]) – Suffix or List of suffixes to be ignored (e.g. [“.csv”, “_SUCCESS”]). If None, reads all files. (default)

  • ignore_empty (bool, default True) – Ignore files with 0 bytes.

  • partition_filter (Callable[[Dict[str, str]], bool], optional) – Callback function used to filter PARTITION columns (PUSH-DOWN filter). This function must receive a single argument (Dict[str, str]) where keys are partition names and values are partition values. Partition values must be strings and the function must return a bool: True to read the partition or False to ignore it. Ignored if dataset=False. E.g. lambda x: True if x["year"] == "2020" and x["month"] == "1" else False. See also: https://aws-sdk-pandas.readthedocs.io/en/3.7.3/tutorials/023%20-%20Flexible%20Partitions%20Filter.html

  • columns (List[str], optional) – List of columns to read from the file(s).

  • validate_schema (bool, default False) – Check that the schema is consistent across individual files.

  • coerce_int96_timestamp_unit (str, optional) – Cast timestamps that are stored in INT96 format to a particular resolution (e.g. “ms”). Setting to None is equivalent to “ns”, so INT96 timestamps are interpreted as nanoseconds.

  • schema (pyarrow.Schema, optional) – Schema to use when reading the file.

  • last_modified_begin (datetime, optional) – Lower bound for filtering S3 objects by Last modified date. Filter is only applied after listing all objects.

  • last_modified_end (datetime, optional) – Upper bound for filtering S3 objects by Last modified date. Filter is only applied after listing all objects.

  • version_id (Optional[Union[str, Dict[str, str]]]) – Version id of the object or mapping of object path to version id. (e.g. {‘s3://bucket/key0’: ‘121212’, ‘s3://bucket/key1’: ‘343434’})

  • dtype_backend (str, optional) –

    Which dtype backend to use for the resulting DataFrame. When “numpy_nullable” is set, nullable dtypes are used for all dtypes that have a nullable implementation; when “pyarrow” is set, pyarrow is used for all dtypes.

    The dtype backends are still experimental. The “pyarrow” backend is only supported with Pandas 2.0 or above.

  • chunked (Union[int, bool]) – If passed, the data is split into an iterable of DataFrames (Memory friendly). If True an iterable of DataFrames is returned without guarantee of chunksize. If an INTEGER is passed, an iterable of DataFrames is returned with maximum rows equal to the received INTEGER.

  • use_threads (Union[bool, int], default True) – True to enable concurrent requests, False to disable multiple threads. If enabled, os.cpu_count() is used as the max number of threads. If an integer is provided, that number of threads is used.

  • ray_args (RayReadParquetSettings, optional) – Parameters of the Ray Modin settings. Only used when distributed computing is used with Ray and Modin installed.

  • boto3_session (boto3.Session(), optional) – Boto3 Session. The default boto3 session is used if None is received.

  • s3_additional_kwargs (dict[str, Any], optional) – Forward to S3 botocore requests.

  • pyarrow_additional_kwargs (Dict[str, Any], optional) – Forwarded to to_pandas method converting from PyArrow tables to Pandas DataFrame. Valid values include “split_blocks”, “self_destruct”, “ignore_metadata”. e.g. pyarrow_additional_kwargs={‘split_blocks’: True}.

  • decryption_configuration (ArrowDecryptionConfiguration, optional) – Dict of pyarrow.parquet.encryption.CryptoFactory and pyarrow.parquet.encryption.KmsConnectionConfig objects, used to create a PyArrow CryptoFactory.file_decryption_properties object that is forwarded to the PyArrow reader. See https://arrow.apache.org/docs/python/parquet.html#decryption-configuration for details. Client decryption is not supported in distributed mode.
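The interaction of path_suffix and path_ignore_suffix described above can be modelled with str.endswith. This is a sketch under assumptions, not the library's code: the helper select_keys and the sample keys are hypothetical, and applying the keep-list before the ignore-list is an assumption (the result is the same in either order for this model).

```python
def select_keys(keys, path_suffix=None, path_ignore_suffix=None):
    """Filter object keys by suffix, mirroring the two parameters (illustrative)."""

    def as_tuple(value):
        # Accept None, a single string, or a list of strings.
        if value is None:
            return ()
        return (value,) if isinstance(value, str) else tuple(value)

    keep = as_tuple(path_suffix)
    ignore = as_tuple(path_ignore_suffix)
    selected = []
    for key in keys:
        # With no path_suffix, every key is a candidate.
        if keep and not key.endswith(keep):
            continue
        if ignore and key.endswith(ignore):
            continue
        selected.append(key)
    return selected


# Hypothetical keys under one prefix.
keys = [
    "s3://bucket/prefix/a.snappy.parquet",
    "s3://bucket/prefix/b.gz.parquet",
    "s3://bucket/prefix/c.csv",
    "s3://bucket/prefix/_SUCCESS",
]

only_parquet = select_keys(keys, path_suffix=".parquet")
without_markers = select_keys(keys, path_ignore_suffix=[".csv", "_SUCCESS"])
```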

Returns:

Pandas DataFrame, or a Generator of DataFrames if chunked is True or an integer.

Return type:

Union[pandas.DataFrame, Generator[pandas.DataFrame, None, None]]

Examples

Reading all Parquet files under a prefix

>>> import awswrangler as wr
>>> df = wr.s3.read_parquet(path='s3://bucket/prefix/')

Reading all Parquet files from a list

>>> import awswrangler as wr
>>> df = wr.s3.read_parquet(path=['s3://bucket/filename0.parquet', 's3://bucket/filename1.parquet'])

Reading in chunks (Chunk by file)

>>> import awswrangler as wr
>>> dfs = wr.s3.read_parquet(path=['s3://bucket/filename0.parquet', 's3://bucket/filename1.parquet'], chunked=True)
>>> for df in dfs:
...     print(df)  # Smaller Pandas DataFrame

Reading in chunks (Chunk by 1MM rows)

>>> import awswrangler as wr
>>> dfs = wr.s3.read_parquet(
...     path=['s3://bucket/filename0.parquet', 's3://bucket/filename1.parquet'],
...     chunked=1_000_000
... )
>>> for df in dfs:
...     print(df)  # Pandas DataFrame with up to 1MM rows

Reading Parquet Dataset with PUSH-DOWN filter over partitions

>>> import awswrangler as wr
>>> my_filter = lambda x: x["city"].startswith("new")
>>> df = wr.s3.read_parquet(path='s3://bucket/prefix/', dataset=True, partition_filter=my_filter)
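Since a partition filter is an ordinary callable that receives a Dict[str, str], it can be checked locally before being handed to read_parquet. The sketch below does that against hypothetical partition values; the function name first_quarter_2020 and the year/month partition layout are illustrative assumptions. Note the explicit int() conversion, needed because partition values arrive as strings.

```python
def first_quarter_2020(partition):
    """Return True for partitions in January through March of 2020."""
    # Partition values are strings, so numeric comparisons need a conversion.
    return partition["year"] == "2020" and int(partition["month"]) <= 3


# Hypothetical partition values as the callback would receive them.
partitions = [
    {"year": "2020", "month": "1"},
    {"year": "2020", "month": "7"},
    {"year": "2019", "month": "2"},
]

selected = [p for p in partitions if first_quarter_2020(p)]

# The callable would then be passed as:
# wr.s3.read_parquet(path="s3://bucket/prefix/", dataset=True, partition_filter=first_quarter_2020)
```

A named function is used instead of a lambda so that the filter can be tested and reused.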