AWS SDK for pandas

34 - Distributing Calls Using Ray

AWS SDK for pandas supports distributing specific calls using Ray and Modin.

When enabled, data loading methods return modin dataframes instead of pandas dataframes. Modin provides seamless integration and compatibility with existing pandas code, with the benefit of distributing operations across your Ray instance and operating at a much larger scale.

[1]:
!pip install "awswrangler[modin,ray,redshift]"

Importing awswrangler when Ray and Modin are installed automatically initializes a local Ray instance.

[3]:
import awswrangler as wr

print(f"Execution Engine: {wr.engine.get()}")
print(f"Memory Format: {wr.memory_format.get()}")
Execution Engine: EngineEnum.RAY
Memory Format: MemoryFormatEnum.MODIN
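
If you want to fall back to the standard single-machine pandas code path without uninstalling Ray and Modin, a minimal sketch is shown below. It assumes awswrangler honors the `WR_ENGINE` and `WR_MEMORY_FORMAT` environment variables at import time — treat both variable names as an assumption and check the library's at-scale configuration documentation for your version.

```python
import os

# Assumption: awswrangler reads these variables when it is imported,
# so they must be set before `import awswrangler as wr`.
os.environ["WR_ENGINE"] = "python"        # hypothetical: single-process engine
os.environ["WR_MEMORY_FORMAT"] = "pandas" # hypothetical: plain pandas frames

print(os.environ["WR_ENGINE"], os.environ["WR_MEMORY_FORMAT"])
```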

Read data at scale

Data is read using all cores on a single machine, or multiple nodes on a cluster.

[2]:
df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2019/")
df.head(5)
2023-09-15 12:24:44,457 INFO worker.py:1621 -- Started a local Ray instance.
2023-09-15 12:25:10,728 INFO read_api.py:374 -- To satisfy the requested parallelism of 200, each read task output will be split into 34 smaller blocks.
[dataset]: Run `pip install tqdm` to enable progress reporting.
UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1
[2]:
vendor_id pickup_at dropoff_at passenger_count trip_distance rate_code_id store_and_fwd_flag pickup_location_id dropoff_location_id payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge
0 1 2019-01-01 00:46:40 2019-01-01 00:53:20 1 1.5 1 N 151 239 1 7.0 0.5 0.5 1.65 0.0 0.3 9.950000 NaN
1 1 2019-01-01 00:59:47 2019-01-01 01:18:59 1 2.6 1 N 239 246 1 14.0 0.5 0.5 1.00 0.0 0.3 16.299999 NaN
2 2 2018-12-21 13:48:30 2018-12-21 13:52:40 3 0.0 1 N 236 236 1 4.5 0.5 0.5 0.00 0.0 0.3 5.800000 NaN
3 2 2018-11-28 15:52:25 2018-11-28 15:55:45 5 0.0 1 N 193 193 2 3.5 0.5 0.5 0.00 0.0 0.3 7.550000 NaN
4 2 2018-11-28 15:56:57 2018-11-28 15:58:33 5 0.0 2 N 193 193 2 52.0 0.0 0.5 0.00 0.0 0.3 55.549999 NaN

The data type is a Modin DataFrame:

[4]:
type(df)
[4]:
modin.pandas.dataframe.DataFrame

However, this type is interoperable with standard pandas calls:

[4]:
filtered_df = df[df.trip_distance > 30]
excluded_columns = ["vendor_id", "passenger_count", "store_and_fwd_flag"]
filtered_df = filtered_df.loc[:, ~filtered_df.columns.isin(excluded_columns)]
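
Because Modin mirrors the pandas API, the filtering code above is engine-agnostic. A minimal sketch of the same operations on a small, plain pandas frame (toy values; the column names are taken from the taxi dataset above):

```python
import pandas as pd

# Toy frame with a subset of the taxi columns used above.
df = pd.DataFrame(
    {
        "vendor_id": [1, 2, 2],
        "trip_distance": [1.5, 31.2, 45.0],
        "passenger_count": [1, 2, 1],
        "fare_amount": [7.0, 96.0, 142.0],
        "store_and_fwd_flag": ["N", "N", "N"],
    }
)

# Identical code to the Modin cell above: keep long trips, drop some columns.
filtered_df = df[df.trip_distance > 30]
excluded_columns = ["vendor_id", "passenger_count", "store_and_fwd_flag"]
filtered_df = filtered_df.loc[:, ~filtered_df.columns.isin(excluded_columns)]

print(list(filtered_df.columns))  # ['trip_distance', 'fare_amount']
print(len(filtered_df))           # 2
```

Swapping `import pandas as pd` for `import modin.pandas as pd` leaves this cell unchanged while distributing the work across the Ray instance.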

Enter your bucket name:

[7]:
bucket = "BUCKET"

Write data at scale

The write operation is parallelized, leading to significant speed-ups.

[9]:
result = wr.s3.to_parquet(
    filtered_df,
    path=f"s3://{bucket}/taxi/",
    dataset=True,
)
print(f"Data has been written to {len(result['paths'])} files")
Data has been written to 408 files
2023-09-15 12:32:28,917 WARNING plan.py:567 -- Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune
2023-09-15 12:32:31,094 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[Write]
2023-09-15 12:32:31,095 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-15 12:32:31,096 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`

Copy to Redshift at scale…

Data is first staged in S3, then a COPY command is executed against the Redshift cluster to load it. Both operations are distributed: the S3 write with Ray, and the COPY in the Redshift cluster.

[12]:
# Connect to the Redshift instance
con = wr.redshift.connect("aws-sdk-pandas-redshift")

path = f"s3://{bucket}/stage/"
iam_role = "ROLE"
schema = "public"
table = "taxi"

wr.redshift.copy(
    df=filtered_df,
    path=path,
    con=con,
    schema=schema,
    table=table,
    mode="overwrite",
    iam_role=iam_role,
    max_rows_by_file=None,
)
2023-09-15 12:52:24,155 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[Write]
2023-09-15 12:52:24,157 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-15 12:52:24,157 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`

… and UNLOAD it back

Parallel calls can also be leveraged when reading from the cluster. The UNLOAD command distributes query processing in Redshift, dumping files to S3 which are then read in parallel into a dataframe.

[13]:
df = wr.redshift.unload(
    sql=f"SELECT * FROM {schema}.{table} where trip_distance > 30",
    con=con,
    iam_role=iam_role,
    path=path,
    keep_files=True,
)

df.head()
2023-09-15 12:56:53,838 INFO read_api.py:374 -- To satisfy the requested parallelism of 16, each read task output will be split into 8 smaller blocks.
[13]:
pickup_at dropoff_at trip_distance rate_code_id pickup_location_id dropoff_location_id payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge
0 2019-01-22 17:40:04 2019-01-22 18:33:48 30.469999 4 132 265 1 142.000000 1.0 0.5 28.760000 0.00 0.3 172.559998 0.0
1 2019-01-22 18:36:34 2019-01-22 19:52:50 33.330002 5 51 221 1 96.019997 0.0 0.5 0.000000 11.52 0.3 108.339996 0.0
2 2019-01-22 19:11:08 2019-01-22 20:16:10 32.599998 1 231 205 1 88.000000 1.0 0.5 0.000000 0.00 0.3 89.800003 0.0
3 2019-01-22 19:14:15 2019-01-22 20:09:57 36.220001 4 132 265 1 130.500000 1.0 0.5 27.610001 5.76 0.3 165.669998 0.0
4 2019-01-22 19:51:56 2019-01-22 20:48:39 33.040001 5 132 265 1 130.000000 0.0 0.5 29.410000 16.26 0.3 176.470001 0.0

Find a needle in a haystack with S3 Select

[3]:
import awswrangler as wr

# Run S3 Select query against all objects for 2019 year to find trips starting from a particular location
wr.s3.select_query(
    sql='SELECT * FROM s3object s where s."pickup_location_id" = 138',
    path="s3://ursa-labs-taxi-data/2019/",
    input_serialization="Parquet",
    input_serialization_params={},
    scan_range_chunk_size=32 * 1024 * 1024,
)
[3]:
vendor_id pickup_at dropoff_at passenger_count trip_distance rate_code_id store_and_fwd_flag pickup_location_id dropoff_location_id payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge
0 1 2019-01-01T00:19:55.000Z 2019-01-01T00:57:56.000Z 1 12.30 1 N 138 50 1 38.0 0.5 0.5 4.00 5.76 0.3 49.060001 NaN
1 2 2019-01-01T00:48:10.000Z 2019-01-01T01:36:58.000Z 1 31.57 1 N 138 138 2 82.5 0.5 0.5 0.00 0.00 0.3 83.800003 NaN
2 1 2019-01-01T00:39:58.000Z 2019-01-01T00:58:58.000Z 2 8.90 1 N 138 224 1 26.0 0.5 0.5 8.25 5.76 0.3 41.310001 NaN
3 1 2019-01-01T00:07:45.000Z 2019-01-01T00:34:12.000Z 4 9.60 1 N 138 239 1 29.0 0.5 0.5 7.20 5.76 0.3 43.259998 NaN
4 2 2019-01-01T00:27:40.000Z 2019-01-01T00:52:15.000Z 1 12.89 1 N 138 87 2 36.0 0.5 0.5 0.00 0.00 0.3 37.299999 NaN
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
1167508 2 2019-06-30T23:42:24.000Z 2019-07-01T00:10:28.000Z 1 15.66 1 N 138 265 2 44.0 0.5 0.5 0.00 0.00 0.3 45.299999 0.0
1167509 2 2019-06-30T23:07:34.000Z 2019-06-30T23:25:09.000Z 1 7.38 1 N 138 262 1 22.0 0.5 0.5 7.98 6.12 0.3 39.900002 2.5
1167510 2 2019-06-30T23:00:36.000Z 2019-06-30T23:20:18.000Z 1 11.24 1 N 138 107 1 31.0 0.5 0.5 8.18 6.12 0.3 49.099998 2.5
1167511 1 2019-06-30T23:08:06.000Z 2019-06-30T23:30:20.000Z 1 7.50 1 N 138 229 1 24.0 3.0 0.5 4.00 0.00 0.3 31.799999 2.5
1167512 2 2019-06-30T23:15:13.000Z 2019-06-30T23:35:18.000Z 2 8.73 1 N 138 262 1 25.5 0.5 0.5 1.77 6.12 0.3 37.189999 2.5

1167513 rows × 18 columns
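
For intuition, the S3 Select predicate above is the server-side equivalent of a simple pandas boolean filter — S3 just applies it inside each object before any data crosses the network. A minimal illustration on a toy frame (column names taken from the dataset above, values invented):

```python
import pandas as pd

# Toy stand-in for one Parquet object's contents.
trips = pd.DataFrame(
    {
        "pickup_location_id": [138, 50, 138],
        "trip_distance": [12.3, 1.1, 31.57],
    }
)

# Equivalent of: SELECT * FROM s3object s WHERE s."pickup_location_id" = 138
result = trips[trips.pickup_location_id == 138]
print(len(result))  # 2
```

With S3 Select, this filtering happens in S3 itself across many objects and scan ranges in parallel, so only matching rows are returned to the client.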
