AWS SDK for pandas

34 - Distributing Calls Using Ray

AWS SDK for pandas supports distributing specific calls using Ray and Modin.

When enabled, data loading methods return Modin dataframes instead of pandas dataframes. Modin provides drop-in compatibility with existing pandas code, with the added benefit of distributing operations across your Ray instance and therefore operating at a much larger scale.

[1]:
!pip install "awswrangler[modin,ray,redshift]"

Importing awswrangler when Ray and Modin are installed automatically initializes a local Ray instance.

[2]:
import awswrangler as wr
import modin.pandas as pd

print(f"Execution Engine: {wr.engine.get()}")
print(f"Memory Format: {wr.memory_format.get()}")
2022-10-24 14:59:36,287 INFO worker.py:1518 -- Started a local Ray instance.
Execution Engine: EngineEnum.RAY
Memory Format: MemoryFormatEnum.MODIN
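As the warning further down notes, awswrangler also attaches to a pre-initialized Ray cluster if one is already running, which makes it possible to control resources before the import. A minimal sketch (the CPU count is an illustrative assumption):

import ray

# Start Ray explicitly before importing awswrangler so the library
# attaches to this instance instead of creating its own.
ray.init(num_cpus=4, ignore_reinit_error=True)

import awswrangler as wr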

Read data at scale

Data is read using all cores on a single machine or multiple nodes on a cluster.

[3]:
df = wr.s3.read_parquet(path="s3://amazon-reviews-pds/parquet/product_category=Furniture/")
df.head(5)
Read progress: 100%|██████████| 10/10 [01:10<00:00,  7.03s/it]
UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1
[3]:
marketplace customer_id review_id product_id product_parent product_title star_rating helpful_votes total_votes vine verified_purchase review_headline review_body review_date year
0 US 35680291 R34O1VWWYVAU9A B000MWFEV6 406798096 Baxton Studio Full Leather Storage Bench Ottom... 5 1 1 N Y High quality and roomy I bought this bench as a storage necessity as ... 2009-05-17 2009
1 US 21000590 RU1I9NHALXPW5 B004C1RULU 239421036 Alera Fraze Series Leather High-Back Swivel/Ti... 3 8 9 N Y Do not judge the chair on the first day alone. Received this chair really fast because I had ... 2012-06-29 2012
2 US 12140069 R2O8R9CLCUQTB8 B000GFWQDI 297104356 Matching Cherry Printer Stand with Casters and... 5 4 4 N Y Printer stand made into printer / PC stand I wanted to get my pc's off the floor and off ... 2009-05-17 2009
3 US 23755701 R12FOIKUUXPHBZ B0055DOI50 39731200 Marquette Bed 5 6 6 N Y Excellent Value!! Great quality for the price. This bed is easy ... 2012-06-29 2012
4 US 50735969 RK0XUO7P40TK9 B0026RH3X2 751769063 Cape Craftsman Shutter 2-Door Cabinet 3 12 12 N N Nice, but not best quality I love the design of this cabinet! It's a very... 2009-05-17 2009
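Reads can also be narrowed before they are distributed. As a sketch, projecting only the columns of interest reduces the data each Ray task has to pull from S3 (the column list here is illustrative):

# Read a projection of the dataset; each Ray task applies the
# column selection while reading its share of the Parquet files.
slim_df = wr.s3.read_parquet(
    path="s3://amazon-reviews-pds/parquet/product_category=Furniture/",
    columns=["customer_id", "star_rating", "helpful_votes"],
)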

The data type is a Modin DataFrame:

[4]:
type(df)
[4]:
modin.pandas.dataframe.DataFrame

However, this type is interoperable with standard pandas calls:

[5]:
filtered_df = df[df.helpful_votes > 10]
excluded_columns = ["product_title", "review_headline", "review_body"]
filtered_df = filtered_df.loc[:, ~filtered_df.columns.isin(excluded_columns)]
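When a downstream library insists on a vanilla pandas object, the distributed frame can be collected onto the driver with Modin's to_pandas utility. Note this materializes every partition in local memory, so it only suits results that fit on one machine:

from modin.utils import to_pandas

# Gather all distributed partitions into a single local pandas DataFrame.
local_df = to_pandas(filtered_df)
print(type(local_df))  # pandas.core.frame.DataFrame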

Enter your bucket name:

[6]:
bucket = "BUCKET_NAME"

Write data at scale

The write operation is parallelized, leading to significant speed-ups.

[7]:
result = wr.s3.to_parquet(
    filtered_df,
    path=f"s3://{bucket}/amazon-reviews/",
    dataset=True,
    dtype={"review_date": "timestamp"},
)
print(f"Data has been written to {len(result['paths'])} files")
Write Progress: 100%|██████████| 10/10 [00:21<00:00,  2.14s/it]
Data has been written to 10 files
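The write can also produce a partitioned dataset, which keeps individual files small and makes later filtered reads cheaper. A hedged variation on the cell above (the output prefix is a hypothetical name):

# Partition the output by the existing "year" column; each Ray task
# writes into its own Hive-style prefix (e.g. year=2009/).
wr.s3.to_parquet(
    filtered_df,
    path=f"s3://{bucket}/amazon-reviews-by-year/",
    dataset=True,
    partition_cols=["year"],
)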

Copy to Redshift at scale…

Data is first staged in S3, then a COPY command is executed against the Redshift cluster to load it. Both operations are distributed: the S3 write with Ray and the COPY within the Redshift cluster.

[8]:
# Connect to the Redshift instance
con = wr.redshift.connect("aws-sdk-pandas-redshift")

path = f"s3://{bucket}/stage/"
iam_role = "IAM_ROLE"
schema = "public"
table = "amazon_reviews"

wr.redshift.copy(
    df=filtered_df,
    path=path,
    con=con,
    schema=schema,
    table=table,
    mode="overwrite",
    iam_role=iam_role,
    max_rows_by_file=None,
)
Repartition: 100%|██████████| 1/1 [00:00<00:00,  1.42it/s]
Write Progress: 100%|██████████| 1/1 [00:06<00:00,  6.19s/it]
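A quick query over the same connection can confirm the load; this COUNT check is just an illustrative use of wr.redshift.read_sql_query:

# Sanity check: count the rows that landed in the target table.
row_count = wr.redshift.read_sql_query(
    sql=f"SELECT COUNT(*) FROM {schema}.{table}",
    con=con,
)
print(row_count)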

… and UNLOAD it back

Parallel calls can also be leveraged when reading from the cluster. The UNLOAD command distributes query processing in Redshift to dump files in S3, which are then read in parallel into a dataframe.

[9]:
wr.redshift.unload(
    sql=f"SELECT * FROM {schema}.{table} where star_rating = 5",
    con=con,
    iam_role=iam_role,
    path=path,
    keep_files=True,
)
2022-10-20 11:20:02,369 WARNING read_api.py:291 -- ⚠️  The number of blocks in this dataset (2) limits its parallelism to 2 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Read progress: 100%|██████████| 2/2 [00:01<00:00,  1.41it/s]
[9]:
marketplace customer_id review_id product_id product_parent star_rating helpful_votes total_votes vine verified_purchase review_date year
0 US 23875938 RC5BC3HYUV324 B000EPKLFA 878266274 5 15 17 N Y 2009-07-12 2009
1 US 22174246 R3MFRIKP6HMH0W B001NJ4J6I 394928248 5 20 23 N Y 2009-07-19 2009
2 US 52886745 R1T9C0QELFI939 B0012ZNNR4 364197484 5 32 33 N N 2009-07-24 2009
3 US 14527742 R2CIP31EO2GXDK B000M5Z98G 199037166 5 12 12 N Y 2009-08-23 2009
4 US 41393002 R29IOXB832QR6L B0071HBVYE 956030824 5 16 16 N Y 2012-07-12 2012
... ... ... ... ... ... ... ... ... ... ... ... ...
16022 US 20481704 R2KV325KBKDKL8 B00G701H5E 703622282 5 16 16 N N 2014-11-06 2014
16023 US 37023256 R1FJT6UF7KM8GV B005VY8U8Y 220718418 5 23 25 N Y 2014-11-08 2014
16024 US 24286944 R1RSIZBY4Z3PF2 B00LNCDGKU 934098561 5 47 49 N Y 2014-11-14 2014
16025 US 15276457 R31YFDIUQ2HI2X B005KFHWPG 310427061 5 19 20 N Y 2014-11-15 2014
16026 US 52215985 R11U6K1OIDEUKH B00NEJ4Y4M 22567782 5 62 67 Y N 2014-11-16 2014

16027 rows x 12 columns
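Like the S3 readers, unload returns a Modin dataframe when Ray is enabled, so the result can feed straight back into the same distributed pipeline. Once the round trip is done, the connection should be released:

# Close the Redshift connection opened earlier.
con.close()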

Find a needle in a haystack with S3 Select

[10]:
# Run S3 Select query against all objects in the category for a given customer ID
wr.s3.select_query(
    sql="SELECT * FROM s3object s where s.\"customer_id\" = '51624146'",
    path="s3://amazon-reviews-pds/parquet/product_category=Office_Products/*.parquet",
    input_serialization="Parquet",
    input_serialization_params={},
    scan_range_chunk_size=32*1024*1024,
)
UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1
[10]:
marketplace customer_id review_id product_id product_parent product_title star_rating helpful_votes total_votes vine verified_purchase review_headline review_body review_date year
0 US 51624146 RU9SWH8SHOBBS B001ERDENS 658861629 LINKYO Compatible Toner Cartridge Replacement ... 5 0 0 N Y Perfect fit for my HP LaserJet M1522 nf I will never buy &#34;official&#34; toner cart... 2013-07-12 2013
1 US 51624146 RAO9QADXC9TUH B00GJQA4TG 184072656 SuperChalks White Liquid Chalk Marker Pens 4-P... 4 0 0 N Y Smooth flowing "ink, " but these markers left ... Smooth flowing &#34;ink,&#34; but these marker... 2014-10-06 2014
2 US 51624146 R1D94CA7TKY9DU B000MK647G 396184528 Fax Toner Cartridge for Brother IntelliFax 575... 5 0 0 N Y Came quickly, works great I bought four of these for my office. Just kno... 2014-03-26 2014
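The same API covers other serialization formats. A hedged CSV variant, where the object path and header option are illustrative assumptions:

# Hypothetical CSV example: S3 Select filters rows server-side and
# awswrangler distributes the scan ranges across Ray tasks.
wr.s3.select_query(
    sql="SELECT * FROM s3object s WHERE s.\"star_rating\" = '5'",
    path=f"s3://{bucket}/csv-reviews/",
    input_serialization="CSV",
    input_serialization_params={"FileHeaderInfo": "Use"},
)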