AWS SDK for pandas

35 - Distributing Calls on Ray Remote Cluster

AWS SDK for pandas can distribute specific calls across a cluster of EC2 instances using Ray.

Note that this tutorial creates a cluster of EC2 nodes, which will incur charges in your account. Make sure to delete the cluster at the end.

Install the library

[ ]:
!pip install "awswrangler[modin,ray]"

Configure and Build Ray Cluster on AWS

Build Prerequisite Infrastructure

Click on the link below to provision an AWS CloudFormation stack. It builds a security group and IAM instance profile for the Ray Cluster to use. A valid CIDR range (encompassing your local machine IP) and a VPC ID are required.

[Launch Stack button]
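Before launching the stack, it can help to confirm that the CIDR range you plan to enter really covers your local machine's IP. The sketch below uses only the Python standard library; the helper name `ip_in_cidr` and the addresses are illustrative placeholders (documentation-only ranges), not values from this tutorial.

```python
import ipaddress


def ip_in_cidr(ip: str, cidr: str) -> bool:
    """Return True if `ip` falls inside the `cidr` block."""
    # strict=False accepts ranges written with host bits set, e.g. 203.0.113.25/24
    network = ipaddress.ip_network(cidr, strict=False)
    return ipaddress.ip_address(ip) in network


# Placeholder addresses from the documentation-only ranges (RFC 5737)
print(ip_in_cidr("203.0.113.25", "203.0.113.0/24"))  # True
print(ip_in_cidr("198.51.100.7", "203.0.113.0/24"))  # False
```

Substitute your own public IP and the range you intend to pass to the stack.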

Configure Ray Cluster Configuration

Start with a cluster configuration file (YAML).

[ ]:
!touch config.yml

Replace all values to match your desired region, your account number, and the names of the resources deployed by the CloudFormation stack above.

A limited set of AWS regions is currently supported (Python 3.8 and above). The example configuration below uses the AMI for us-east-1.

Then edit the config.yml file with your custom configuration.

cluster_name: pandas-sdk-cluster

min_workers: 2
max_workers: 2

provider:
  type: aws
  region: us-east-1 # Change AWS region as necessary
  availability_zone: us-east-1a,us-east-1b,us-east-1c # Change as necessary
  security_group:
    GroupName: ray-cluster
  cache_stopped_nodes: False

available_node_types:
  ray.head.default:
    node_config:
      InstanceType: m4.xlarge
      IamInstanceProfile:
        # Replace with your account id and profile name if you did not use the default value
        Arn: arn:aws:iam::{ACCOUNT ID}:instance-profile/ray-cluster
      # Replace ImageId if using a different region / python version
      ImageId: ami-02ea7c238b7ba36af
      TagSpecifications: # Optional tags
        - ResourceType: "instance"
          Tags:
            - Key: Platform
              Value: "ray"

  ray.worker.default:
    min_workers: 2
    max_workers: 2
    node_config:
      InstanceType: m4.xlarge
      IamInstanceProfile:
        # Replace with your account id and profile name if you did not use the default value
        Arn: arn:aws:iam::{ACCOUNT ID}:instance-profile/ray-cluster
      # Replace ImageId if using a different region / python version
      ImageId: ami-02ea7c238b7ba36af
      TagSpecifications: # Optional tags
        - ResourceType: "instance"
          Tags:
            - Key: Platform
              Value: "ray"

setup_commands:
  - pip install "awswrangler[modin,ray]"
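The configuration contains an `{ACCOUNT ID}` placeholder that is easy to leave unreplaced. As a rough safeguard, the sketch below scans the configuration text for upper-case placeholders in braces and fills in the account ID. The helper names `find_placeholders` and `fill_account_id` are hypothetical, and `123456789012` is only a stand-in account number.

```python
import re
from typing import List

# Matches upper-case placeholders in braces, such as {ACCOUNT ID}
PLACEHOLDER = re.compile(r"\{[A-Z][A-Z _]*\}")


def find_placeholders(config_text: str) -> List[str]:
    """Return the distinct placeholders still present in the config text."""
    return sorted(set(PLACEHOLDER.findall(config_text)))


def fill_account_id(config_text: str, account_id: str) -> str:
    """Replace every {ACCOUNT ID} placeholder with the given account ID."""
    return config_text.replace("{ACCOUNT ID}", account_id)


# One line taken from the example configuration
sample = "Arn: arn:aws:iam::{ACCOUNT ID}:instance-profile/ray-cluster\n"
print(find_placeholders(sample))  # ['{ACCOUNT ID}']

filled = fill_account_id(sample, "123456789012")  # stand-in account ID
print(find_placeholders(filled))  # []
```

In practice you could read config.yml, run the check, and only proceed to provisioning once the list of placeholders is empty.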

Provision Ray Cluster

The command below creates a Ray cluster in your account based on the config file above. The cluster consists of one head node and two workers (m4.xlarge EC2 instances). The command takes a few minutes to complete.

[ ]:
!ray up -y config.yml

Once the cluster is up and running, set the RAY_ADDRESS environment variable to the Ray cluster address of the head node.

[ ]:
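import os
import subprocess

# `ray get-head-ip` may print log lines first; the head node IP is the last non-empty line
output = subprocess.check_output(["ray", "get-head-ip", "config.yml"]).decode("utf-8")
head_node_ip = output.strip().splitlines()[-1]

# Port 10001 is the Ray Client port used by the ray:// address
os.environ["RAY_ADDRESS"] = f"ray://{head_node_ip}:10001"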
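Picking the IP by line position relies on the CLI printing the address last. A slightly more defensive variant, sketched below, scans the output from the end and returns the first line that parses as an IP address. The helper name `extract_head_ip` and the sample output are made up for illustration; real CLI output will differ.

```python
import ipaddress
from typing import Optional


def extract_head_ip(cli_output: str) -> Optional[str]:
    """Return the last line of `cli_output` that is a valid IP address, if any."""
    for line in reversed(cli_output.splitlines()):
        candidate = line.strip()
        try:
            ipaddress.ip_address(candidate)
        except ValueError:
            continue  # not an IP address (log line or blank line)
        return candidate
    return None


# Hypothetical output: one log line followed by a placeholder IP (RFC 5737 range)
sample_output = "(hypothetical log line)\n203.0.113.10\n"
head_ip = extract_head_ip(sample_output)
print(f"ray://{head_ip}:10001")  # ray://203.0.113.10:10001
```

Returning `None` when no address is found lets the caller fail early instead of exporting a malformed RAY_ADDRESS.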

With RAY_ADDRESS set, awswrangler API calls run on the cluster, not on your local machine. The SDK detects the dependencies required for its distributed mode and parallelizes supported methods on the cluster.

[ ]:
import modin.pandas as pd

import awswrangler as wr

print(f"Execution engine: {wr.engine.get()}")
print(f"Memory format: {wr.memory_format.get()}")
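If the engine reported above is not the distributed one, a quick first check is whether the optional dependencies are importable at all. The sketch below uses only the standard library; it approximates the idea of dependency detection and is not the SDK's own detection logic. The helper name `distributed_dependencies` is hypothetical.

```python
import importlib.util
from typing import Dict, Sequence


def distributed_dependencies(names: Sequence[str] = ("ray", "modin")) -> Dict[str, bool]:
    """Map each package name to whether it can be found in this environment."""
    return {name: importlib.util.find_spec(name) is not None for name in names}


status = distributed_dependencies()
print(status)
print("Both packages importable:", all(status.values()))
```

A `False` entry suggests reinstalling with the `modin` and `ray` extras shown in the install step.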

Enter the bucket name

[ ]:
bucket = "BUCKET_NAME"

Read & write some data at scale on the cluster

[ ]:
# Read the last 3 months of compressed taxi Parquet data (400 MB)
df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2018/1*.parquet")
df["month"] = df["pickup_at"].dt.month

# Write it back to S3 partitioned by month
path = f"s3://{bucket}/taxi-data/"
database = "ray_test"
wr.catalog.create_database(name=database, exist_ok=True)
table = "nyc_taxi"

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    database=database,
    table=table,
    partition_cols=["month"],
)
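Because the dataset is partitioned by month, a later read can be limited to a single partition. The sketch below builds a filter callable of the kind `wr.s3.read_parquet` accepts through its `partition_filter` argument, which receives partition values as strings. The helper names `only_month` and `read_month` are hypothetical, and `read_month` is defined but not called here, since it needs AWS access; whether the filter is applied in distributed mode should be checked against the SDK documentation.

```python
from typing import Callable, Dict


def only_month(month: int) -> Callable[[Dict[str, str]], bool]:
    """Build a partition filter that keeps a single `month` partition."""

    def _filter(partition: Dict[str, str]) -> bool:
        # Partition values arrive as strings, so compare against str(month)
        return partition.get("month") == str(month)

    return _filter


def read_month(path: str, month: int):
    """Read one month of the partitioned dataset (requires AWS access)."""
    import awswrangler as wr  # imported lazily so the filter can be tested offline

    return wr.s3.read_parquet(path=path, dataset=True, partition_filter=only_month(month))


keep_december = only_month(12)
print(keep_december({"month": "12"}))  # True
print(keep_december({"month": "10"}))  # False
```

On the cluster, something like `read_month(path, 12)` would then load only the December partition.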

Read it back via Athena UNLOAD

The UNLOAD command distributes query processing in Athena and dumps the results to S3, where they are then read in parallel into a dataframe.

[ ]:
unload_path = f"s3://{bucket}/unload/nyc_taxi/"

# Athena UNLOAD requires that the S3 path is empty
# Note that s3.delete_objects is also a distributed call
wr.s3.delete_objects(unload_path)

wr.athena.read_sql_query(
    f"SELECT * FROM {table}",
    database=database,
    ctas_approach=False,
    unload_approach=True,
    s3_output=unload_path,
)
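To make the UNLOAD approach more concrete, the sketch below assembles a statement in Athena's `UNLOAD (query) TO 'location' WITH (format = ...)` form. It only illustrates the shape of such a statement; the exact SQL that awswrangler generates internally may differ, and the helper name `build_unload_statement` is hypothetical.

```python
def build_unload_statement(sql: str, s3_path: str, file_format: str = "PARQUET") -> str:
    """Wrap a SELECT query in an Athena UNLOAD statement targeting `s3_path`."""
    return f"UNLOAD ({sql}) TO '{s3_path}' WITH (format = '{file_format}')"


statement = build_unload_statement(
    sql="SELECT * FROM nyc_taxi",
    s3_path="s3://BUCKET_NAME/unload/nyc_taxi/",  # same placeholder bucket as above
)
print(statement)
# UNLOAD (SELECT * FROM nyc_taxi) TO 's3://BUCKET_NAME/unload/nyc_taxi/' WITH (format = 'PARQUET')
```

Athena writes the result files under the target prefix, which is why that prefix has to be empty before the query runs.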

The EC2 cluster must be terminated, or it will continue to incur charges.

[ ]:
!ray down -y ./config.yml
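When the workload runs from a script rather than a notebook, one way to avoid a forgotten cluster is to wrap the work in `try`/`finally` so the teardown command runs even if the job fails. This is a minimal sketch: the helper name `run_with_teardown` is hypothetical, and the demonstration passes a fake runner so that no real cluster or `ray` installation is needed.

```python
import subprocess
from typing import Any, Callable, List


def run_with_teardown(
    work: Callable[[], Any],
    config_path: str = "config.yml",
    runner: Callable[..., Any] = subprocess.run,
) -> Any:
    """Run `work()` and always issue `ray down` afterwards, even on failure."""
    try:
        return work()
    finally:
        runner(["ray", "down", "-y", config_path], check=True)


# Demonstration with a fake runner that records the command instead of executing it
recorded: List[List[str]] = []


def fake_runner(cmd: List[str], check: bool = False) -> None:
    recorded.append(cmd)


result = run_with_teardown(lambda: "done", runner=fake_runner)
print(result)    # done
print(recorded)  # [['ray', 'down', '-y', 'config.yml']]
```

With the default `runner`, the same function would call the real `ray down -y config.yml` once the work finishes or raises.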

More Info on Ray Clusters on AWS