At scale

AWS SDK for pandas supports Ray and Modin, enabling you to scale your pandas workflows from a single machine to a multi-node environment, with no code changes. Head to our tutorial to set up a self-managed Ray cluster on Amazon Elastic Compute Cloud (Amazon EC2).

Getting Started

Install the library with these two optional dependencies to enable distributed mode:

>>> pip install "awswrangler[ray,modin]"

Once installed, you can use the library in your code as usual:

>>> import awswrangler as wr

At import, SDK for pandas checks whether ray and modin are present in the installation path and, if so, enables distributed mode. To confirm that you are in distributed mode, run:

>>> print(f"Execution Engine: {wr.engine.get()}")
>>> print(f"Memory Format: {wr.memory_format.get()}")

These commands confirm that Ray and Modin are enabled as the execution engine and memory format, respectively. You can switch back to non-distributed mode at any point (see Switching modes below).
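In distributed mode, the output looks similar to the following (a sketch; the exact representation of the enum values may vary across versions):

Execution Engine: EngineEnum.RAY
Memory Format: MemoryFormatEnum.MODIN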

Initialization of the Ray cluster is lazy and only triggered when the first distributed API is executed. At that point, SDK for pandas looks for an environment variable called WR_ADDRESS. If found, it is used to send commands to a remote cluster. If not found, a local Ray runtime is initialized on your machine instead. Alternatively, you can trigger Ray initialization with:

>>> wr.engine.initialize()
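For example, a minimal sketch of targeting a remote cluster by setting WR_ADDRESS before initialization, assuming initialize() performs the same variable lookup (the address is hypothetical and follows the Ray client ray:// convention):

import os

os.environ["WR_ADDRESS"] = "ray://10.0.0.5:10001"  # hypothetical head-node address
wr.engine.initialize()  # connect to the remote cluster instead of starting a local runtime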

In distributed mode, the same awswrangler APIs can now handle much larger datasets:

# Read 1.6 GB of Parquet data
df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2017/")

# Drop vendor_id column
df.drop("vendor_id", axis=1, inplace=True)

# Filter trips over 1 mile
df1 = df[df["trip_distance"] > 1]

In the example above, New York City taxi data is read from Amazon S3 into a distributed Modin data frame. Modin is a drop-in replacement for pandas: it exposes the same APIs but lets you use all of the cores on your machine, or all of the workers in an entire cluster, for improved performance and scale. To use it, replace your pandas import statement with Modin's:

>>> import modin.pandas as pd  # instead of import pandas as pd

Failing to do so means that all operations run on a single thread instead of leveraging the full resources of the cluster.

Note that in distributed mode, all awswrangler APIs return and accept Modin data frames, not pandas.
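Continuing the example above, the filtered frame can be written back to S3 through the same API surface (the output path is hypothetical):

# Write the filtered trips back to S3 as a Parquet dataset
wr.s3.to_parquet(
    df=df1,
    path="s3://my-bucket/taxi-filtered/",  # hypothetical output location
    dataset=True,
)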

Supported APIs

The following lists the awswrangler APIs available in distributed mode (i.e. those that can run at scale), grouped by service:

S3: read_parquet, read_parquet_metadata, read_parquet_table, read_csv, read_json, read_fwf, to_parquet, to_csv, to_json, select_query, store_parquet_metadata, delete_objects, describe_objects, size_objects, wait_objects_exist, wait_objects_not_exist, merge_datasets, copy_objects

Redshift: copy, unload

Athena: describe_table, get_query_results, read_sql_query, read_sql_table, show_create_table, to_iceberg, delete_from_iceberg_table

DynamoDB: read_items, put_df, put_csv, put_json, put_items

Neptune: bulk_load

Timestream: batch_load, write, unload

Switching modes

The following commands showcase how to switch between distributed and non-distributed modes:

# Switch to non-distributed
wr.engine.set("python")
wr.memory_format.set("pandas")

# Switch to distributed
wr.engine.set("ray")
wr.memory_format.set("modin")

Similarly, you can set the WR_ENGINE and WR_MEMORY_FORMAT environment variables to the desired engine and memory format, respectively.
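For example, in a shell (assuming the variables accept the same strings as the setters above):

# Switch to distributed mode via the environment
export WR_ENGINE=ray
export WR_MEMORY_FORMAT=modin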

Caveats

S3FS Filesystem

When Ray is chosen as the engine, S3Fs is used instead of boto3 for certain API calls, such as listing a large number of S3 objects. This choice was made for performance reasons, as the boto3 implementation can be much slower in some cases. As a side effect, the s3_additional_kwargs input parameter cannot be used, as it is not currently supported by S3Fs.
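As an illustration, a call such as the following relies on s3_additional_kwargs and therefore only works with the default Python engine, not under Ray (the bucket and KMS key alias are hypothetical):

# Server-side encryption options are forwarded to boto3 by the default engine;
# this parameter is not supported when the Ray engine routes the call through S3Fs
wr.s3.to_parquet(
    df=df,
    path="s3://my-bucket/encrypted/",
    s3_additional_kwargs={
        "ServerSideEncryption": "aws:kms",
        "SSEKMSKeyId": "alias/my-key",  # hypothetical key
    },
)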

Unsupported kwargs

Most AWS SDK for pandas calls support passing the boto3_session argument. While this is acceptable for an application running in a single process, distributed applications require the session to be serialized and passed to the worker nodes in the cluster. This constitutes a security risk. As a result, passing boto3_session when using the Ray runtime is not supported.
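For example, passing a session explicitly works in a single process but should be dropped when running on Ray (the region is an assumption):

import boto3

# Fine in non-distributed mode
session = boto3.Session(region_name="us-east-1")
df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2017/", boto3_session=session)

# Under the Ray runtime, omit boto3_session; workers resolve credentials
# through the default provider chain instead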

To learn more

Head to our latest tutorials to discover even more features.

A runbook with common errors when running the library with Ray is available here.