At scale¶
AWS SDK for pandas supports Ray and Modin, enabling you to scale your pandas workflows from a single machine to a multi-node environment, with no code changes. Head to our tutorial to set up a self-managed Ray cluster on Amazon Elastic Compute Cloud (Amazon EC2).
Getting Started¶
Install the library with the these two optional dependencies to enable distributed mode:
>>> pip install "awswrangler[ray,modin]"
Once installed, you can use the library in your code as usual:
>>> import awswrangler as wr
At import, SDK for pandas checks if ray
and modin
are in the installation path and enables distributed mode.
To confirm that you are in distributed mode, run:
>>> print(f"Execution Engine: {wr.engine.get()}")
>>> print(f"Memory Format: {wr.memory_format.get()}")
which show that both Ray and Modin are enabled as an execution engine and memory format, respectively. You can switch back to non-distributed mode at any point (See Switching modes below).
Initialization of the Ray cluster is lazy and only triggered when the first distributed API is executed.
At that point, SDK for pandas looks for an environment variable called WR_ADDRESS
.
If found, it is used to send commands to a remote cluster.
If not found, a local Ray runtime is initialized on your machine instead.
Alternatively, you can trigger Ray initialization with:
>>> wr.engine.initialize()
In distributed mode, the same awswrangler
APIs can now handle much larger datasets:
# Read 1.6 Gb Parquet data
df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2017/")
# Drop vendor_id column
df.drop("vendor_id", axis=1, inplace=True)
# Filter trips over 1 mile
df1 = df[df["trip_distance"] > 1]
In the example above, New York City Taxi data is read from Amazon S3 into a distributed Modin data frame. Modin is a drop-in replacement for Pandas. It exposes the same APIs but enables you to use all of the cores on your machine, or all of the workers in an entire cluster, leading to improved performance and scale. To use it, make sure to replace your pandas import statement with modin:
>>> import modin.pandas as pd # instead of import pandas as pd
Failing to do so means that all operations run on a single thread instead of leveraging the entire cluster resources.
Note that in distributed mode, all awswrangler
APIs return and accept Modin data frames, not pandas.
Supported APIs¶
This table lists the awswrangler
APIs available in distributed mode (i.e. that can run at scale):
Service |
API |
Implementation |
---|---|---|
|
|
✅ |
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
|
✅ |
|
✅ |
|
|
|
✅ |
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
|
✅ |
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
✅ |
|
|
|
✅ |
|
|
✅ |
|
✅ |
|
|
✅ |
Switching modes¶
The following commands showcase how to switch between distributed and non-distributed modes:
# Switch to non-distributed
wr.engine.set("python")
wr.memory_format.set("pandas")
# Switch to distributed
wr.engine.set("ray")
wr.memory_format.set("modin")
Similarly, you can set the WR_ENGINE
and WR_MEMORY_FORMAT
environment variables
to the desired engine and memory format, respectively.
Caveats¶
S3FS Filesystem¶
When Ray is chosen as an engine, S3Fs is used instead of boto3 for certain API calls.
These include listing a large number of S3 objects for example.
This choice was made for performance reasons as a boto3 implementation can be much slower in some cases.
As a side effect,
users won’t be able to use the s3_additional_kwargs
input parameter as it’s currently not supported by S3Fs.
Unsupported kwargs¶
Most AWS SDK for pandas calls support passing the boto3_session
argument.
While this is acceptable for an application running in a single process,
distributed applications require the session to be serialized and passed to the worker nodes in the cluster.
This constitutes a security risk.
As a result, passing boto3_session
when using the Ray runtime is not supported.
To learn more¶
Head to our latest tutorials to discover even more features.
A runbook with common errors when running the library with Ray is available here.