34 - Distributing Calls Using Ray¶
AWS SDK for pandas supports distributing specific calls using Ray and Modin.
When enabled, data loading methods return modin dataframes instead of pandas dataframes. Modin provides seamless integration and compatibility with existing pandas code, with the benefit of distributing operations across your Ray instance and operating at a much larger scale.
[1]:
!pip install "awswrangler[modin,ray,redshift]"
Importing awswrangler when ray and modin are installed will automatically initialize a local Ray instance.
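To target an existing Ray cluster instead of a local instance, Ray can be initialized explicitly before the import. A minimal sketch, assuming a reachable head node (the Ray Client address below is a placeholder):

[ ]:
import ray

# Connect to an existing cluster before importing awswrangler;
# otherwise a local Ray instance is started automatically.
# "ray://head-node:10001" is a hypothetical address.
ray.init(address="ray://head-node:10001")

import awswrangler as wr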
[3]:
import awswrangler as wr
print(f"Execution Engine: {wr.engine.get()}")
print(f"Memory Format: {wr.memory_format.get()}")
Execution Engine: EngineEnum.RAY
Memory Format: MemoryFormatEnum.MODIN
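Both settings can also be overridden explicitly, for example to fall back to vanilla pandas for small workloads. A sketch, assuming the setter API of recent awswrangler 3.x releases:

[ ]:
# Opt out of distribution: non-distributed engine, plain pandas memory format
wr.engine.set("python")
wr.memory_format.set("pandas")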
Read data at scale¶
Data is read using all cores on a single machine, or multiple nodes on a cluster.
[2]:
df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2019/")
df.head(5)
2023-09-15 12:24:44,457 INFO worker.py:1621 -- Started a local Ray instance.
2023-09-15 12:25:10,728 INFO read_api.py:374 -- To satisfy the requested parallelism of 200, each read task output will be split into 34 smaller blocks.
[dataset]: Run `pip install tqdm` to enable progress reporting.
UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1
[2]:
 | vendor_id | pickup_at | dropoff_at | passenger_count | trip_distance | rate_code_id | store_and_fwd_flag | pickup_location_id | dropoff_location_id | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 1 | 2019-01-01 00:46:40 | 2019-01-01 00:53:20 | 1 | 1.5 | 1 | N | 151 | 239 | 1 | 7.0 | 0.5 | 0.5 | 1.65 | 0.0 | 0.3 | 9.950000 | NaN |
1 | 1 | 2019-01-01 00:59:47 | 2019-01-01 01:18:59 | 1 | 2.6 | 1 | N | 239 | 246 | 1 | 14.0 | 0.5 | 0.5 | 1.00 | 0.0 | 0.3 | 16.299999 | NaN |
2 | 2 | 2018-12-21 13:48:30 | 2018-12-21 13:52:40 | 3 | 0.0 | 1 | N | 236 | 236 | 1 | 4.5 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 5.800000 | NaN |
3 | 2 | 2018-11-28 15:52:25 | 2018-11-28 15:55:45 | 5 | 0.0 | 1 | N | 193 | 193 | 2 | 3.5 | 0.5 | 0.5 | 0.00 | 0.0 | 0.3 | 7.550000 | NaN |
4 | 2 | 2018-11-28 15:56:57 | 2018-11-28 15:58:33 | 5 | 0.0 | 2 | N | 193 | 193 | 2 | 52.0 | 0.0 | 0.5 | 0.00 | 0.0 | 0.3 | 55.549999 | NaN |
The resulting data type is a Modin DataFrame:
[4]:
type(df)
[4]:
modin.pandas.dataframe.DataFrame
However, this type is interoperable with standard pandas calls:
[4]:
# Standard pandas filtering and column selection work unchanged
filtered_df = df[df.trip_distance > 30]
excluded_columns = ["vendor_id", "passenger_count", "store_and_fwd_flag"]
filtered_df = filtered_df.loc[:, ~filtered_df.columns.isin(excluded_columns)]
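When a downstream library strictly requires a pandas object, the Modin DataFrame can be collected into one. A sketch, assuming the filtered data fits in memory on a single node (`modin.utils.to_pandas` is Modin's public conversion utility):

[ ]:
from modin.utils import to_pandas

# Materialize the distributed partitions into a single pandas DataFrame
pandas_df = to_pandas(filtered_df)
type(pandas_df)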
Enter your bucket name:
[7]:
bucket = "BUCKET"
Write data at scale¶
The write operation is parallelized, leading to significant speed-ups.
[9]:
result = wr.s3.to_parquet(
    filtered_df,
    path=f"s3://{bucket}/taxi/",
    dataset=True,
)
print(f"Data has been written to {len(result['paths'])} files")
2023-09-15 12:32:28,917 WARNING plan.py:567 -- Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune
2023-09-15 12:32:31,094 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[Write]
2023-09-15 12:32:31,095 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-15 12:32:31,096 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
Data has been written to 408 files
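Dataset writes can also be partitioned into Hive-style prefixes, one per column value, via the partition_cols parameter of wr.s3.to_parquet. A sketch against the same bucket:

[ ]:
# Write the dataset partitioned by payment type:
# s3://<bucket>/taxi_partitioned/payment_type=<value>/...
result = wr.s3.to_parquet(
    filtered_df,
    path=f"s3://{bucket}/taxi_partitioned/",
    dataset=True,
    partition_cols=["payment_type"],
)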
Copy to Redshift at scale…¶
Data is first staged in S3, then a COPY command is executed against the Redshift cluster to load it. Both operations are distributed: the S3 write with Ray, and the COPY within the Redshift cluster.
[12]:
# Connect to the Redshift instance
con = wr.redshift.connect("aws-sdk-pandas-redshift")
path = f"s3://{bucket}/stage/"
iam_role = "ROLE"
schema = "public"
table = "taxi"
wr.redshift.copy(
    df=filtered_df,
    path=path,
    con=con,
    schema=schema,
    table=table,
    mode="overwrite",
    iam_role=iam_role,
    max_rows_by_file=None,
)
2023-09-15 12:52:24,155 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[Write]
2023-09-15 12:52:24,157 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-15 12:52:24,157 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
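Besides overwrite, the copy also supports append and upsert modes. A sketch of an upsert, assuming the primary_keys parameter behaves as in recent awswrangler releases and that pickup_at/dropoff_at form a reasonable (hypothetical) key:

[ ]:
# Upsert: rows matching on the keys are replaced, the rest are appended
wr.redshift.copy(
    df=filtered_df,
    path=path,
    con=con,
    schema=schema,
    table=table,
    mode="upsert",
    primary_keys=["pickup_at", "dropoff_at"],
    iam_role=iam_role,
)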
… and UNLOAD it back¶
Parallel calls can also be leveraged when reading from the cluster. The UNLOAD command distributes query processing in Redshift to dump files in S3, which are then read in parallel into a dataframe.
[13]:
df = wr.redshift.unload(
    sql=f"SELECT * FROM {schema}.{table} where trip_distance > 30",
    con=con,
    iam_role=iam_role,
    path=path,
    keep_files=True,
)
df.head()
2023-09-15 12:56:53,838 INFO read_api.py:374 -- To satisfy the requested parallelism of 16, each read task output will be split into 8 smaller blocks.
[13]:
 | pickup_at | dropoff_at | trip_distance | rate_code_id | pickup_location_id | dropoff_location_id | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 2019-01-22 17:40:04 | 2019-01-22 18:33:48 | 30.469999 | 4 | 132 | 265 | 1 | 142.000000 | 1.0 | 0.5 | 28.760000 | 0.00 | 0.3 | 172.559998 | 0.0 |
1 | 2019-01-22 18:36:34 | 2019-01-22 19:52:50 | 33.330002 | 5 | 51 | 221 | 1 | 96.019997 | 0.0 | 0.5 | 0.000000 | 11.52 | 0.3 | 108.339996 | 0.0 |
2 | 2019-01-22 19:11:08 | 2019-01-22 20:16:10 | 32.599998 | 1 | 231 | 205 | 1 | 88.000000 | 1.0 | 0.5 | 0.000000 | 0.00 | 0.3 | 89.800003 | 0.0 |
3 | 2019-01-22 19:14:15 | 2019-01-22 20:09:57 | 36.220001 | 4 | 132 | 265 | 1 | 130.500000 | 1.0 | 0.5 | 27.610001 | 5.76 | 0.3 | 165.669998 | 0.0 |
4 | 2019-01-22 19:51:56 | 2019-01-22 20:48:39 | 33.040001 | 5 | 132 | 265 | 1 | 130.000000 | 0.0 | 0.5 | 29.410000 | 16.26 | 0.3 | 176.470001 | 0.0 |
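Once done with the cluster, it is good practice to release the connection (the object returned by wr.redshift.connect exposes a standard close method):

[ ]:
# Release the Redshift connection
con.close()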
Find a needle in a haystack with S3 Select¶
[3]:
import awswrangler as wr
# Run an S3 Select query against all objects for the year 2019
# to find trips starting from a particular location
wr.s3.select_query(
    sql='SELECT * FROM s3object s where s."pickup_location_id" = 138',
    path="s3://ursa-labs-taxi-data/2019/",
    input_serialization="Parquet",
    input_serialization_params={},
    scan_range_chunk_size=32 * 1024 * 1024,
)
[3]:
 | vendor_id | pickup_at | dropoff_at | passenger_count | trip_distance | rate_code_id | store_and_fwd_flag | pickup_location_id | dropoff_location_id | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | 1 | 2019-01-01T00:19:55.000Z | 2019-01-01T00:57:56.000Z | 1 | 12.30 | 1 | N | 138 | 50 | 1 | 38.0 | 0.5 | 0.5 | 4.00 | 5.76 | 0.3 | 49.060001 | NaN |
1 | 2 | 2019-01-01T00:48:10.000Z | 2019-01-01T01:36:58.000Z | 1 | 31.57 | 1 | N | 138 | 138 | 2 | 82.5 | 0.5 | 0.5 | 0.00 | 0.00 | 0.3 | 83.800003 | NaN |
2 | 1 | 2019-01-01T00:39:58.000Z | 2019-01-01T00:58:58.000Z | 2 | 8.90 | 1 | N | 138 | 224 | 1 | 26.0 | 0.5 | 0.5 | 8.25 | 5.76 | 0.3 | 41.310001 | NaN |
3 | 1 | 2019-01-01T00:07:45.000Z | 2019-01-01T00:34:12.000Z | 4 | 9.60 | 1 | N | 138 | 239 | 1 | 29.0 | 0.5 | 0.5 | 7.20 | 5.76 | 0.3 | 43.259998 | NaN |
4 | 2 | 2019-01-01T00:27:40.000Z | 2019-01-01T00:52:15.000Z | 1 | 12.89 | 1 | N | 138 | 87 | 2 | 36.0 | 0.5 | 0.5 | 0.00 | 0.00 | 0.3 | 37.299999 | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
1167508 | 2 | 2019-06-30T23:42:24.000Z | 2019-07-01T00:10:28.000Z | 1 | 15.66 | 1 | N | 138 | 265 | 2 | 44.0 | 0.5 | 0.5 | 0.00 | 0.00 | 0.3 | 45.299999 | 0.0 |
1167509 | 2 | 2019-06-30T23:07:34.000Z | 2019-06-30T23:25:09.000Z | 1 | 7.38 | 1 | N | 138 | 262 | 1 | 22.0 | 0.5 | 0.5 | 7.98 | 6.12 | 0.3 | 39.900002 | 2.5 |
1167510 | 2 | 2019-06-30T23:00:36.000Z | 2019-06-30T23:20:18.000Z | 1 | 11.24 | 1 | N | 138 | 107 | 1 | 31.0 | 0.5 | 0.5 | 8.18 | 6.12 | 0.3 | 49.099998 | 2.5 |
1167511 | 1 | 2019-06-30T23:08:06.000Z | 2019-06-30T23:30:20.000Z | 1 | 7.50 | 1 | N | 138 | 229 | 1 | 24.0 | 3.0 | 0.5 | 4.00 | 0.00 | 0.3 | 31.799999 | 2.5 |
1167512 | 2 | 2019-06-30T23:15:13.000Z | 2019-06-30T23:35:18.000Z | 2 | 8.73 | 1 | N | 138 | 262 | 1 | 25.5 | 0.5 | 0.5 | 1.77 | 6.12 | 0.3 | 37.189999 | 2.5 |
1167513 rows × 18 columns
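S3 Select can query other serialization formats as well. A sketch against a hypothetical CSV prefix (the path is a placeholder; FileHeaderInfo is a standard S3 Select CSV option, and CSV fields are read as strings, hence the quoted comparison value):

[ ]:
# Query CSV objects, treating the first line of each file as a header
wr.s3.select_query(
    sql="SELECT * FROM s3object s WHERE s.\"pickup_location_id\" = '138'",
    path="s3://my-bucket/taxi-csv/",  # hypothetical prefix
    input_serialization="CSV",
    input_serialization_params={"FileHeaderInfo": "Use"},
)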