An AWS Professional Service open source initiative | aws-proserve-opensource@amazon.com
AWS Data Wrangler is now AWS SDK for pandas (awswrangler). We’re changing the name we use when we talk about the library, but everything else will stay the same. You’ll still be able to install using pip install awswrangler
and you won’t need to change any of your code. As part of this change, we’ve moved the library from AWS Labs to the main AWS GitHub organisation but, thanks to GitHub’s redirect feature, you’ll still be able to access the project by its old URLs until you update your bookmarks. Our documentation has also moved to aws-sdk-pandas.readthedocs.io, but old bookmarks will redirect to the new site.
Quick Start¶
>>> pip install awswrangler
>>> # Optional modules are installed with:
>>> pip install 'awswrangler[redshift]'
import awswrangler as wr
import pandas as pd
from datetime import datetime
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
# Storing data on Data Lake
wr.s3.to_parquet(
    df=df,
    path="s3://bucket/dataset/",
    dataset=True,
    database="my_db",
    table="my_table",
)
# Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)
# Retrieving the data from Amazon Athena
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")
# Get a Redshift connection from Glue Catalog and retrieving data from Redshift Spectrum
con = wr.redshift.connect("my-glue-connection")
df = wr.redshift.read_sql_query("SELECT * FROM external_schema.my_table", con=con)
con.close()
# Amazon Timestream Write
df = pd.DataFrame({
    "time": [datetime.now(), datetime.now()],
    "my_dimension": ["foo", "boo"],
    "measure": [1.0, 1.1],
})
rejected_records = wr.timestream.write(
    df,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_col="measure",
    dimensions_cols=["my_dimension"],
)
# Amazon Timestream Query
wr.timestream.query("""
SELECT time, measure_value::double, my_dimension
FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3
""")
Read The Docs¶
What is AWS SDK for pandas?¶
An AWS Professional Service open source python initiative that extends the power of the pandas library to AWS, connecting DataFrames and AWS data & analytics services.
Easy integration with Athena, Glue, Redshift, Timestream, OpenSearch, Neptune, QuickSight, Chime, CloudWatchLogs, DynamoDB, EMR, Secrets Manager, PostgreSQL, MySQL, SQLServer and S3 (Parquet, CSV, JSON and Excel).
Built on top of other open-source projects like Pandas, Apache Arrow and Boto3, it offers abstracted functions to execute your usual ETL tasks like loading/unloading data from Data Lakes, Data Warehouses and Databases, even at scale.
Check our tutorials or the list of functionalities.
Install¶
AWS SDK for pandas runs on Python 3.8, 3.9 and 3.10, and on several platforms (AWS Lambda, AWS Glue Python Shell, EMR, EC2, on-premises, Amazon SageMaker, local, etc.).
Some good practices to follow for options below are:
Use new and isolated Virtual Environments for each project (venv).
On Notebooks, always restart your kernel after installations.
PyPI (pip)¶
>>> pip install awswrangler
>>> # Optional modules are installed with:
>>> pip install 'awswrangler[redshift]'
Conda¶
>>> conda install -c conda-forge awswrangler
At scale¶
AWS SDK for pandas can also run your workflows at scale by leveraging modin and ray.
>>> pip install "awswrangler[modin,ray]==3.1.1"
As a result, existing scripts can run on significantly larger datasets with no code rewrite.
Optional dependencies¶
Starting with version 3.0, some awswrangler modules are optional and must be installed explicitly using:
>>> pip install 'awswrangler[optional-module1, optional-module2]'
The optional modules are:
redshift
mysql
postgres
sqlserver
oracle
gremlin
sparql
opencypher
openpyxl
opensearch
deltalake
Calling these modules without the required dependencies raises an error prompting you to install the missing package.
AWS Lambda Layer¶
Managed Layer¶
Note
There is a one week minimum delay between version release and layers being available in the AWS Lambda console.
Warning
A memory size of less than 512MB may be insufficient for some workloads when using the layer in a Lambda function.
AWS SDK for pandas is available as an AWS Lambda Managed layer in all AWS commercial regions.
It can be accessed in the AWS Lambda console directly:
[Image: select-layer.png]
Or via its ARN: arn:aws:lambda:<region>:336392948345:layer:AWSSDKPandas-Python<python-version>:<layer-version>.
For example: arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python38:1.
The full list of ARNs is available here.
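As a minimal sketch (not taken from the official instructions), the managed layer can also be attached to an existing function programmatically with boto3; the function name below is a placeholder and the ARN follows the example above:
import boto3

# Hypothetical example: attach the managed AWSSDKPandas layer to an existing
# Lambda function. Adjust the region, Python version and layer version as needed.
lambda_client = boto3.client("lambda", region_name="us-east-1")
lambda_client.update_function_configuration(
    FunctionName="my-existing-function",  # placeholder function name
    Layers=["arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python38:1"],
)
Note that update_function_configuration replaces the function's full list of layers, so include any other layers the function already uses.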
Custom Layer¶
You can also create your own Lambda layer with these instructions:
1 - Go to GitHub’s release section and download the zipped layer for the desired version. Alternatively, you can download the zip from the public artifacts bucket.
2 - Go to the AWS Lambda console, open the layer section (left side) and click create layer.
3 - Set name and python version, upload your downloaded zip file and press create.
4 - Go to your Lambda function and select your new layer!
Serverless Application Repository (SAR)¶
AWS SDK for pandas layers are also available in the AWS Serverless Application Repository (SAR).
The app deploys the Lambda layer version in your own AWS account and region via a CloudFormation stack. This option provides the ability to use semantic versions (i.e. library version) instead of Lambda layer versions.
App | ARN | Description |
---|---|---|
aws-sdk-pandas-layer-py3-8 | arn:aws:serverlessrepo:us-east-1:336392948345:applications/aws-sdk-pandas-layer-py3-8 | Layer for Python 3.8 |
aws-sdk-pandas-layer-py3-9 | arn:aws:serverlessrepo:us-east-1:336392948345:applications/aws-sdk-pandas-layer-py3-9 | Layer for Python 3.9 |
aws-sdk-pandas-layer-py3-10 | arn:aws:serverlessrepo:us-east-1:336392948345:applications/aws-sdk-pandas-layer-py3-10 | Layer for Python 3.10 |
Here is an example of how to create and use the AWS SDK for pandas Lambda layer in your CDK app:
from aws_cdk import core, aws_sam as sam, aws_lambda

class AWSSDKPandasApp(core.Construct):
    def __init__(self, scope: core.Construct, id_: str):
        super().__init__(scope, id_)
        aws_sdk_pandas_layer = sam.CfnApplication(
            self,
            "awssdkpandas-layer",
            location=sam.CfnApplication.ApplicationLocationProperty(
                application_id="arn:aws:serverlessrepo:us-east-1:336392948345:applications/aws-sdk-pandas-layer-py3-8",
                semantic_version="3.0.0",  # Get the latest version from https://serverlessrepo.aws.amazon.com/applications
            ),
        )
        aws_sdk_pandas_layer_arn = aws_sdk_pandas_layer.get_att("Outputs.WranglerLayer38Arn").to_string()
        aws_sdk_pandas_layer_version = aws_lambda.LayerVersion.from_layer_version_arn(self, "awssdkpandas-layer-version", aws_sdk_pandas_layer_arn)
        aws_lambda.Function(
            self,
            "awssdkpandas-function",
            runtime=aws_lambda.Runtime.PYTHON_3_8,
            function_name="sample-awssdk-pandas-lambda-function",
            code=aws_lambda.Code.from_asset("./src/awssdk-pandas-lambda"),
            handler='lambda_function.lambda_handler',
            layers=[aws_sdk_pandas_layer_version],
        )
AWS Glue Python Shell Jobs¶
Note
Glue Python Shell (Python 3.9) has version 2.15.1 of awswrangler baked in. If you need a different version, follow the instructions below:
1 - Go to GitHub’s release page and download the wheel file (.whl) related to the desired version. Alternatively, you can download the wheel from the public artifacts bucket.
2 - Upload the wheel file to the Amazon S3 location of your choice.
3 - Go to your Glue Python Shell job and point to the S3 wheel file in the Python library path field.
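The same setup can also be scripted. Below is a hedged sketch using boto3, assuming the console's Python library path field maps to the --extra-py-files default argument; the job name, IAM role and S3 paths are placeholders:
import boto3

# Create a Glue Python Shell job that pulls in the uploaded awswrangler wheel.
glue = boto3.client("glue")
glue.create_job(
    Name="my-python-shell-job",  # placeholder job name
    Role="arn:aws:iam::111111111111:role/MyGlueRole",  # placeholder IAM role
    Command={
        "Name": "pythonshell",
        "PythonVersion": "3.9",
        "ScriptLocation": "s3://my-bucket/scripts/my_job.py",  # placeholder script
    },
    DefaultArguments={
        # Assumed equivalent of the "Python library path" console field
        "--extra-py-files": "s3://my-bucket/wheels/awswrangler-3.1.1-py3-none-any.whl",
    },
)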
AWS Glue for Ray Jobs¶
Go to your Glue for Ray job and create a new Job parameters key/value:
Key:
--pip-install
Value:
awswrangler[modin]
AWS Glue PySpark Jobs¶
Note
AWS SDK for pandas has compiled dependencies (C/C++) so support is only available for Glue PySpark Jobs >= 2.0.
Go to your Glue PySpark job and create a new Job parameters key/value:
Key:
--additional-python-modules
Value:
pyarrow==7,awswrangler
To install a specific version, set the value for the above Job parameter as follows:
Value:
pyarrow==7,pandas==1.5.3,awswrangler==3.1.1
Public Artifacts¶
Lambda zipped layers and Python wheels are stored in a publicly accessible S3 bucket for all versions.
Bucket:
aws-data-wrangler-public-artifacts
Prefix:
releases/<version>/
Lambda layer:
awswrangler-layer-<version>-py<py-version>.zip
Python wheel:
awswrangler-<version>-py3-none-any.whl
For example: s3://aws-data-wrangler-public-artifacts/releases/3.0.0/awswrangler-layer-3.0.0-py3.8.zip
You can check the bucket to find the latest version.
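For reference, here is a minimal sketch of downloading one of these artifacts with boto3, using the example key above and assuming anonymous (unsigned) access since the bucket is public:
import boto3
from botocore import UNSIGNED
from botocore.config import Config

# Unsigned client: no AWS credentials are needed for the public artifacts bucket.
s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
s3.download_file(
    Bucket="aws-data-wrangler-public-artifacts",
    Key="releases/3.0.0/awswrangler-layer-3.0.0-py3.8.zip",
    Filename="awswrangler-layer-3.0.0-py3.8.zip",
)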
Amazon SageMaker Notebook¶
Run this command in any Python 3 notebook cell and then make sure to restart the kernel before importing the awswrangler package.
>>> !pip install awswrangler
Amazon SageMaker Notebook Lifecycle¶
Open the AWS SageMaker console, go to the lifecycle section and use the below snippet to configure AWS SDK for pandas for all compatible SageMaker kernels (Reference).
#!/bin/bash
set -e
# OVERVIEW
# This script installs a single pip package in all SageMaker conda environments, apart from the JupyterSystemEnv which
# is a system environment reserved for Jupyter.
# Note this may timeout if the package installations in all environments take longer than 5 mins, consider using
# "nohup" to run this as a background process in that case.
sudo -u ec2-user -i <<'EOF'
# PARAMETERS
PACKAGE=awswrangler
# Note that "base" is special environment name, include it there as well.
for env in base /home/ec2-user/anaconda3/envs/*; do
source /home/ec2-user/anaconda3/bin/activate $(basename "$env")
if [ $env = 'JupyterSystemEnv' ]; then
continue
fi
nohup pip install --upgrade "$PACKAGE" &
source /home/ec2-user/anaconda3/bin/deactivate
done
EOF
EMR Cluster¶
Despite not being a distributed library, AWS SDK for pandas can be used to complement Big Data pipelines.
Configure Python 3 as the default interpreter for PySpark on your cluster configuration [ONLY REQUIRED FOR EMR < 6]
[ { "Classification": "spark-env", "Configurations": [ { "Classification": "export", "Properties": { "PYSPARK_PYTHON": "/usr/bin/python3" } } ] } ]
Keep the bootstrap script below on S3 and reference it from your cluster.
For EMR Release < 6
#!/usr/bin/env bash
set -ex
sudo pip-3.6 install pyarrow==2 awswrangler
For EMR Release >= 6
#!/usr/bin/env bash
set -ex
sudo pip install awswrangler
From Source¶
>>> git clone https://github.com/aws/aws-sdk-pandas.git
>>> cd aws-sdk-pandas
>>> pip install .
Notes for Microsoft SQL Server¶
awswrangler uses pyodbc for interacting with Microsoft SQL Server. To install this package you need the ODBC header files, which can be installed with the following commands:
>>> sudo apt install unixodbc-dev
>>> yum install unixODBC-devel
After installing these header files you can either just install pyodbc or awswrangler with the sqlserver extra, which will also install pyodbc:
>>> pip install pyodbc
>>> pip install 'awswrangler[sqlserver]'
Finally you also need the correct ODBC Driver for SQL Server. You can have a look at the documentation from Microsoft to see how it can be installed in your environment.
If you want to connect to Microsoft SQL Server from AWS Lambda, you can build a separate layer including the needed ODBC drivers and pyodbc.
If you maintain your own environment, you need to take care of the above steps. Because of this limitation, usage in combination with Glue jobs is limited and you need to rely on the functionality provided inside Glue itself.
Notes for Oracle Database¶
awswrangler uses the oracledb package for interacting with Oracle Database. To install this package you do not need the Oracle Client libraries unless you want to use the Thick mode. You can have a look at the documentation from Oracle to see how they can be installed in your environment.
After installing these client libraries you can either just install oracledb or awswrangler with the oracle extra, which will also install oracledb:
>>> pip install oracledb
>>> pip install 'awswrangler[oracle]'
If you maintain your own environment, you need to take care of the above steps. Because of this limitation, usage in combination with Glue jobs is limited and you need to rely on the functionality provided inside Glue itself.
At scale¶
AWS SDK for pandas supports Ray and Modin, enabling you to scale your pandas workflows from a single machine to a multi-node environment, with no code changes.
The simplest way to try this is with AWS Glue for Ray, the new serverless option to run distributed Python code announced at AWS re:Invent 2022. AWS SDK for pandas also supports self-managed Ray on Amazon Elastic Compute Cloud (Amazon EC2).
Getting Started¶
Install the library with these two optional dependencies to enable distributed mode:
>>> pip install "awswrangler[ray,modin]"
Once installed, you can use the library in your code as usual:
>>> import awswrangler as wr
At import, SDK for pandas looks for an environment variable called WR_ADDRESS.
If found, it is used to send commands to a remote cluster.
If not found, a local Ray runtime is initialized on your machine instead.
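For example, a sketch assuming a reachable Ray cluster (the address below is a placeholder and its exact format depends on your Ray setup):
import os

# Point the SDK at a remote Ray cluster before importing it; without this
# variable a local Ray runtime is started instead.
os.environ["WR_ADDRESS"] = "ray://10.0.0.1:10001"  # placeholder cluster address

import awswrangler as wr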
To confirm that you are in distributed mode, run:
>>> print(f"Execution Engine: {wr.engine.get()}")
>>> print(f"Memory Format: {wr.memory_format.get()}")
which should show that Ray and Modin are enabled as the execution engine and memory format, respectively.
In distributed mode, the same awswrangler APIs can now handle much larger datasets:
# Read Parquet data (1.2 Gb Parquet compressed)
df = wr.s3.read_parquet(
path=f"s3://amazon-reviews-pds/parquet/product_category={category.title()}/",
)
# Drop the customer_id column
df.drop("customer_id", axis=1, inplace=True)
# Filter reviews with 5-star rating
df5 = df[df["star_rating"] == 5]
In the example above, Amazon product data is read from Amazon S3 into a distributed Modin data frame. Modin is a drop-in replacement for Pandas. It exposes the same APIs but enables you to use all of the cores on your machine, or all of the workers in an entire cluster, leading to improved performance and scale. To use it, make sure to replace your pandas import statement with modin:
>>> import modin.pandas as pd # instead of import pandas as pd
Failing to do so means that all operations run on a single thread instead of leveraging the entire cluster resources.
Note that in distributed mode, all awswrangler APIs return and accept Modin data frames, not pandas.
Supported APIs¶
This table lists the awswrangler APIs available in distributed mode (i.e. that can run at scale):
[Table: Service | API | Implementation status for each API supported at scale; see the Supported APIs page in the online documentation for the full list.]
Caveats¶
S3FS Filesystem¶
When Ray is chosen as the engine, S3Fs is used instead of boto3 for certain API calls, for example listing a large number of S3 objects.
This choice was made for performance reasons, as a boto3 implementation can be much slower in some cases.
As a side effect, users won’t be able to use the s3_additional_kwargs input parameter as it’s currently not supported by S3Fs.
Unsupported kwargs¶
Most AWS SDK for pandas calls support passing the boto3_session argument.
While this is acceptable for an application running in a single process, distributed applications require the session to be serialized and passed to the worker nodes in the cluster.
This constitutes a security risk.
As a result, passing boto3_session when using the Ray runtime is not supported.
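One approach, sketched below under the assumption that every node can resolve credentials through the default chain (environment variables, instance or execution-role credentials), is to configure the default session instead of passing a session object; the region and S3 path are placeholders:
import boto3
import awswrangler as wr

# Configure the default session once instead of passing boto3_session per call.
boto3.setup_default_session(region_name="us-east-2")  # placeholder region

df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)  # placeholder path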
To learn more¶
Read our blog post, then head to our latest tutorials to discover even more features.
A runbook with common errors when running the library with Ray is available here.
Tutorials¶
Note
You can also find all Tutorial Notebooks on GitHub.
1 - Introduction¶
What is AWS SDK for pandas?¶
An open-source Python package that extends the power of the pandas library to AWS, connecting DataFrames and AWS data-related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon Timestream, Amazon EMR, etc.).
Built on top of other open-source projects like Pandas, Apache Arrow and Boto3, it offers abstracted functions to execute usual ETL tasks like loading/unloading data from Data Lakes, Data Warehouses and Databases.
Check our list of functionalities.
How to install?¶
awswrangler runs almost anywhere on Python 3.8, 3.9 and 3.10, so there are several different ways to install it in the desired environment.
Some good practices for most of the above methods are:
- Use new and individual Virtual Environments for each project (venv).
- On Notebooks, always restart your kernel after installations.
Let’s Install it!¶
[ ]:
!pip install awswrangler
Restart your kernel after the installation!
[1]:
import awswrangler as wr
wr.__version__
[1]:
'2.0.0'
2 - Sessions¶
How does awswrangler handle Sessions and AWS credentials?¶
Since version 1.0.0, awswrangler relies on boto3.Session() to manage AWS credentials and configurations.
awswrangler will not store any kind of state internally. Users are in charge of managing Sessions.
Most awswrangler functions receive the optional boto3_session argument. If None is received, the default boto3 Session will be used.
[1]:
import awswrangler as wr
import boto3
Using the default Boto3 Session¶
[2]:
wr.s3.does_object_exist("s3://noaa-ghcn-pds/fake")
[2]:
False
Customizing and using the default Boto3 Session¶
[3]:
boto3.setup_default_session(region_name="us-east-2")
wr.s3.does_object_exist("s3://noaa-ghcn-pds/fake")
[3]:
False
Using a new custom Boto3 Session¶
[4]:
my_session = boto3.Session(region_name="us-east-2")
wr.s3.does_object_exist("s3://noaa-ghcn-pds/fake", boto3_session=my_session)
[4]:
False
3 - Amazon S3¶
Table of Contents¶
[1]:
import awswrangler as wr
import pandas as pd
import boto3
import pytz
from datetime import datetime
df1 = pd.DataFrame({
"id": [1, 2],
"name": ["foo", "boo"]
})
df2 = pd.DataFrame({
"id": [3],
"name": ["bar"]
})
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
1. CSV files¶
1.1 Writing CSV files¶
[3]:
path1 = f"s3://{bucket}/csv/file1.csv"
path2 = f"s3://{bucket}/csv/file2.csv"
wr.s3.to_csv(df1, path1, index=False)
wr.s3.to_csv(df2, path2, index=False)
1.2 Reading single CSV file¶
[4]:
wr.s3.read_csv([path1])
[4]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
1.3 Reading multiple CSV files¶
1.3.1 Reading CSV by list¶
[5]:
wr.s3.read_csv([path1, path2])
[5]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
2 | 3 | bar |
1.3.2 Reading CSV by prefix¶
[6]:
wr.s3.read_csv(f"s3://{bucket}/csv/")
[6]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
2 | 3 | bar |
2. JSON files¶
2.1 Writing JSON files¶
[7]:
path1 = f"s3://{bucket}/json/file1.json"
path2 = f"s3://{bucket}/json/file2.json"
wr.s3.to_json(df1, path1)
wr.s3.to_json(df2, path2)
[7]:
['s3://woodadw-test/json/file2.json']
2.2 Reading single JSON file¶
[8]:
wr.s3.read_json([path1])
[8]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
2.3 Reading multiple JSON files¶
2.3.1 Reading JSON by list¶
[9]:
wr.s3.read_json([path1, path2])
[9]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
0 | 3 | bar |
2.3.2 Reading JSON by prefix¶
[10]:
wr.s3.read_json(f"s3://{bucket}/json/")
[10]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
0 | 3 | bar |
3. Parquet files¶
For more complex features related to Parquet Datasets, check tutorial number 4.
3.1 Writing Parquet files¶
[11]:
path1 = f"s3://{bucket}/parquet/file1.parquet"
path2 = f"s3://{bucket}/parquet/file2.parquet"
wr.s3.to_parquet(df1, path1)
wr.s3.to_parquet(df2, path2)
3.2 Reading single Parquet file¶
[12]:
wr.s3.read_parquet([path1])
[12]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
3.3 Reading multiple Parquet files¶
3.3.1 Reading Parquet by list¶
[13]:
wr.s3.read_parquet([path1, path2])
[13]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
2 | 3 | bar |
3.3.2 Reading Parquet by prefix¶
[14]:
wr.s3.read_parquet(f"s3://{bucket}/parquet/")
[14]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
2 | 3 | bar |
4. Fixed-width formatted files (only read)¶
As of today, Pandas doesn’t implement a to_fwf functionality, so let’s manually write two files:
[15]:
content = "1 Herfelingen 27-12-18\n"\
"2 Lambusart 14-06-18\n"\
"3 Spormaggiore 15-04-18"
boto3.client("s3").put_object(Body=content, Bucket=bucket, Key="fwf/file1.txt")
content = "4 Buizingen 05-09-19\n"\
"5 San Rafael 04-09-19"
boto3.client("s3").put_object(Body=content, Bucket=bucket, Key="fwf/file2.txt")
path1 = f"s3://{bucket}/fwf/file1.txt"
path2 = f"s3://{bucket}/fwf/file2.txt"
4.1 Reading single FWF file¶
[16]:
wr.s3.read_fwf([path1], names=["id", "name", "date"])
[16]:
id | name | date | |
---|---|---|---|
0 | 1 | Herfelingen | 27-12-18 |
1 | 2 | Lambusart | 14-06-18 |
2 | 3 | Spormaggiore | 15-04-18 |
4.2 Reading multiple FWF files¶
4.2.1 Reading FWF by list¶
[17]:
wr.s3.read_fwf([path1, path2], names=["id", "name", "date"])
[17]:
id | name | date | |
---|---|---|---|
0 | 1 | Herfelingen | 27-12-18 |
1 | 2 | Lambusart | 14-06-18 |
2 | 3 | Spormaggiore | 15-04-18 |
3 | 4 | Buizingen | 05-09-19 |
4 | 5 | San Rafael | 04-09-19 |
4.2.2 Reading FWF by prefix¶
[18]:
wr.s3.read_fwf(f"s3://{bucket}/fwf/", names=["id", "name", "date"])
[18]:
id | name | date | |
---|---|---|---|
0 | 1 | Herfelingen | 27-12-18 |
1 | 2 | Lambusart | 14-06-18 |
2 | 3 | Spormaggiore | 15-04-18 |
3 | 4 | Buizingen | 05-09-19 |
4 | 5 | San Rafael | 04-09-19 |
5. Excel files¶
5.1 Writing Excel file¶
[19]:
path = f"s3://{bucket}/file0.xlsx"
wr.s3.to_excel(df1, path, index=False)
[19]:
's3://woodadw-test/file0.xlsx'
5.2 Reading Excel file¶
[20]:
wr.s3.read_excel(path)
[20]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
6. Reading with lastModified filter¶
Specify the filter by LastModified Date.
The filter needs to be specified as a datetime with a time zone.
Internally the path needs to be listed; after that the filter is applied.
The filter compares each object’s LastModified attribute with the last_modified_begin and last_modified_end arguments.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
6.1 Define the Date time with UTC Timezone¶
[21]:
begin = datetime.strptime("20-07-31 20:30", "%y-%m-%d %H:%M")
end = datetime.strptime("21-07-31 20:30", "%y-%m-%d %H:%M")
begin_utc = pytz.utc.localize(begin)
end_utc = pytz.utc.localize(end)
6.2 Define the Date time and specify the Timezone¶
[22]:
begin = datetime.strptime("20-07-31 20:30", "%y-%m-%d %H:%M")
end = datetime.strptime("21-07-31 20:30", "%y-%m-%d %H:%M")
timezone = pytz.timezone("America/Los_Angeles")
begin_Los_Angeles = timezone.localize(begin)
end_Los_Angeles = timezone.localize(end)
6.3 Read JSON using the LastModified filters¶
[23]:
wr.s3.read_fwf(f"s3://{bucket}/fwf/", names=["id", "name", "date"], last_modified_begin=begin_utc, last_modified_end=end_utc)
wr.s3.read_json(f"s3://{bucket}/json/", last_modified_begin=begin_utc, last_modified_end=end_utc)
wr.s3.read_csv(f"s3://{bucket}/csv/", last_modified_begin=begin_utc, last_modified_end=end_utc)
wr.s3.read_parquet(f"s3://{bucket}/parquet/", last_modified_begin=begin_utc, last_modified_end=end_utc)
7. Download objects¶
Objects can be downloaded from S3 using either a path to a local file or a file-like object in binary mode.
7.1 Download object to a file path¶
[24]:
local_file_dir = getpass.getpass()
[25]:
import os
path1 = f"s3://{bucket}/csv/file1.csv"
local_file = os.path.join(local_file_dir, "file1.csv")
wr.s3.download(path=path1, local_file=local_file)
pd.read_csv(local_file)
[25]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
7.2 Download object to a file-like object in binary mode¶
[26]:
path2 = f"s3://{bucket}/csv/file2.csv"
local_file = os.path.join(local_file_dir, "file2.csv")
with open(local_file, mode="wb") as local_f:
wr.s3.download(path=path2, local_file=local_f)
pd.read_csv(local_file)
[26]:
id | name | |
---|---|---|
0 | 3 | bar |
8. Upload objects¶
Objects can be uploaded to S3 using either a path to a local file or a file-like object in binary mode.
8.1 Upload object from a file path¶
[27]:
local_file = os.path.join(local_file_dir, "file1.csv")
wr.s3.upload(local_file=local_file, path=path1)
wr.s3.read_csv(path1)
[27]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
8.2 Upload object from a file-like object in binary mode¶
[28]:
local_file = os.path.join(local_file_dir, "file2.csv")
with open(local_file, "rb") as local_f:
wr.s3.upload(local_file=local_f, path=path2)
wr.s3.read_csv(path2)
[28]:
id | name | |
---|---|---|
0 | 3 | bar |
9. Delete objects¶
[29]:
wr.s3.delete_objects(f"s3://{bucket}/")
4 - Parquet Datasets¶
awswrangler has 3 different write modes to store Parquet Datasets on Amazon S3.
append (Default)
Only adds new files, without deleting any existing ones.
overwrite
Deletes everything in the target directory and then adds new files. If writing new files fails for any reason, old files are not restored.
overwrite_partitions (Partition Upsert)
Only deletes the paths of the partitions that should be updated and then writes the new partition files. It’s like a “partition upsert”.
[1]:
from datetime import date
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
············
Creating the Dataset¶
[3]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite"
)
wr.s3.read_parquet(path, dataset=True)
[3]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
Appending¶
[4]:
df = pd.DataFrame({
"id": [3],
"value": ["bar"],
"date": [date(2020, 1, 3)]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="append"
)
wr.s3.read_parquet(path, dataset=True)
[4]:
id | value | date | |
---|---|---|---|
0 | 3 | bar | 2020-01-03 |
1 | 1 | foo | 2020-01-01 |
2 | 2 | boo | 2020-01-02 |
Overwriting¶
[5]:
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite"
)
wr.s3.read_parquet(path, dataset=True)
[5]:
id | value | date | |
---|---|---|---|
0 | 3 | bar | 2020-01-03 |
Creating a Partitioned Dataset¶
[6]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["date"]
)
wr.s3.read_parquet(path, dataset=True)
[6]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
Upserting partitions (overwrite_partitions)¶
[7]:
df = pd.DataFrame({
"id": [2, 3],
"value": ["xoo", "bar"],
"date": [date(2020, 1, 2), date(2020, 1, 3)]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite_partitions",
partition_cols=["date"]
)
wr.s3.read_parquet(path, dataset=True)
[7]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | xoo | 2020-01-02 |
2 | 3 | bar | 2020-01-03 |
BONUS - Glue/Athena integration¶
[8]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="aws_sdk_pandas",
table="my_table"
)
wr.athena.read_sql_query("SELECT * FROM my_table", database="aws_sdk_pandas")
[8]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
5 - Glue Catalog¶
awswrangler makes heavy use of Glue Catalog to store metadata of tables and connections.
[1]:
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
············
Creating a Pandas DataFrame¶
[3]:
df = pd.DataFrame({
"id": [1, 2, 3],
"name": ["shoes", "tshirt", "ball"],
"price": [50.3, 10.5, 20.0],
"in_stock": [True, True, False]
})
df
[3]:
id | name | price | in_stock | |
---|---|---|---|---|
0 | 1 | shoes | 50.3 | True |
1 | 2 | tshirt | 10.5 | True |
2 | 3 | ball | 20.0 | False |
Checking Glue Catalog Databases¶
[4]:
databases = wr.catalog.databases()
print(databases)
Database Description
0 aws_sdk_pandas AWS SDK for pandas Test Arena - Glue Database
1 default Default Hive database
Create the database awswrangler_test if not exists¶
[5]:
if "awswrangler_test" not in databases.values:
wr.catalog.create_database("awswrangler_test")
print(wr.catalog.databases())
else:
print("Database awswrangler_test already exists")
Database Description
0 aws_sdk_pandas AWS SDK for pandas Test Arena - Glue Database
1 awswrangler_test
2 default Default Hive database
Checking the empty database¶
[6]:
wr.catalog.tables(database="awswrangler_test")
[6]:
Database | Table | Description | Columns | Partitions |
---|
Writing DataFrames to Data Lake (S3 + Parquet + Glue Catalog)¶
[7]:
desc = "This is my product table."
param = {
"source": "Product Web Service",
"class": "e-commerce"
}
comments = {
"id": "Unique product ID.",
"name": "Product name",
"price": "Product price (dollar)",
"in_stock": "Is this product availaible in the stock?"
}
res = wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/products/",
dataset=True,
database="awswrangler_test",
table="products",
mode="overwrite",
glue_table_settings=wr.typing.GlueTableSettings(
description=desc,
parameters=param,
columns_comments=comments
),
)
Checking Glue Catalog (AWS Console)¶
[Image: glue.png]
Looking up the new table!¶
[8]:
wr.catalog.tables(name_contains="roduc")
[8]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | awswrangler_test | products | This is my product table. | id, name, price, in_stock |
[9]:
wr.catalog.tables(name_prefix="pro")
[9]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | awswrangler_test | products | This is my product table. | id, name, price, in_stock |
[10]:
wr.catalog.tables(name_suffix="ts")
[10]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | awswrangler_test | products | This is my product table. | id, name, price, in_stock |
[11]:
wr.catalog.tables(search_text="This is my")
[11]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | awswrangler_test | products | This is my product table. | id, name, price, in_stock |
Getting table details¶
[12]:
wr.catalog.table(database="awswrangler_test", table="products")
[12]:
Column Name | Type | Partition | Comment | |
---|---|---|---|---|
0 | id | bigint | False | Unique product ID. |
1 | name | string | False | Product name |
2 | price | double | False | Product price (dollar) |
3 | in_stock | boolean | False | Is this product available in stock? |
Cleaning Up the Database¶
[13]:
for table in wr.catalog.get_tables(database="awswrangler_test"):
wr.catalog.delete_table_if_exists(database="awswrangler_test", table=table["Name"])
Delete Database¶
[14]:
wr.catalog.delete_database('awswrangler_test')
6 - Amazon Athena¶
awswrangler has three ways to run queries on Athena and fetch the result as a DataFrame:
ctas_approach=True (Default)
Wraps the query with a CTAS and then reads the table data as parquet directly from s3.
PROS:
Faster for mid and big result sizes.
Can handle some level of nested types.
CONS:
Requires create/delete table permissions on Glue.
Does not support timestamp with time zone.
Does not support columns with repeated names.
Does not support columns with undefined data types.
A temporary table will be created and then deleted immediately.
Does not support custom data_source/catalog_id.
unload_approach=True and ctas_approach=False
Does an UNLOAD query on Athena and parses the Parquet result on S3.
PROS:
Faster for mid and big result sizes.
Can handle some level of nested types.
Does not modify Glue Data Catalog.
CONS:
Output S3 path must be empty.
Does not support timestamp with time zone.
Does not support columns with repeated names.
Does not support columns with undefined data types.
ctas_approach=False
Does a regular query on Athena and parses the regular CSV result on S3.
PROS:
Faster for small result sizes (less latency).
Does not require create/delete table permissions on Glue.
Supports timestamp with time zone.
Supports custom data_source/catalog_id.
CONS:
Slower (but still faster than other libraries that use the regular Athena API).
Does not handle nested types at all.
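To make the mapping concrete, the hedged calls below (with a placeholder database and bucket) show how each approach is selected; the same parameters appear in the benchmark cells further down:
import awswrangler as wr

# Default: CTAS approach
df_ctas = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")

# UNLOAD approach (requires an empty S3 output path)
df_unload = wr.athena.read_sql_query(
    "SELECT * FROM my_table",
    database="my_db",
    ctas_approach=False,
    unload_approach=True,
    s3_output="s3://my-bucket/unload/",
)

# Regular query approach
df_regular = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db", ctas_approach=False)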
[1]:
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
Checking/Creating Glue Catalog Databases¶
[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
wr.catalog.create_database("awswrangler_test")
Creating a Parquet Table from the NOAA’s CSV files¶
[ ]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/by_year/189",
names=cols,
parse_dates=["dt", "obs_time"]) # Read 10 files from the 1890 decade (~1GB)
df
[ ]:
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="noaa"
)
[ ]:
wr.catalog.table(database="awswrangler_test", table="noaa")
Reading with ctas_approach=False¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)
Default with ctas_approach=True - 13x faster (default)¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")
Using categories to speed up and save memory - 24x faster¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"])
Reading with unload_approach=True¶
[ ]:
%%time
wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False, unload_approach=True, s3_output=f"s3://{bucket}/unload/")
Batching (Good for restricted memory environments)¶
[ ]:
%%time
dfs = wr.athena.read_sql_query(
"SELECT * FROM noaa",
database="awswrangler_test",
chunksize=True # Chunksize calculated automatically for ctas_approach.
)
for df in dfs: # Batching
print(len(df.index))
[ ]:
%%time
dfs = wr.athena.read_sql_query(
"SELECT * FROM noaa",
database="awswrangler_test",
chunksize=100_000_000
)
for df in dfs: # Batching
print(len(df.index))
Cleaning Up S3¶
[ ]:
wr.s3.delete_objects(path)
Delete table¶
[ ]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa")
Delete Database¶
[ ]:
wr.catalog.delete_database('awswrangler_test')
7 - Redshift, MySQL, PostgreSQL, SQL Server and Oracle¶
awswrangler’s Redshift, MySQL and PostgreSQL modules have two basic functions in common that try to follow Pandas conventions, but add more data type consistency.
[ ]:
# Install the optional modules first
!pip install 'awswrangler[redshift, postgres, mysql, sqlserver, oracle]'
[1]:
import awswrangler as wr
import pandas as pd
df = pd.DataFrame({
"id": [1, 2],
"name": ["foo", "boo"]
})
Connect using the Glue Catalog Connections¶
[2]:
con_redshift = wr.redshift.connect("aws-sdk-pandas-redshift")
con_mysql = wr.mysql.connect("aws-sdk-pandas-mysql")
con_postgresql = wr.postgresql.connect("aws-sdk-pandas-postgresql")
con_sqlserver = wr.sqlserver.connect("aws-sdk-pandas-sqlserver")
con_oracle = wr.oracle.connect("aws-sdk-pandas-oracle")
Raw SQL queries (No Pandas)¶
[3]:
with con_redshift.cursor() as cursor:
for row in cursor.execute("SELECT 1"):
print(row)
[1]
Loading data to Database¶
[4]:
wr.redshift.to_sql(df, con_redshift, schema="public", table="tutorial", mode="overwrite")
wr.mysql.to_sql(df, con_mysql, schema="test", table="tutorial", mode="overwrite")
wr.postgresql.to_sql(df, con_postgresql, schema="public", table="tutorial", mode="overwrite")
wr.sqlserver.to_sql(df, con_sqlserver, schema="dbo", table="tutorial", mode="overwrite")
wr.oracle.to_sql(df, con_oracle, schema="test", table="tutorial", mode="overwrite")
Unloading data from Database¶
[5]:
wr.redshift.read_sql_query("SELECT * FROM public.tutorial", con=con_redshift)
wr.mysql.read_sql_query("SELECT * FROM test.tutorial", con=con_mysql)
wr.postgresql.read_sql_query("SELECT * FROM public.tutorial", con=con_postgresql)
wr.sqlserver.read_sql_query("SELECT * FROM dbo.tutorial", con=con_sqlserver)
wr.oracle.read_sql_query("SELECT * FROM test.tutorial", con=con_oracle)
[5]:
id | name | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
[6]:
con_redshift.close()
con_mysql.close()
con_postgresql.close()
con_sqlserver.close()
con_oracle.close()
8 - Redshift - COPY & UNLOAD¶
Amazon Redshift has two SQL commands that help to load and unload large amounts of data, staging it on Amazon S3:
1 - COPY
2 - UNLOAD
Let’s take a look at how awswrangler can use them.
[ ]:
# Install the optional modules first
!pip install 'awswrangler[redshift]'
[1]:
import awswrangler as wr
con = wr.redshift.connect("aws-sdk-pandas-redshift")
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/stage/"
···········································
Enter your IAM ROLE ARN:¶
[3]:
iam_role = getpass.getpass()
····················································································
Creating a DataFrame from the NOAA’s CSV files¶
[4]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/by_year/1897.csv",
names=cols,
parse_dates=["dt", "obs_time"]) # ~127MB, ~4MM rows
df
[4]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | AG000060590 | 1897-01-01 | TMAX | 170 | NaN | NaN | E | NaN |
1 | AG000060590 | 1897-01-01 | TMIN | -14 | NaN | NaN | E | NaN |
2 | AG000060590 | 1897-01-01 | PRCP | 0 | NaN | NaN | E | NaN |
3 | AGE00135039 | 1897-01-01 | TMAX | 140 | NaN | NaN | E | NaN |
4 | AGE00135039 | 1897-01-01 | TMIN | 40 | NaN | NaN | E | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... |
3923594 | UZM00038457 | 1897-12-31 | TMIN | -145 | NaN | NaN | r | NaN |
3923595 | UZM00038457 | 1897-12-31 | PRCP | 4 | NaN | NaN | r | NaN |
3923596 | UZM00038457 | 1897-12-31 | TAVG | -95 | NaN | NaN | r | NaN |
3923597 | UZM00038618 | 1897-12-31 | PRCP | 66 | NaN | NaN | r | NaN |
3923598 | UZM00038618 | 1897-12-31 | TAVG | -45 | NaN | NaN | r | NaN |
3923599 rows × 8 columns
Load and Unload with COPY and UNLOAD commands¶
Note: Please use an empty S3 path for the COPY command.
[5]:
%%time
wr.redshift.copy(
df=df,
path=path,
con=con,
schema="public",
table="commands",
mode="overwrite",
iam_role=iam_role,
)
CPU times: user 2.78 s, sys: 293 ms, total: 3.08 s
Wall time: 20.7 s
[6]:
%%time
wr.redshift.unload(
sql="SELECT * FROM public.commands",
con=con,
iam_role=iam_role,
path=path,
keep_files=True,
)
CPU times: user 10 s, sys: 1.14 s, total: 11.2 s
Wall time: 27.5 s
[6]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | AG000060590 | 1897-01-01 | TMAX | 170 | <NA> | <NA> | E | <NA> |
1 | AG000060590 | 1897-01-01 | PRCP | 0 | <NA> | <NA> | E | <NA> |
2 | AGE00135039 | 1897-01-01 | TMIN | 40 | <NA> | <NA> | E | <NA> |
3 | AGE00147705 | 1897-01-01 | TMAX | 164 | <NA> | <NA> | E | <NA> |
4 | AGE00147705 | 1897-01-01 | PRCP | 0 | <NA> | <NA> | E | <NA> |
... | ... | ... | ... | ... | ... | ... | ... | ... |
3923594 | USW00094967 | 1897-12-31 | TMAX | -144 | <NA> | <NA> | 6 | <NA> |
3923595 | USW00094967 | 1897-12-31 | PRCP | 0 | P | <NA> | 6 | <NA> |
3923596 | UZM00038457 | 1897-12-31 | TMAX | -49 | <NA> | <NA> | r | <NA> |
3923597 | UZM00038457 | 1897-12-31 | PRCP | 4 | <NA> | <NA> | r | <NA> |
3923598 | UZM00038618 | 1897-12-31 | PRCP | 66 | <NA> | <NA> | r | <NA> |
7847198 rows × 8 columns
[7]:
con.close()
9 - Redshift - Append, Overwrite and Upsert¶
awswrangler’s copy/to_sql function has three different mode options for Redshift.
1 - append
2 - overwrite
3 - upsert
[ ]:
# Install the optional modules first
!pip install 'awswrangler[redshift]'
[2]:
import awswrangler as wr
import pandas as pd
from datetime import date
con = wr.redshift.connect("aws-sdk-pandas-redshift")
Enter your bucket name:¶
[3]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/stage/"
···········································
Enter your IAM ROLE ARN:¶
[4]:
iam_role = getpass.getpass()
····················································································
Creating the table (Overwriting if it exists)¶
[10]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.redshift.copy(
df=df,
path=path,
con=con,
schema="public",
table="my_table",
mode="overwrite",
iam_role=iam_role,
primary_keys=["id"]
)
wr.redshift.read_sql_table(table="my_table", schema="public", con=con)
[10]:
id | value | date | |
---|---|---|---|
0 | 2 | boo | 2020-01-02 |
1 | 1 | foo | 2020-01-01 |
Appending¶
[11]:
df = pd.DataFrame({
"id": [3],
"value": ["bar"],
"date": [date(2020, 1, 3)]
})
wr.redshift.copy(
df=df,
path=path,
con=con,
schema="public",
table="my_table",
mode="append",
iam_role=iam_role,
primary_keys=["id"]
)
wr.redshift.read_sql_table(table="my_table", schema="public", con=con)
[11]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
2 | 3 | bar | 2020-01-03 |
Upserting¶
[12]:
df = pd.DataFrame({
"id": [2, 3],
"value": ["xoo", "bar"],
"date": [date(2020, 1, 2), date(2020, 1, 3)]
})
wr.redshift.copy(
df=df,
path=path,
con=con,
schema="public",
table="my_table",
mode="upsert",
iam_role=iam_role,
primary_keys=["id"]
)
wr.redshift.read_sql_table(table="my_table", schema="public", con=con)
[12]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | xoo | 2020-01-02 |
2 | 3 | bar | 2020-01-03 |
Cleaning Up¶
[13]:
with con.cursor() as cursor:
cursor.execute("DROP TABLE public.my_table")
con.close()
10 - Parquet Crawler¶
awswrangler can extract only the metadata from Parquet files and Partitions and then add it to the Glue Catalog.
[1]:
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
············
Creating a Parquet Table from the NOAA’s CSV files¶
[3]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/by_year/189",
names=cols,
parse_dates=["dt", "obs_time"]) # Read 10 files from the 1890 decade (~1GB)
df
[3]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | AGE00135039 | 1890-01-01 | TMAX | 160 | NaN | NaN | E | NaN |
1 | AGE00135039 | 1890-01-01 | TMIN | 30 | NaN | NaN | E | NaN |
2 | AGE00135039 | 1890-01-01 | PRCP | 45 | NaN | NaN | E | NaN |
3 | AGE00147705 | 1890-01-01 | TMAX | 140 | NaN | NaN | E | NaN |
4 | AGE00147705 | 1890-01-01 | TMIN | 74 | NaN | NaN | E | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... |
29249753 | UZM00038457 | 1899-12-31 | PRCP | 16 | NaN | NaN | r | NaN |
29249754 | UZM00038457 | 1899-12-31 | TAVG | -73 | NaN | NaN | r | NaN |
29249755 | UZM00038618 | 1899-12-31 | TMIN | -76 | NaN | NaN | r | NaN |
29249756 | UZM00038618 | 1899-12-31 | PRCP | 0 | NaN | NaN | r | NaN |
29249757 | UZM00038618 | 1899-12-31 | TAVG | -60 | NaN | NaN | r | NaN |
29249758 rows × 8 columns
[4]:
df["year"] = df["dt"].dt.year
df.head(3)
[4]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | year | |
---|---|---|---|---|---|---|---|---|---|
0 | AGE00135039 | 1890-01-01 | TMAX | 160 | NaN | NaN | E | NaN | 1890 |
1 | AGE00135039 | 1890-01-01 | TMIN | 30 | NaN | NaN | E | NaN | 1890 |
2 | AGE00135039 | 1890-01-01 | PRCP | 45 | NaN | NaN | E | NaN | 1890 |
[5]:
res = wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["year"],
)
[6]:
[ x.split("data/", 1)[1] for x in wr.s3.list_objects(path)]
[6]:
['year=1890/06a519afcf8e48c9b08c8908f30adcfe.snappy.parquet',
'year=1891/5a99c28dbef54008bfc770c946099e02.snappy.parquet',
'year=1892/9b1ea5d1cfad40f78c920f93540ca8ec.snappy.parquet',
'year=1893/92259b49c134401eaf772506ee802af6.snappy.parquet',
'year=1894/c734469ffff944f69dc277c630064a16.snappy.parquet',
'year=1895/cf7ccde86aaf4d138f86c379c0817aa6.snappy.parquet',
'year=1896/ce02f4c2c554438786b766b33db451b6.snappy.parquet',
'year=1897/e04de04ad3c444deadcc9c410ab97ca1.snappy.parquet',
'year=1898/acb0e02878f04b56a6200f4b5a97be0e.snappy.parquet',
'year=1899/a269bdbb0f6a48faac55f3bcfef7df7a.snappy.parquet']
Crawling!¶
[7]:
%%time
res = wr.s3.store_parquet_metadata(
path=path,
database="awswrangler_test",
table="crawler",
dataset=True,
mode="overwrite",
dtype={"year": "int"}
)
CPU times: user 1.81 s, sys: 528 ms, total: 2.33 s
Wall time: 3.21 s
Checking¶
[8]:
wr.catalog.table(database="awswrangler_test", table="crawler")
[8]:
Column Name | Type | Partition | Comment | |
---|---|---|---|---|
0 | id | string | False | |
1 | dt | timestamp | False | |
2 | element | string | False | |
3 | value | bigint | False | |
4 | m_flag | string | False | |
5 | q_flag | string | False | |
6 | s_flag | string | False | |
7 | obs_time | string | False | |
8 | year | int | True |
[9]:
%%time
wr.athena.read_sql_query("SELECT * FROM crawler WHERE year=1890", database="awswrangler_test")
CPU times: user 3.52 s, sys: 811 ms, total: 4.33 s
Wall time: 9.6 s
[9]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | year | |
---|---|---|---|---|---|---|---|---|---|
0 | USC00195145 | 1890-01-01 | TMIN | -28 | <NA> | <NA> | 6 | <NA> | 1890 |
1 | USC00196770 | 1890-01-01 | PRCP | 0 | P | <NA> | 6 | <NA> | 1890 |
2 | USC00196770 | 1890-01-01 | SNOW | 0 | <NA> | <NA> | 6 | <NA> | 1890 |
3 | USC00196915 | 1890-01-01 | PRCP | 0 | P | <NA> | 6 | <NA> | 1890 |
4 | USC00196915 | 1890-01-01 | SNOW | 0 | <NA> | <NA> | 6 | <NA> | 1890 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
6139 | ASN00022006 | 1890-12-03 | PRCP | 0 | <NA> | <NA> | a | <NA> | 1890 |
6140 | ASN00022007 | 1890-12-03 | PRCP | 0 | <NA> | <NA> | a | <NA> | 1890 |
6141 | ASN00022008 | 1890-12-03 | PRCP | 0 | <NA> | <NA> | a | <NA> | 1890 |
6142 | ASN00022009 | 1890-12-03 | PRCP | 0 | <NA> | <NA> | a | <NA> | 1890 |
6143 | ASN00022011 | 1890-12-03 | PRCP | 0 | <NA> | <NA> | a | <NA> | 1890 |
1276246 rows × 9 columns
Cleaning Up S3¶
[10]:
wr.s3.delete_objects(path)
Cleaning Up the Database¶
[11]:
for table in wr.catalog.get_tables(database="awswrangler_test"):
wr.catalog.delete_table_if_exists(database="awswrangler_test", table=table["Name"])
11 - CSV Datasets¶
awswrangler has 3 different write modes to store CSV Datasets on Amazon S3.
append (Default)
Only adds new files, without deleting any existing ones.
overwrite
Deletes everything in the target directory and then adds new files.
overwrite_partitions (Partition Upsert)
Only deletes the paths of the partitions that should be updated and then writes the new partition files. It’s like a “partition upsert”.
[1]:
from datetime import date
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
············
Checking/Creating Glue Catalog Databases¶
[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
wr.catalog.create_database("awswrangler_test")
Creating the Dataset¶
[4]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="csv_dataset"
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[4]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
Appending¶
[5]:
df = pd.DataFrame({
"id": [3],
"value": ["bar"],
"date": [date(2020, 1, 3)]
})
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="append",
database="awswrangler_test",
table="csv_dataset"
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[5]:
id | value | date | |
---|---|---|---|
0 | 3 | bar | 2020-01-03 |
1 | 1 | foo | 2020-01-01 |
2 | 2 | boo | 2020-01-02 |
Overwriting¶
[6]:
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="csv_dataset"
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[6]:
id | value | date | |
---|---|---|---|
0 | 3 | bar | 2020-01-03 |
Creating a Partitioned Dataset¶
[7]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="csv_dataset",
partition_cols=["date"]
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[7]:
id | value | date | |
---|---|---|---|
0 | 2 | boo | 2020-01-02 |
1 | 1 | foo | 2020-01-01 |
Upserting partitions (overwrite_partitions)¶
[8]:
df = pd.DataFrame({
"id": [2, 3],
"value": ["xoo", "bar"],
"date": [date(2020, 1, 2), date(2020, 1, 3)]
})
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="overwrite_partitions",
database="awswrangler_test",
table="csv_dataset",
partition_cols=["date"]
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[8]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | xoo | 2020-01-02 |
0 | 3 | bar | 2020-01-03 |
BONUS - Glue/Athena integration¶
[9]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_csv(
df=df,
path=path,
dataset=True,
index=False,
mode="overwrite",
database="aws_sdk_pandas",
table="my_table",
compression="gzip"
)
wr.athena.read_sql_query("SELECT * FROM my_table", database="aws_sdk_pandas")
[9]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
12 - CSV Crawler¶
awswrangler can extract just the metadata from a Pandas DataFrame, which can then be added to the Glue Catalog as a table.
[1]:
import awswrangler as wr
from datetime import datetime
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/csv_crawler/"
············
Creating a Pandas DataFrame¶
[3]:
ts = lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f") # noqa
dt = lambda x: datetime.strptime(x, "%Y-%m-%d").date() # noqa
df = pd.DataFrame(
{
"id": [1, 2, 3],
"string": ["foo", None, "boo"],
"float": [1.0, None, 2.0],
"date": [dt("2020-01-01"), None, dt("2020-01-02")],
"timestamp": [ts("2020-01-01 00:00:00.0"), None, ts("2020-01-02 00:00:01.0")],
"bool": [True, None, False],
"par0": [1, 1, 2],
"par1": ["a", "b", "b"],
}
)
df
[3]:
id | string | float | date | timestamp | bool | par0 | par1 | |
---|---|---|---|---|---|---|---|---|
0 | 1 | foo | 1.0 | 2020-01-01 | 2020-01-01 00:00:00 | True | 1 | a |
1 | 2 | None | NaN | None | NaT | None | 1 | b |
2 | 3 | boo | 2.0 | 2020-01-02 | 2020-01-02 00:00:01 | False | 2 | b |
Extracting the metadata¶
[4]:
columns_types, partitions_types = wr.catalog.extract_athena_types(
df=df,
file_format="csv",
index=False,
partition_cols=["par0", "par1"]
)
[5]:
columns_types
[5]:
{'id': 'bigint',
'string': 'string',
'float': 'double',
'date': 'date',
'timestamp': 'timestamp',
'bool': 'boolean'}
[6]:
partitions_types
[6]:
{'par0': 'bigint', 'par1': 'string'}
Creating the table¶
[7]:
wr.catalog.create_csv_table(
table="csv_crawler",
database="awswrangler_test",
path=path,
partitions_types=partitions_types,
columns_types=columns_types,
)
Checking¶
[8]:
wr.catalog.table(database="awswrangler_test", table="csv_crawler")
[8]:
Column Name | Type | Partition | Comment | |
---|---|---|---|---|
0 | id | bigint | False | |
1 | string | string | False | |
2 | float | double | False | |
3 | date | date | False | |
4 | timestamp | timestamp | False | |
5 | bool | boolean | False | |
6 | par0 | bigint | True | |
7 | par1 | string | True |
We can still use the extracted metadata to ensure data type consistency for new data¶
[9]:
df = pd.DataFrame(
{
"id": [1],
"string": ["1"],
"float": [1],
"date": [ts("2020-01-01 00:00:00.0")],
"timestamp": [dt("2020-01-02")],
"bool": [1],
"par0": [1],
"par1": ["a"],
}
)
df
[9]:
id | string | float | date | timestamp | bool | par0 | par1 | |
---|---|---|---|---|---|---|---|---|
0 | 1 | 1 | 1 | 2020-01-01 | 2020-01-02 | 1 | 1 | a |
[10]:
res = wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
database="awswrangler_test",
table="csv_crawler",
partition_cols=["par0", "par1"],
dtype=columns_types
)
You can also extract the metadata directly from the Catalog if you want¶
[11]:
dtype = wr.catalog.get_table_types(database="awswrangler_test", table="csv_crawler")
[12]:
res = wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
database="awswrangler_test",
table="csv_crawler",
partition_cols=["par0", "par1"],
dtype=dtype
)
Checking out¶
[13]:
df = wr.athena.read_sql_table(database="awswrangler_test", table="csv_crawler")
df
[13]:
id | string | float | date | timestamp | bool | par0 | par1 | |
---|---|---|---|---|---|---|---|---|
0 | 1 | 1 | 1.0 | None | 2020-01-02 | True | 1 | a |
1 | 1 | 1 | 1.0 | None | 2020-01-02 | True | 1 | a |
[14]:
df.dtypes
[14]:
id Int64
string string
float float64
date object
timestamp datetime64[ns]
bool boolean
par0 Int64
par1 string
dtype: object
Cleaning Up S3¶
[15]:
wr.s3.delete_objects(path)
Cleaning Up the Database¶
[16]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="csv_crawler")
[16]:
True
13 - Merging Datasets on S3¶
awswrangler has 3 different copy modes to store Parquet Datasets on Amazon S3.
append (Default)
Only adds new files, without deleting any existing ones.
overwrite
Deletes everything in the target directory and then adds new files.
overwrite_partitions (Partition Upsert)
Only deletes the paths of the partitions that should be updated and then writes the new partition files. It’s like a “partition upsert”.
[1]:
from datetime import date
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path1 = f"s3://{bucket}/dataset1/"
path2 = f"s3://{bucket}/dataset2/"
············
Creating Dataset 1¶
[3]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
"date": [date(2020, 1, 1), date(2020, 1, 2)]
})
wr.s3.to_parquet(
df=df,
path=path1,
dataset=True,
mode="overwrite",
partition_cols=["date"]
)
wr.s3.read_parquet(path1, dataset=True)
[3]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | boo | 2020-01-02 |
Creating Dataset 2¶
[4]:
df = pd.DataFrame({
"id": [2, 3],
"value": ["xoo", "bar"],
"date": [date(2020, 1, 2), date(2020, 1, 3)]
})
dataset2_files = wr.s3.to_parquet(
df=df,
path=path2,
dataset=True,
mode="overwrite",
partition_cols=["date"]
)["paths"]
wr.s3.read_parquet(path2, dataset=True)
[4]:
id | value | date | |
---|---|---|---|
0 | 2 | xoo | 2020-01-02 |
1 | 3 | bar | 2020-01-03 |
Merging (Dataset 2 -> Dataset 1) (APPEND)¶
[5]:
wr.s3.merge_datasets(
source_path=path2,
target_path=path1,
mode="append"
)
wr.s3.read_parquet(path1, dataset=True)
[5]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | xoo | 2020-01-02 |
2 | 2 | boo | 2020-01-02 |
3 | 3 | bar | 2020-01-03 |
Merging (Dataset 2 -> Dataset 1) (OVERWRITE_PARTITIONS)¶
[6]:
wr.s3.merge_datasets(
source_path=path2,
target_path=path1,
mode="overwrite_partitions"
)
wr.s3.read_parquet(path1, dataset=True)
[6]:
id | value | date | |
---|---|---|---|
0 | 1 | foo | 2020-01-01 |
1 | 2 | xoo | 2020-01-02 |
2 | 3 | bar | 2020-01-03 |
Merging (Dataset 2 -> Dataset 1) (OVERWRITE)¶
[7]:
wr.s3.merge_datasets(
source_path=path2,
target_path=path1,
mode="overwrite"
)
wr.s3.read_parquet(path1, dataset=True)
[7]:
id | value | date | |
---|---|---|---|
0 | 2 | xoo | 2020-01-02 |
1 | 3 | bar | 2020-01-03 |
Cleaning Up¶
[8]:
wr.s3.delete_objects(path1)
wr.s3.delete_objects(path2)
14 - Schema Evolution¶
awswrangler supports new columns on Parquet and CSV datasets through:
wr.s3.store_parquet_metadata() i.e. “Crawler”
[1]:
from datetime import date
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
···········································
Creating the Dataset¶
Parquet Create¶
[3]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="aws_sdk_pandas",
table="my_table"
)
wr.s3.read_parquet(path, dataset=True)
[3]:
id | value | |
---|---|---|
0 | 1 | foo |
1 | 2 | boo |
CSV Create¶
[ ]:
df = pd.DataFrame({
"id": [1, 2],
"value": ["foo", "boo"],
})
wr.s3.to_csv(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="aws_sdk_pandas",
table="my_table"
)
wr.s3.read_csv(path, dataset=True)
Schema Version 0 on Glue Catalog (AWS Console)¶
[Image: schema_evolution_0.png]
Appending with NEW COLUMNS¶
Parquet Append¶
[4]:
df = pd.DataFrame({
"id": [3, 4],
"value": ["bar", None],
"date": [date(2020, 1, 3), date(2020, 1, 4)],
"flag": [True, False]
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="append",
database="aws_sdk_pandas",
table="my_table",
catalog_versioning=True # Optional
)
wr.s3.read_parquet(path, dataset=True, validate_schema=False)
[4]:
id | value | date | flag | |
---|---|---|---|---|
0 | 3 | bar | 2020-01-03 | True |
1 | 4 | None | 2020-01-04 | False |
2 | 1 | foo | NaN | NaN |
3 | 2 | boo | NaN | NaN |
CSV Append¶
Note: for CSV datasets, schema evolution is disabled by default due to column ordering. Enable it by passing the schema_evolution=True flag.
[ ]:
df = pd.DataFrame({
"id": [3, 4],
"value": ["bar", None],
"date": [date(2020, 1, 3), date(2020, 1, 4)],
"flag": [True, False]
})
wr.s3.to_csv(
df=df,
path=path,
dataset=True,
mode="append",
database="aws_sdk_pandas",
table="my_table",
schema_evolution=True,
catalog_versioning=True # Optional
)
wr.s3.read_csv(path, dataset=True, validate_schema=False)
Schema Version 1 on Glue Catalog (AWS Console)¶

Reading from Athena¶
[5]:
wr.athena.read_sql_table(table="my_table", database="aws_sdk_pandas")
[5]:
id | value | date | flag | |
---|---|---|---|---|
0 | 3 | bar | 2020-01-03 | True |
1 | 4 | None | 2020-01-04 | False |
2 | 1 | foo | None | <NA> |
3 | 2 | boo | None | <NA> |
Cleaning Up¶
[6]:
wr.s3.delete_objects(path)
wr.catalog.delete_table_if_exists(table="my_table", database="aws_sdk_pandas")
[6]:
True
15 - EMR¶
[1]:
import awswrangler as wr
import boto3
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
··········································
Enter your Subnet ID:¶
[8]:
subnet = getpass.getpass()
························
Creating EMR Cluster¶
[9]:
cluster_id = wr.emr.create_cluster(subnet)
Uploading our PySpark script to Amazon S3¶
[10]:
script = """
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("docker-awswrangler").getOrCreate()
sc = spark.sparkContext
print("Spark Initialized")
"""
_ = boto3.client("s3").put_object(
Body=script,
Bucket=bucket,
Key="test.py"
)
Submit PySpark step¶
[11]:
step_id = wr.emr.submit_step(cluster_id, command=f"spark-submit s3://{bucket}/test.py")
Wait Step¶
[12]:
while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
pass
Terminate Cluster¶
[13]:
wr.emr.terminate_cluster(cluster_id)
16 - EMR & Docker¶
[ ]:
import awswrangler as wr
import boto3
import getpass
Enter your bucket name:¶
[2]:
bucket = getpass.getpass()
··········································
Enter your Subnet ID:¶
[3]:
subnet = getpass.getpass()
························
Build and Upload Docker Image to ECR repository¶
Replace the {ACCOUNT_ID} placeholder.
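If you prefer to resolve the account id programmatically instead of typing it, a minimal sketch (wr.get_account_id() is the same helper used further below in this tutorial):
[ ]:
import awswrangler as wr
# Resolve the current AWS account id to substitute for {ACCOUNT_ID}
account_id = wr.get_account_id()
print(account_id)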
[ ]:
%%writefile Dockerfile
FROM amazoncorretto:8
RUN yum -y update
RUN yum -y install yum-utils
RUN yum -y groupinstall development
RUN yum list python3*
RUN yum -y install python3 python3-dev python3-pip python3-virtualenv
RUN python -V
RUN python3 -V
ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3
RUN pip3 install --upgrade pip
RUN pip3 install awswrangler
RUN python3 -c "import awswrangler as wr"
[ ]:
%%bash
docker build -t 'local/emr-wrangler' .
aws ecr create-repository --repository-name emr-wrangler
docker tag local/emr-wrangler {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler
eval $(aws ecr get-login --region us-east-1 --no-include-email)
docker push {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler
Creating EMR Cluster¶
[4]:
cluster_id = wr.emr.create_cluster(subnet, docker=True)
Refresh ECR credentials in the cluster (expiration time: 12h)¶
[5]:
wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f"s3://{bucket}/")
[5]:
's-1B0O45RWJL8CL'
Uploading application script to Amazon S3 (PySpark)¶
[7]:
script = """
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("docker-awswrangler").getOrCreate()
sc = spark.sparkContext
print("Spark Initialized")
import awswrangler as wr
print(f"awswrangler version: {wr.__version__}")
"""
boto3.client("s3").put_object(Body=script, Bucket=bucket, Key="test_docker.py")
Submit PySpark step¶
[8]:
DOCKER_IMAGE = f"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler"
step_id = wr.emr.submit_spark_step(
cluster_id,
f"s3://{bucket}/test_docker.py",
docker_image=DOCKER_IMAGE
)
Wait Step¶
[ ]:
while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
pass
Terminate Cluster¶
[ ]:
wr.emr.terminate_cluster(cluster_id)
Another example with custom configurations¶
[9]:
cluster_id = wr.emr.create_cluster(
cluster_name="my-demo-cluster-v2",
logging_s3_path=f"s3://{bucket}/emr-logs/",
emr_release="emr-6.7.0",
subnet_id=subnet,
emr_ec2_role="EMR_EC2_DefaultRole",
emr_role="EMR_DefaultRole",
instance_type_master="m5.2xlarge",
instance_type_core="m5.2xlarge",
instance_ebs_size_master=50,
instance_ebs_size_core=50,
instance_num_on_demand_master=0,
instance_num_on_demand_core=0,
instance_num_spot_master=1,
instance_num_spot_core=2,
spot_bid_percentage_of_on_demand_master=100,
spot_bid_percentage_of_on_demand_core=100,
spot_provisioning_timeout_master=5,
spot_provisioning_timeout_core=5,
spot_timeout_to_on_demand_master=False,
spot_timeout_to_on_demand_core=False,
python3=True,
docker=True,
spark_glue_catalog=True,
hive_glue_catalog=True,
presto_glue_catalog=True,
debugging=True,
applications=["Hadoop", "Spark", "Hive", "Zeppelin", "Livy"],
visible_to_all_users=True,
maximize_resource_allocation=True,
keep_cluster_alive_when_no_steps=True,
termination_protected=False,
spark_pyarrow=True
)
wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f"s3://{bucket}/emr/")
DOCKER_IMAGE = f"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler"
step_id = wr.emr.submit_spark_step(
cluster_id,
f"s3://{bucket}/test_docker.py",
docker_image=DOCKER_IMAGE
)
[ ]:
while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
pass
wr.emr.terminate_cluster(cluster_id)
17 - Partition Projection¶
https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html
[1]:
import awswrangler as wr
import pandas as pd
from datetime import datetime
import getpass
Enter your bucket name:¶
[2]:
bucket = getpass.getpass()
···········································
Integer projection¶
[3]:
df = pd.DataFrame({
"value": [1, 2, 3],
"year": [2019, 2020, 2021],
"month": [10, 11, 12],
"day": [25, 26, 27]
})
df
[3]:
value | year | month | day | |
---|---|---|---|---|
0 | 1 | 2019 | 10 | 25 |
1 | 2 | 2020 | 11 | 26 |
2 | 3 | 2021 | 12 | 27 |
[4]:
wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/table_integer/",
dataset=True,
partition_cols=["year", "month", "day"],
database="default",
table="table_integer",
athena_partition_projection_settings={
"projection_types": {
"year": "integer",
"month": "integer",
"day": "integer"
},
"projection_ranges": {
"year": "2000,2025",
"month": "1,12",
"day": "1,31"
},
},
)
[5]:
wr.athena.read_sql_query(f"SELECT * FROM table_integer", database="default")
[5]:
value | year | month | day | |
---|---|---|---|---|
0 | 3 | 2021 | 12 | 27 |
1 | 2 | 2020 | 11 | 26 |
2 | 1 | 2019 | 10 | 25 |
Enum projection¶
[6]:
df = pd.DataFrame({
"value": [1, 2, 3],
"city": ["São Paulo", "Tokio", "Seattle"],
})
df
[6]:
value | city | |
---|---|---|
0 | 1 | São Paulo |
1 | 2 | Tokio |
2 | 3 | Seattle |
[7]:
wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/table_enum/",
dataset=True,
partition_cols=["city"],
database="default",
table="table_enum",
athena_partition_projection_settings={
"projection_types": {
"city": "enum",
},
"projection_values": {
"city": "São Paulo,Tokio,Seattle"
},
},
)
[8]:
wr.athena.read_sql_query(f"SELECT * FROM table_enum", database="default")
[8]:
value | city | |
---|---|---|
0 | 1 | São Paulo |
1 | 3 | Seattle |
2 | 2 | Tokio |
Date projection¶
[9]:
ts = lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S")
dt = lambda x: datetime.strptime(x, "%Y-%m-%d").date()
df = pd.DataFrame({
"value": [1, 2, 3],
"dt": [dt("2020-01-01"), dt("2020-01-02"), dt("2020-01-03")],
"ts": [ts("2020-01-01 00:00:00"), ts("2020-01-01 00:00:01"), ts("2020-01-01 00:00:02")],
})
df
[9]:
value | dt | ts | |
---|---|---|---|
0 | 1 | 2020-01-01 | 2020-01-01 00:00:00 |
1 | 2 | 2020-01-02 | 2020-01-01 00:00:01 |
2 | 3 | 2020-01-03 | 2020-01-01 00:00:02 |
[10]:
wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/table_date/",
dataset=True,
partition_cols=["dt", "ts"],
database="default",
table="table_date",
athena_partition_projection_settings={
"projection_types": {
"dt": "date",
"ts": "date",
},
"projection_ranges": {
"dt": "2020-01-01,2020-01-03",
"ts": "2020-01-01 00:00:00,2020-01-01 00:00:02"
},
},
)
[11]:
wr.athena.read_sql_query(f"SELECT * FROM table_date", database="default")
[11]:
value | dt | ts | |
---|---|---|---|
0 | 1 | 2020-01-01 | 2020-01-01 00:00:00 |
1 | 2 | 2020-01-02 | 2020-01-01 00:00:01 |
2 | 3 | 2020-01-03 | 2020-01-01 00:00:02 |
Injected projection¶
[12]:
df = pd.DataFrame({
"value": [1, 2, 3],
"uuid": ["761e2488-a078-11ea-bb37-0242ac130002", "b89ed095-8179-4635-9537-88592c0f6bc3", "87adc586-ce88-4f0a-b1c8-bf8e00d32249"],
})
df
[12]:
value | uuid | |
---|---|---|
0 | 1 | 761e2488-a078-11ea-bb37-0242ac130002 |
1 | 2 | b89ed095-8179-4635-9537-88592c0f6bc3 |
2 | 3 | 87adc586-ce88-4f0a-b1c8-bf8e00d32249 |
[13]:
wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/table_injected/",
dataset=True,
partition_cols=["uuid"],
database="default",
table="table_injected",
athena_partition_projection_settings={
"projection_types": {
"uuid": "injected",
}
},
)
[14]:
wr.athena.read_sql_query(
sql=f"SELECT * FROM table_injected WHERE uuid='b89ed095-8179-4635-9537-88592c0f6bc3'",
database="default"
)
[14]:
value | uuid | |
---|---|---|
0 | 2 | b89ed095-8179-4635-9537-88592c0f6bc3 |
Cleaning Up¶
[15]:
wr.s3.delete_objects(f"s3://{bucket}/table_integer/")
wr.s3.delete_objects(f"s3://{bucket}/table_enum/")
wr.s3.delete_objects(f"s3://{bucket}/table_date/")
wr.s3.delete_objects(f"s3://{bucket}/table_injected/")
[16]:
wr.catalog.delete_table_if_exists(table="table_integer", database="default")
wr.catalog.delete_table_if_exists(table="table_enum", database="default")
wr.catalog.delete_table_if_exists(table="table_date", database="default")
wr.catalog.delete_table_if_exists(table="table_injected", database="default")
18 - QuickSight¶
For this tutorial we will use the public AWS COVID-19 data lake.
References:
Please install the CloudFormation template above to get access to the public data lake.
P.S. To be able to access the public data lake, you must explicitly allow QuickSight to access the related external bucket.
[1]:
import awswrangler as wr
from time import sleep
List users of QuickSight account
[2]:
[{"username": user["UserName"], "role": user["Role"]} for user in wr.quicksight.list_users('default')]
[2]:
[{'username': 'dev', 'role': 'ADMIN'}]
[3]:
wr.catalog.databases()
[3]:
Database | Description | |
---|---|---|
0 | aws_sdk_pandas | AWS SDK for pandas Test Arena - Glue Database |
1 | awswrangler_test | |
2 | covid-19 | |
3 | default | Default Hive database |
[4]:
wr.catalog.tables(database="covid-19")
[4]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | covid-19 | alleninstitute_comprehend_medical | Comprehend Medical results run against Allen I... | paper_id, date, dx_name, test_name, procedure_... | |
1 | covid-19 | alleninstitute_metadata | Metadata on papers pulled from the Allen Insti... | cord_uid, sha, source_x, title, doi, pmcid, pu... | |
2 | covid-19 | country_codes | Lookup table for country codes | country, alpha-2 code, alpha-3 code, numeric c... | |
3 | covid-19 | county_populations | Lookup table for population for each county ba... | id, id2, county, state, population estimate 2018 | |
4 | covid-19 | covid_knowledge_graph_edges | AWS Knowledge Graph for COVID-19 data | id, label, from, to, score | |
5 | covid-19 | covid_knowledge_graph_nodes_author | AWS Knowledge Graph for COVID-19 data | id, label, first, last, full_name | |
6 | covid-19 | covid_knowledge_graph_nodes_concept | AWS Knowledge Graph for COVID-19 data | id, label, entity, concept | |
7 | covid-19 | covid_knowledge_graph_nodes_institution | AWS Knowledge Graph for COVID-19 data | id, label, institution, country, settlement | |
8 | covid-19 | covid_knowledge_graph_nodes_paper | AWS Knowledge Graph for COVID-19 data | id, label, doi, sha_code, publish_time, source... | |
9 | covid-19 | covid_knowledge_graph_nodes_topic | AWS Knowledge Graph for COVID-19 data | id, label, topic, topic_num | |
10 | covid-19 | covid_testing_states_daily | USA total test daily trend by state. Sourced ... | date, state, positive, negative, pending, hosp... | |
11 | covid-19 | covid_testing_us_daily | USA total test daily trend. Sourced from covi... | date, states, positive, negative, posneg, pend... | |
12 | covid-19 | covid_testing_us_total | USA total tests. Sourced from covidtracking.c... | positive, negative, posneg, hospitalized, deat... | |
13 | covid-19 | covidcast_data | CMU Delphi's COVID-19 Surveillance Data | data_source, signal, geo_type, time_value, geo... | |
14 | covid-19 | covidcast_metadata | CMU Delphi's COVID-19 Surveillance Metadata | data_source, signal, time_type, geo_type, min_... | |
15 | covid-19 | enigma_jhu | Johns Hopkins University Consolidated data on ... | fips, admin2, province_state, country_region, ... | |
16 | covid-19 | enigma_jhu_timeseries | Johns Hopkins University data on COVID-19 case... | uid, fips, iso2, iso3, code3, admin2, latitude... | |
17 | covid-19 | hospital_beds | Data on hospital beds and their utilization in... | objectid, hospital_name, hospital_type, hq_add... | |
18 | covid-19 | nytimes_counties | Data on COVID-19 cases from NY Times at US cou... | date, county, state, fips, cases, deaths | |
19 | covid-19 | nytimes_states | Data on COVID-19 cases from NY Times at US sta... | date, state, fips, cases, deaths | |
20 | covid-19 | prediction_models_county_predictions | County-level Predictions Data. Sourced from Yu... | countyfips, countyname, statename, severity_co... | |
21 | covid-19 | prediction_models_severity_index | Severity Index models. Sourced from Yu Group a... | severity_1-day, severity_2-day, severity_3-day... | |
22 | covid-19 | tableau_covid_datahub | COVID-19 data that has been gathered and unifi... | country_short_name, country_alpha_3_code, coun... | |
23 | covid-19 | tableau_jhu | Johns Hopkins University data on COVID-19 case... | case_type, cases, difference, date, country_re... | |
24 | covid-19 | us_state_abbreviations | Lookup table for US state abbreviations | state, abbreviation | |
25 | covid-19 | world_cases_deaths_testing | Data on confirmed cases, deaths, and testing. ... | iso_code, location, date, total_cases, new_cas... |
Create a QuickSight data source. Note: the data source stores the connection information.
[5]:
wr.quicksight.create_athena_data_source(
name="covid-19",
workgroup="primary",
allowed_to_manage=["dev"]
)
[6]:
wr.catalog.tables(database="covid-19", name_contains="nyt")
[6]:
Database | Table | Description | Columns | Partitions | |
---|---|---|---|---|---|
0 | covid-19 | nytimes_counties | Data on COVID-19 cases from NY Times at US cou... | date, county, state, fips, cases, deaths | |
1 | covid-19 | nytimes_states | Data on COVID-19 cases from NY Times at US sta... | date, state, fips, cases, deaths |
[7]:
wr.athena.read_sql_query("SELECT * FROM nytimes_counties limit 10", database="covid-19", ctas_approach=False)
[7]:
date | county | state | fips | cases | deaths | |
---|---|---|---|---|---|---|
0 | 2020-01-21 | Snohomish | Washington | 53061 | 1 | 0 |
1 | 2020-01-22 | Snohomish | Washington | 53061 | 1 | 0 |
2 | 2020-01-23 | Snohomish | Washington | 53061 | 1 | 0 |
3 | 2020-01-24 | Cook | Illinois | 17031 | 1 | 0 |
4 | 2020-01-24 | Snohomish | Washington | 53061 | 1 | 0 |
5 | 2020-01-25 | Orange | California | 06059 | 1 | 0 |
6 | 2020-01-25 | Cook | Illinois | 17031 | 1 | 0 |
7 | 2020-01-25 | Snohomish | Washington | 53061 | 1 | 0 |
8 | 2020-01-26 | Maricopa | Arizona | 04013 | 1 | 0 |
9 | 2020-01-26 | Los Angeles | California | 06037 | 1 | 0 |
[8]:
sql = """
SELECT
j.*,
co.Population,
co.county AS county2,
hb.*
FROM
(
SELECT
date,
county,
state,
fips,
cases as confirmed,
deaths
FROM "covid-19".nytimes_counties
) j
LEFT OUTER JOIN (
SELECT
DISTINCT county,
state,
"population estimate 2018" AS Population
FROM
"covid-19".county_populations
WHERE
state IN (
SELECT
DISTINCT state
FROM
"covid-19".nytimes_counties
)
AND county IN (
SELECT
DISTINCT county as county
FROM "covid-19".nytimes_counties
)
) co ON co.county = j.county
AND co.state = j.state
LEFT OUTER JOIN (
SELECT
count(objectid) as Hospital,
fips as hospital_fips,
sum(num_licensed_beds) as licensed_beds,
sum(num_staffed_beds) as staffed_beds,
sum(num_icu_beds) as icu_beds,
avg(bed_utilization) as bed_utilization,
sum(
potential_increase_in_bed_capac
) as potential_increase_bed_capacity
FROM "covid-19".hospital_beds
WHERE
fips in (
SELECT
DISTINCT fips
FROM
"covid-19".nytimes_counties
)
GROUP BY
2
) hb ON hb.hospital_fips = j.fips
"""
wr.athena.read_sql_query(sql, database="covid-19", ctas_approach=False)
[8]:
date | county | state | fips | confirmed | deaths | population | county2 | Hospital | hospital_fips | licensed_beds | staffed_beds | icu_beds | bed_utilization | potential_increase_bed_capacity | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2020-04-12 | Park | Montana | 30067 | 7 | 0 | 16736 | Park | 0 | 30067 | 25 | 25 | 4 | 0.432548 | 0 |
1 | 2020-04-12 | Ravalli | Montana | 30081 | 3 | 0 | 43172 | Ravalli | 0 | 30081 | 25 | 25 | 5 | 0.567781 | 0 |
2 | 2020-04-12 | Silver Bow | Montana | 30093 | 11 | 0 | 34993 | Silver Bow | 0 | 30093 | 98 | 71 | 11 | 0.551457 | 27 |
3 | 2020-04-12 | Clay | Nebraska | 31035 | 2 | 0 | 6214 | Clay | <NA> | <NA> | <NA> | <NA> | <NA> | NaN | <NA> |
4 | 2020-04-12 | Cuming | Nebraska | 31039 | 2 | 0 | 8940 | Cuming | 0 | 31039 | 25 | 25 | 4 | 0.204493 | 0 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
227684 | 2020-06-11 | Hockley | Texas | 48219 | 28 | 1 | 22980 | Hockley | 0 | 48219 | 48 | 48 | 8 | 0.120605 | 0 |
227685 | 2020-06-11 | Hudspeth | Texas | 48229 | 11 | 0 | 4795 | Hudspeth | <NA> | <NA> | <NA> | <NA> | <NA> | NaN | <NA> |
227686 | 2020-06-11 | Jones | Texas | 48253 | 633 | 0 | 19817 | Jones | 0 | 48253 | 45 | 7 | 1 | 0.718591 | 38 |
227687 | 2020-06-11 | La Salle | Texas | 48283 | 4 | 0 | 7531 | La Salle | <NA> | <NA> | <NA> | <NA> | <NA> | NaN | <NA> |
227688 | 2020-06-11 | Limestone | Texas | 48293 | 36 | 1 | 23519 | Limestone | 0 | 48293 | 78 | 69 | 9 | 0.163940 | 9 |
227689 rows × 15 columns
Create Dataset with custom SQL option
[9]:
wr.quicksight.create_athena_dataset(
name="covid19-nytimes-usa",
sql=sql,
sql_name='CustomSQL',
data_source_name="covid-19",
import_mode='SPICE',
allowed_to_manage=["dev"]
)
[10]:
ingestion_id = wr.quicksight.create_ingestion("covid19-nytimes-usa")
Wait for the ingestion
[11]:
while wr.quicksight.describe_ingestion(ingestion_id=ingestion_id, dataset_name="covid19-nytimes-usa")["IngestionStatus"] not in ["COMPLETED", "FAILED"]:
sleep(1)
Describe last ingestion
[12]:
wr.quicksight.describe_ingestion(ingestion_id=ingestion_id, dataset_name="covid19-nytimes-usa")["RowInfo"]
[12]:
{'RowsIngested': 227689, 'RowsDropped': 0}
List all ingestions
[13]:
[{"time": user["CreatedTime"], "source": user["RequestSource"]} for user in wr.quicksight.list_ingestions("covid19-nytimes-usa")]
[13]:
[{'time': datetime.datetime(2020, 6, 12, 15, 13, 46, 996000, tzinfo=tzlocal()),
'source': 'MANUAL'},
{'time': datetime.datetime(2020, 6, 12, 15, 13, 42, 344000, tzinfo=tzlocal()),
'source': 'MANUAL'}]
Create new dataset from a table directly
[14]:
wr.quicksight.create_athena_dataset(
name="covid-19-tableau_jhu",
table="tableau_jhu",
data_source_name="covid-19",
database="covid-19",
import_mode='DIRECT_QUERY',
rename_columns={
"cases": "Count_of_Cases",
"combined_key": "County"
},
cast_columns_types={
"Count_of_Cases": "INTEGER"
},
tag_columns={
"combined_key": [{"ColumnGeographicRole": "COUNTY"}]
},
allowed_to_manage=["dev"]
)
Cleaning up
[15]:
wr.quicksight.delete_data_source("covid-19")
wr.quicksight.delete_dataset("covid19-nytimes-usa")
wr.quicksight.delete_dataset("covid-19-tableau_jhu")
19 - Amazon Athena Cache¶
awswrangler has a cache strategy that is disabled by default and can be enabled by passing max_cache_seconds
bigger than 0 as part of the athena_cache_settings
parameter. This cache strategy for Amazon Athena can help you to decrease query times and costs.
When calling read_sql_query
, instead of just running the query, we can now check whether the same query has been run before. If so, and that last run happened within the max_cache_seconds
window, we return the same results as last time, provided they are still available in S3. We have seen this improve performance by more than 100x, and the speed-up grows with how expensive the original query is.
The detailed approach is:
When read_sql_query is called with max_cache_seconds > 0 (it defaults to 0), we check the last queries run by the same workgroup (the most we can get without pagination).
By default it checks the last 50 queries, but you can customize this through the max_cache_query_inspections argument.
We then sort those queries by CompletionDateTime, descending.
For each of those queries, we check whether its CompletionDateTime is still within the max_cache_seconds window. If so, we check whether the query string is the same as the current one (with some smart heuristics to guarantee coverage over both ctas_approaches). If they match, we check whether that run's results are still on S3, and return them instead of re-running the query.
During the whole cache resolution phase, if anything goes wrong, the logic falls back to the usual read_sql_query path.
P.S. The ``cache scope is bounded to the current workgroup``, so you will be able to reuse query results from other colleagues running in the same environment.
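In short, a minimal sketch of enabling the cache on a single call (the query text and my_db database are hypothetical; the timed cells below measure the real effect):
[ ]:
import awswrangler as wr
# Re-running the exact same SQL text within the cache window returns the previous results from S3
sql = "SELECT col, count(1) AS cnt FROM my_table GROUP BY col"  # hypothetical query
df = wr.athena.read_sql_query(
    sql,
    database="my_db",  # hypothetical database
    athena_cache_settings={"max_cache_seconds": 900, "max_cache_query_inspections": 50},
)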
[18]:
import awswrangler as wr
Enter your bucket name:¶
[19]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
Checking/Creating Glue Catalog Databases¶
[20]:
if "awswrangler_test" not in wr.catalog.databases().values:
wr.catalog.create_database("awswrangler_test")
Creating a Parquet Table from the NOAA’s CSV files¶
[21]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
df = wr.s3.read_csv(
path="s3://noaa-ghcn-pds/csv/by_year/1865.csv",
names=cols,
parse_dates=["dt", "obs_time"])
df
[21]:
id | dt | element | value | m_flag | q_flag | s_flag | obs_time | |
---|---|---|---|---|---|---|---|---|
0 | ID | DATE | ELEMENT | DATA_VALUE | M_FLAG | Q_FLAG | S_FLAG | OBS_TIME |
1 | AGE00135039 | 18650101 | PRCP | 0 | NaN | NaN | E | NaN |
2 | ASN00019036 | 18650101 | PRCP | 0 | NaN | NaN | a | NaN |
3 | ASN00021001 | 18650101 | PRCP | 0 | NaN | NaN | a | NaN |
4 | ASN00021010 | 18650101 | PRCP | 0 | NaN | NaN | a | NaN |
... | ... | ... | ... | ... | ... | ... | ... | ... |
37918 | USC00288878 | 18651231 | TMIN | -44 | NaN | NaN | 6 | NaN |
37919 | USC00288878 | 18651231 | PRCP | 0 | P | NaN | 6 | NaN |
37920 | USC00288878 | 18651231 | SNOW | 0 | P | NaN | 6 | NaN |
37921 | USC00361920 | 18651231 | PRCP | 0 | NaN | NaN | F | NaN |
37922 | USP00CA0001 | 18651231 | PRCP | 0 | NaN | NaN | F | NaN |
37923 rows × 8 columns
[ ]:
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="noaa"
)
[23]:
wr.catalog.table(database="awswrangler_test", table="noaa")
[23]:
Column Name | Type | Partition | Comment | |
---|---|---|---|---|
0 | id | string | False | |
1 | dt | string | False | |
2 | element | string | False | |
3 | value | string | False | |
4 | m_flag | string | False | |
5 | q_flag | string | False | |
6 | s_flag | string | False | |
7 | obs_time | string | False |
The test query¶
The more computational resources the query needs, the more the cache will help you. That's why we're using this long-running query.
[24]:
query = """
SELECT
n1.element,
count(1) as cnt
FROM
noaa n1
JOIN
noaa n2
ON
n1.id = n2.id
GROUP BY
n1.element
"""
First execution…¶
[25]:
%%time
wr.athena.read_sql_query(query, database="awswrangler_test")
CPU times: user 1.59 s, sys: 166 ms, total: 1.75 s
Wall time: 5.62 s
[25]:
element | cnt | |
---|---|---|
0 | PRCP | 12044499 |
1 | MDTX | 1460 |
2 | DATX | 1460 |
3 | ELEMENT | 1 |
4 | WT01 | 22260 |
5 | WT03 | 840 |
6 | DATN | 1460 |
7 | DWPR | 490 |
8 | TMIN | 7012479 |
9 | MDTN | 1460 |
10 | MDPR | 2683 |
11 | SNOW | 1086762 |
12 | DAPR | 1330 |
13 | SNWD | 783532 |
14 | TMAX | 6533103 |
Second execution with CACHE (much faster)¶
[26]:
%%time
wr.athena.read_sql_query(query, database="awswrangler_test", athena_cache_settings={"max_cache_seconds":900})
CPU times: user 689 ms, sys: 68.1 ms, total: 757 ms
Wall time: 1.11 s
[26]:
element | cnt | |
---|---|---|
0 | PRCP | 12044499 |
1 | MDTX | 1460 |
2 | DATX | 1460 |
3 | ELEMENT | 1 |
4 | WT01 | 22260 |
5 | WT03 | 840 |
6 | DATN | 1460 |
7 | DWPR | 490 |
8 | TMIN | 7012479 |
9 | MDTN | 1460 |
10 | MDPR | 2683 |
11 | SNOW | 1086762 |
12 | DAPR | 1330 |
13 | SNWD | 783532 |
14 | TMAX | 6533103 |
Allowing awswrangler to inspect up to 500 historical queries to find the same result to reuse¶
[27]:
%%time
wr.athena.read_sql_query(query, database="awswrangler_test", athena_cache_settings={"max_cache_seconds": 900, "max_cache_query_inspections": 500})
CPU times: user 715 ms, sys: 44.9 ms, total: 760 ms
Wall time: 1.03 s
[27]:
element | cnt | |
---|---|---|
0 | PRCP | 12044499 |
1 | MDTX | 1460 |
2 | DATX | 1460 |
3 | ELEMENT | 1 |
4 | WT01 | 22260 |
5 | WT03 | 840 |
6 | DATN | 1460 |
7 | DWPR | 490 |
8 | TMIN | 7012479 |
9 | MDTN | 1460 |
10 | MDPR | 2683 |
11 | SNOW | 1086762 |
12 | DAPR | 1330 |
13 | SNWD | 783532 |
14 | TMAX | 6533103 |
Cleaning Up S3¶
[28]:
wr.s3.delete_objects(path)
Delete table¶
[29]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa")
[29]:
True
Delete Database¶
[30]:
wr.catalog.delete_database('awswrangler_test')
20 - Spark Table Interoperability¶
awswrangler has no difficulty inserting into, overwriting or otherwise interacting with a table created by Apache Spark.
But if you want to do the opposite (Spark interacting with a table created by awswrangler) you should be aware that awswrangler follows Hive's format, so you must be explicit when using Spark's saveAsTable
method:
[ ]:
spark_df.write.format("hive").saveAsTable("database.table")
Or just move forward using the insertInto
alternative:
[ ]:
spark_df.write.insertInto("database.table")
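For the opposite direction, a minimal sketch (assuming the Spark cluster is configured to use the Glue Data Catalog, e.g. spark_glue_catalog=True as in the EMR tutorial) of writing a Glue-catalogued table with awswrangler that Spark can then read; df, the path and the database/table names are placeholders:
[ ]:
import awswrangler as wr
# Write a Hive-style, Glue-catalogued Parquet dataset with awswrangler...
wr.s3.to_parquet(
    df=df,                      # hypothetical DataFrame
    path="s3://bucket/table/",  # hypothetical path
    dataset=True,
    database="database",
    table="table",
)
# ...then read it from Spark through the catalog:
# spark.table("database.table").show()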
21 - Global Configurations¶
awswrangler has two ways to set global configurations that will override the regular default arguments configured in function signatures.
Environment variables
wr.config
P.S. Check the function API doc to see if your function has some argument that can be configured through Global configurations.
P.P.S. One exception to the above-mentioned rules is the ``botocore_config`` property. It cannot be set through environment variables but only via ``wr.config``. It will be used as the ``botocore.config.Config`` for all underlying ``boto3`` calls. The default config is ``botocore.config.Config(retries={"max_attempts": 5}, connect_timeout=10, max_pool_connections=10)``. If you only want to change the retry behavior, you can use the environment variables ``AWS_MAX_ATTEMPTS`` and ``AWS_RETRY_MODE`` (see the Boto3 documentation).
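A minimal sketch of the environment-variable route for the retry behavior mentioned above (these are standard AWS SDK variables and must be set before any boto3 session is created):
[ ]:
# Only the retry behavior can be tuned this way; other botocore options require wr.config.botocore_config
%env AWS_MAX_ATTEMPTS=5
%env AWS_RETRY_MODE=standard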
Environment Variables¶
[1]:
%env WR_DATABASE=default
%env WR_CTAS_APPROACH=False
%env WR_MAX_CACHE_SECONDS=900
%env WR_MAX_CACHE_QUERY_INSPECTIONS=500
%env WR_MAX_REMOTE_CACHE_ENTRIES=50
%env WR_MAX_LOCAL_CACHE_ENTRIES=100
env: WR_DATABASE=default
env: WR_CTAS_APPROACH=False
env: WR_MAX_CACHE_SECONDS=900
env: WR_MAX_CACHE_QUERY_INSPECTIONS=500
env: WR_MAX_REMOTE_CACHE_ENTRIES=50
env: WR_MAX_LOCAL_CACHE_ENTRIES=100
[2]:
import awswrangler as wr
import botocore
[3]:
wr.athena.read_sql_query("SELECT 1 AS FOO")
[3]:
foo | |
---|---|
0 | 1 |
Resetting¶
[4]:
# Specific
wr.config.reset("database")
# All
wr.config.reset()
wr.config¶
[5]:
wr.config.database = "default"
wr.config.ctas_approach = False
wr.config.max_cache_seconds = 900
wr.config.max_cache_query_inspections = 500
wr.config.max_remote_cache_entries = 50
wr.config.max_local_cache_entries = 100
# Set botocore.config.Config that will be used for all boto3 calls
wr.config.botocore_config = botocore.config.Config(
retries={"max_attempts": 10},
connect_timeout=20,
max_pool_connections=20
)
[6]:
wr.athena.read_sql_query("SELECT 1 AS FOO")
[6]:
foo | |
---|---|
0 | 1 |
Visualizing¶
[7]:
wr.config
[7]:
name | Env. Variable | type | nullable | enforced | configured | value | |
---|---|---|---|---|---|---|---|
0 | catalog_id | WR_CATALOG_ID | <class 'str'> | True | False | False | None |
1 | concurrent_partitioning | WR_CONCURRENT_PARTITIONING | <class 'bool'> | False | False | False | None |
2 | ctas_approach | WR_CTAS_APPROACH | <class 'bool'> | False | False | True | False |
3 | database | WR_DATABASE | <class 'str'> | True | False | True | default |
4 | max_cache_query_inspections | WR_MAX_CACHE_QUERY_INSPECTIONS | <class 'int'> | False | False | True | 500 |
5 | max_cache_seconds | WR_MAX_CACHE_SECONDS | <class 'int'> | False | False | True | 900 |
6 | max_remote_cache_entries | WR_MAX_REMOTE_CACHE_ENTRIES | <class 'int'> | False | False | True | 50 |
7 | max_local_cache_entries | WR_MAX_LOCAL_CACHE_ENTRIES | <class 'int'> | False | False | True | 100 |
8 | s3_block_size | WR_S3_BLOCK_SIZE | <class 'int'> | False | True | False | None |
9 | workgroup | WR_WORKGROUP | <class 'str'> | False | True | False | None |
10 | chunksize | WR_CHUNKSIZE | <class 'int'> | False | True | False | None |
11 | s3_endpoint_url | WR_S3_ENDPOINT_URL | <class 'str'> | True | True | True | None |
12 | athena_endpoint_url | WR_ATHENA_ENDPOINT_URL | <class 'str'> | True | True | True | None |
13 | sts_endpoint_url | WR_STS_ENDPOINT_URL | <class 'str'> | True | True | True | None |
14 | glue_endpoint_url | WR_GLUE_ENDPOINT_URL | <class 'str'> | True | True | True | None |
15 | redshift_endpoint_url | WR_REDSHIFT_ENDPOINT_URL | <class 'str'> | True | True | True | None |
16 | kms_endpoint_url | WR_KMS_ENDPOINT_URL | <class 'str'> | True | True | True | None |
17 | emr_endpoint_url | WR_EMR_ENDPOINT_URL | <class 'str'> | True | True | True | None |
18 | lakeformation_endpoint_url | WR_LAKEFORMATION_ENDPOINT_URL | <class 'str'> | True | True | True | None |
19 | dynamodb_endpoint_url | WR_DYNAMODB_ENDPOINT_URL | <class 'str'> | True | True | True | None |
20 | secretsmanager_endpoint_url | WR_SECRETSMANAGER_ENDPOINT_URL | <class 'str'> | True | True | True | None |
21 | timestream_endpoint_url | WR_TIMESTREAM_ENDPOINT_URL | <class 'str'> | True | True | True | None |
22 | botocore_config | WR_BOTOCORE_CONFIG | <class 'botocore.config.Config'> | True | False | True | <botocore.config.Config object at 0x14f313e50> |
23 | verify | WR_VERIFY | <class 'str'> | True | False | True | None |
24 | address | WR_ADDRESS | <class 'str'> | True | False | False | None |
25 | redis_password | WR_REDIS_PASSWORD | <class 'str'> | True | False | False | None |
26 | ignore_reinit_error | WR_IGNORE_REINIT_ERROR | <class 'bool'> | True | False | False | None |
27 | include_dashboard | WR_INCLUDE_DASHBOARD | <class 'bool'> | True | False | False | None |
28 | log_to_driver | WR_LOG_TO_DRIVER | <class 'bool'> | True | False | False | None |
29 | object_store_memory | WR_OBJECT_STORE_MEMORY | <class 'int'> | True | False | False | None |
30 | cpu_count | WR_CPU_COUNT | <class 'int'> | True | False | False | None |
31 | gpu_count | WR_GPU_COUNT | <class 'int'> | True | False | False | None |
22 - Writing Partitions Concurrently¶
concurrent_partitioning
argument: if True, it increases the parallelism level during the writing of partitions, decreasing the writing time at the cost of higher memory usage.
P.S. Check the function API doc to see if your function has some argument that can be configured through Global configurations.
[1]:
%reload_ext memory_profiler
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
············
Reading 4 GB of CSV from NOAA’s historical data and creating a year column¶
[3]:
noaa_path = "s3://noaa-ghcn-pds/csv/by_year/193"
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
dates = ["dt", "obs_time"]
dtype = {x: "category" for x in ["element", "m_flag", "q_flag", "s_flag"]}
df = wr.s3.read_csv(noaa_path, names=cols, parse_dates=dates, dtype=dtype)
df["year"] = df["dt"].dt.year
print(f"Number of rows: {len(df.index)}")
print(f"Number of columns: {len(df.columns)}")
Number of rows: 125407761
Number of columns: 9
Default Writing¶
[4]:
%%time
%%memit
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["year"],
)
peak memory: 22169.04 MiB, increment: 11119.68 MiB
CPU times: user 49 s, sys: 12.5 s, total: 1min 1s
Wall time: 1min 11s
Concurrent Partitioning (Decreasing writing time, but increasing memory usage)¶
[5]:
%%time
%%memit
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["year"],
concurrent_partitioning=True # <-----
)
peak memory: 27819.48 MiB, increment: 15743.30 MiB
CPU times: user 52.3 s, sys: 13.6 s, total: 1min 5s
Wall time: 41.6 s
23 - Flexible Partitions Filter (PUSH-DOWN)¶
partition_filter
argument: a callback function applied to the PARTITION columns (PUSH-DOWN filter).
This function MUST receive a single argument (Dict[str, str]) where keys are partition names and values are partition values.
This function MUST return a bool: True to read the partition or False to ignore it.
Ignored if `dataset=False`.
P.S. Check the function API doc to see if your function has some argument that can be configured through Global configurations.
[1]:
import awswrangler as wr
import pandas as pd
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
············
Creating the Dataset (Parquet)¶
[3]:
df = pd.DataFrame({
"id": [1, 2, 3],
"value": ["foo", "boo", "bar"],
})
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["value"]
)
wr.s3.read_parquet(path, dataset=True)
[3]:
id | value | |
---|---|---|
0 | 3 | bar |
1 | 2 | boo |
2 | 1 | foo |
Parquet Example 1¶
[4]:
my_filter = lambda x: x["value"].endswith("oo")
wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)
[4]:
id | value | |
---|---|---|
0 | 2 | boo |
1 | 1 | foo |
Parquet Example 2¶
[5]:
from Levenshtein import distance
def my_filter(partitions):
return distance("boo", partitions["value"]) <= 1
wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)
[5]:
id | value | |
---|---|---|
0 | 2 | boo |
1 | 1 | foo |
Creating the Dataset (CSV)¶
[6]:
df = pd.DataFrame({
"id": [1, 2, 3],
"value": ["foo", "boo", "bar"],
})
wr.s3.to_csv(
df=df,
path=path,
dataset=True,
mode="overwrite",
partition_cols=["value"],
compression="gzip",
index=False
)
wr.s3.read_csv(path, dataset=True)
[6]:
id | value | |
---|---|---|
0 | 3 | bar |
1 | 2 | boo |
2 | 1 | foo |
CSV Example 1¶
[7]:
my_filter = lambda x: x["value"].endswith("oo")
wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)
[7]:
id | value | |
---|---|---|
0 | 2 | boo |
1 | 1 | foo |
CSV Example 2¶
[8]:
from Levenshtein import distance
def my_filter(partitions):
return distance("boo", partitions["value"]) <= 1
wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)
[8]:
id | value | |
---|---|---|
0 | 2 | boo |
1 | 1 | foo |
24 - Athena Query Metadata¶
For wr.athena.read_sql_query()
and wr.athena.read_sql_table()
the resulting DataFrame (or every DataFrame in the returned Iterator for chunked queries) has a query_metadata
attribute, which brings the query result metadata returned by Boto3/Athena.
The expected query_metadata
format is the same as the QueryExecution dictionary returned by the Boto3 Athena get_query_execution call.
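For example, a minimal sketch of inspecting that metadata on a returned DataFrame (the default database is assumed, as configured in the cells below):
[ ]:
import awswrangler as wr
df = wr.athena.read_sql_query("SELECT 1 AS foo", database="default")
# query_metadata mirrors the Boto3 QueryExecution dictionary
print(df.query_metadata["QueryExecutionId"])
print(df.query_metadata["Statistics"])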
Environment Variables¶
[1]:
%env WR_DATABASE=default
env: WR_DATABASE=default
[2]:
import awswrangler as wr
[5]:
df = wr.athena.read_sql_query("SELECT 1 AS foo")
df
[5]:
foo | |
---|---|
0 | 1 |
Getting statistics from query metadata¶
[6]:
print(f'DataScannedInBytes: {df.query_metadata["Statistics"]["DataScannedInBytes"]}')
print(f'TotalExecutionTimeInMillis: {df.query_metadata["Statistics"]["TotalExecutionTimeInMillis"]}')
print(f'QueryQueueTimeInMillis: {df.query_metadata["Statistics"]["QueryQueueTimeInMillis"]}')
print(f'QueryPlanningTimeInMillis: {df.query_metadata["Statistics"]["QueryPlanningTimeInMillis"]}')
print(f'ServiceProcessingTimeInMillis: {df.query_metadata["Statistics"]["ServiceProcessingTimeInMillis"]}')
DataScannedInBytes: 0
TotalExecutionTimeInMillis: 2311
QueryQueueTimeInMillis: 121
QueryPlanningTimeInMillis: 250
ServiceProcessingTimeInMillis: 37
25 - Redshift - Loading Parquet files with Spectrum¶
Enter your bucket name:¶
[ ]:
# Install the optional modules first
!pip install 'awswrangler[redshift]'
[1]:
import getpass
bucket = getpass.getpass()
PATH = f"s3://{bucket}/files/"
···········································
Mocking some Parquet Files on S3¶
[2]:
import awswrangler as wr
import pandas as pd
df = pd.DataFrame({
"col0": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
"col1": ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"],
})
df
[2]:
col0 | col1 | |
---|---|---|
0 | 0 | a |
1 | 1 | b |
2 | 2 | c |
3 | 3 | d |
4 | 4 | e |
5 | 5 | f |
6 | 6 | g |
7 | 7 | h |
8 | 8 | i |
9 | 9 | j |
[3]:
wr.s3.to_parquet(df, PATH, max_rows_by_file=2, dataset=True, mode="overwrite")
Crawling the metadata and adding into Glue Catalog¶
[4]:
wr.s3.store_parquet_metadata(
path=PATH,
database="aws_sdk_pandas",
table="test",
dataset=True,
mode="overwrite"
)
[4]:
({'col0': 'bigint', 'col1': 'string'}, None, None)
Running the CTAS query to load the data into Redshift storage¶
[5]:
con = wr.redshift.connect(connection="aws-sdk-pandas-redshift")
[6]:
query = "CREATE TABLE public.test AS (SELECT * FROM aws_sdk_pandas_external.test)"
[7]:
with con.cursor() as cursor:
cursor.execute(query)
Running an INSERT INTO query to load MORE data into Redshift storage¶
[8]:
df = pd.DataFrame({
"col0": [10, 11],
"col1": ["k", "l"],
})
wr.s3.to_parquet(df, PATH, dataset=True, mode="overwrite")
[9]:
query = "INSERT INTO public.test (SELECT * FROM aws_sdk_pandas_external.test)"
[10]:
with con.cursor() as cursor:
cursor.execute(query)
Checking the result¶
[11]:
query = "SELECT * FROM public.test"
[13]:
wr.redshift.read_sql_table(con=con, schema="public", table="test")
[13]:
col0 | col1 | |
---|---|---|
0 | 5 | f |
1 | 1 | b |
2 | 3 | d |
3 | 6 | g |
4 | 8 | i |
5 | 10 | k |
6 | 4 | e |
7 | 0 | a |
8 | 2 | c |
9 | 7 | h |
10 | 9 | j |
11 | 11 | l |
[14]:
con.close()
26 - Amazon Timestream¶
Creating resources¶
[10]:
import awswrangler as wr
import pandas as pd
from datetime import datetime
wr.timestream.create_database("sampleDB")
wr.timestream.create_table("sampleDB", "sampleTable", memory_retention_hours=1, magnetic_retention_days=1)
Write¶
[11]:
df = pd.DataFrame(
{
"time": [datetime.now(), datetime.now(), datetime.now()],
"dim0": ["foo", "boo", "bar"],
"dim1": [1, 2, 3],
"measure": [1.0, 1.1, 1.2],
}
)
rejected_records = wr.timestream.write(
df=df,
database="sampleDB",
table="sampleTable",
time_col="time",
measure_col="measure",
dimensions_cols=["dim0", "dim1"],
)
print(f"Number of rejected records: {len(rejected_records)}")
Number of rejected records: 0
Query¶
[12]:
wr.timestream.query(
'SELECT time, measure_value::double, dim0, dim1 FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3'
)
[12]:
time | measure_value::double | dim0 | dim1 | |
---|---|---|---|---|
0 | 2020-12-08 19:15:32.468 | 1.0 | foo | 1 |
1 | 2020-12-08 19:15:32.468 | 1.2 | bar | 3 |
2 | 2020-12-08 19:15:32.468 | 1.1 | boo | 2 |
Deleting resources¶
[13]:
wr.timestream.delete_table("sampleDB", "sampleTable")
wr.timestream.delete_database("sampleDB")
27 - Amazon Timestream - Example 2¶
Reading test data¶
[1]:
import awswrangler as wr
import pandas as pd
from datetime import datetime
df = pd.read_csv(
"https://raw.githubusercontent.com/aws/amazon-timestream-tools/master/sample_apps/data/sample.csv",
names=[
"ignore0",
"region",
"ignore1",
"az",
"ignore2",
"hostname",
"measure_kind",
"measure",
"ignore3",
"ignore4",
"ignore5",
],
usecols=["region", "az", "hostname", "measure_kind", "measure"],
)
df["time"] = datetime.now()
df.reset_index(inplace=True, drop=False)
df
[1]:
index | region | az | hostname | measure_kind | measure | time | |
---|---|---|---|---|---|---|---|
0 | 0 | us-east-1 | us-east-1a | host-fj2hx | cpu_utilization | 21.394363 | 2020-12-08 16:18:47.599597 |
1 | 1 | us-east-1 | us-east-1a | host-fj2hx | memory_utilization | 68.563420 | 2020-12-08 16:18:47.599597 |
2 | 2 | us-east-1 | us-east-1a | host-6kMPE | cpu_utilization | 17.144579 | 2020-12-08 16:18:47.599597 |
3 | 3 | us-east-1 | us-east-1a | host-6kMPE | memory_utilization | 73.507870 | 2020-12-08 16:18:47.599597 |
4 | 4 | us-east-1 | us-east-1a | host-sxj7X | cpu_utilization | 26.584865 | 2020-12-08 16:18:47.599597 |
... | ... | ... | ... | ... | ... | ... | ... |
125995 | 125995 | eu-north-1 | eu-north-1c | host-De8RB | memory_utilization | 68.063468 | 2020-12-08 16:18:47.599597 |
125996 | 125996 | eu-north-1 | eu-north-1c | host-2z8tn | memory_utilization | 72.203680 | 2020-12-08 16:18:47.599597 |
125997 | 125997 | eu-north-1 | eu-north-1c | host-2z8tn | cpu_utilization | 29.212219 | 2020-12-08 16:18:47.599597 |
125998 | 125998 | eu-north-1 | eu-north-1c | host-9FczW | memory_utilization | 71.746134 | 2020-12-08 16:18:47.599597 |
125999 | 125999 | eu-north-1 | eu-north-1c | host-9FczW | cpu_utilization | 1.677793 | 2020-12-08 16:18:47.599597 |
126000 rows × 7 columns
Creating resources¶
[2]:
wr.timestream.create_database("sampleDB")
wr.timestream.create_table("sampleDB", "sampleTable", memory_retention_hours=1, magnetic_retention_days=1)
Write CPU_UTILIZATION records¶
[3]:
df_cpu = df[df.measure_kind == "cpu_utilization"].copy()
df_cpu.rename(columns={"measure": "cpu_utilization"}, inplace=True)
df_cpu
[3]:
index | region | az | hostname | measure_kind | cpu_utilization | time | |
---|---|---|---|---|---|---|---|
0 | 0 | us-east-1 | us-east-1a | host-fj2hx | cpu_utilization | 21.394363 | 2020-12-08 16:18:47.599597 |
2 | 2 | us-east-1 | us-east-1a | host-6kMPE | cpu_utilization | 17.144579 | 2020-12-08 16:18:47.599597 |
4 | 4 | us-east-1 | us-east-1a | host-sxj7X | cpu_utilization | 26.584865 | 2020-12-08 16:18:47.599597 |
6 | 6 | us-east-1 | us-east-1a | host-ExOui | cpu_utilization | 52.930970 | 2020-12-08 16:18:47.599597 |
8 | 8 | us-east-1 | us-east-1a | host-Bwb3j | cpu_utilization | 99.134110 | 2020-12-08 16:18:47.599597 |
... | ... | ... | ... | ... | ... | ... | ... |
125990 | 125990 | eu-north-1 | eu-north-1c | host-aPtc6 | cpu_utilization | 89.566125 | 2020-12-08 16:18:47.599597 |
125992 | 125992 | eu-north-1 | eu-north-1c | host-7ZF9L | cpu_utilization | 75.510598 | 2020-12-08 16:18:47.599597 |
125994 | 125994 | eu-north-1 | eu-north-1c | host-De8RB | cpu_utilization | 2.771261 | 2020-12-08 16:18:47.599597 |
125997 | 125997 | eu-north-1 | eu-north-1c | host-2z8tn | cpu_utilization | 29.212219 | 2020-12-08 16:18:47.599597 |
125999 | 125999 | eu-north-1 | eu-north-1c | host-9FczW | cpu_utilization | 1.677793 | 2020-12-08 16:18:47.599597 |
63000 rows × 7 columns
[4]:
rejected_records = wr.timestream.write(
df=df_cpu,
database="sampleDB",
table="sampleTable",
time_col="time",
measure_col="cpu_utilization",
dimensions_cols=["index", "region", "az", "hostname"],
)
assert len(rejected_records) == 0
Batch Load MEMORY_UTILIZATION records¶
[5]:
df_memory = df[df.measure_kind == "memory_utilization"].copy()
df_memory.rename(columns={"measure": "memory_utilization"}, inplace=True)
df_memory
[5]:
index | region | az | hostname | measure_kind | memory_utilization | time | |
---|---|---|---|---|---|---|---|
1 | 1 | us-east-1 | us-east-1a | host-fj2hx | memory_utilization | 68.563420 | 2020-12-08 16:18:47.599597 |
3 | 3 | us-east-1 | us-east-1a | host-6kMPE | memory_utilization | 73.507870 | 2020-12-08 16:18:47.599597 |
5 | 5 | us-east-1 | us-east-1a | host-sxj7X | memory_utilization | 22.401424 | 2020-12-08 16:18:47.599597 |
7 | 7 | us-east-1 | us-east-1a | host-ExOui | memory_utilization | 45.440135 | 2020-12-08 16:18:47.599597 |
9 | 9 | us-east-1 | us-east-1a | host-Bwb3j | memory_utilization | 15.042701 | 2020-12-08 16:18:47.599597 |
... | ... | ... | ... | ... | ... | ... | ... |
125991 | 125991 | eu-north-1 | eu-north-1c | host-aPtc6 | memory_utilization | 75.686739 | 2020-12-08 16:18:47.599597 |
125993 | 125993 | eu-north-1 | eu-north-1c | host-7ZF9L | memory_utilization | 18.386152 | 2020-12-08 16:18:47.599597 |
125995 | 125995 | eu-north-1 | eu-north-1c | host-De8RB | memory_utilization | 68.063468 | 2020-12-08 16:18:47.599597 |
125996 | 125996 | eu-north-1 | eu-north-1c | host-2z8tn | memory_utilization | 72.203680 | 2020-12-08 16:18:47.599597 |
125998 | 125998 | eu-north-1 | eu-north-1c | host-9FczW | memory_utilization | 71.746134 | 2020-12-08 16:18:47.599597 |
63000 rows × 7 columns
[6]:
response = wr.timestream.batch_load(
df=df_memory,
path="s3://bucket/prefix/",
database="sampleDB",
table="sampleTable",
time_col="time",
measure_cols=["memory_utilization"],
dimensions_cols=["index", "region", "az", "hostname"],
measure_name_col="measure_kind",
report_s3_configuration={"BucketName": "error_bucket", "ObjectKeyPrefix": "error_prefix"},
)
assert response["BatchLoadTaskDescription"]["ProgressReport"]["RecordIngestionFailures"] == 0
Querying CPU_UTILIZATION¶
[7]:
wr.timestream.query("""
SELECT
hostname, region, az, measure_name, measure_value::double, time
FROM "sampleDB"."sampleTable"
WHERE measure_name = 'cpu_utilization'
ORDER BY time DESC
LIMIT 10
""")
[7]:
hostname | region | az | measure_name | measure_value::double | time | |
---|---|---|---|---|---|---|
0 | host-OgvFx | us-west-1 | us-west-1a | cpu_utilization | 39.617911 | 2020-12-08 19:18:47.600 |
1 | host-rZUNx | eu-north-1 | eu-north-1a | cpu_utilization | 30.793332 | 2020-12-08 19:18:47.600 |
2 | host-t1kAB | us-east-2 | us-east-2b | cpu_utilization | 74.453239 | 2020-12-08 19:18:47.600 |
3 | host-RdQRf | us-east-1 | us-east-1c | cpu_utilization | 76.984448 | 2020-12-08 19:18:47.600 |
4 | host-4Llhu | us-east-1 | us-east-1c | cpu_utilization | 41.862733 | 2020-12-08 19:18:47.600 |
5 | host-2plqa | us-west-1 | us-west-1a | cpu_utilization | 34.864762 | 2020-12-08 19:18:47.600 |
6 | host-J3Q4z | us-east-1 | us-east-1b | cpu_utilization | 71.574266 | 2020-12-08 19:18:47.600 |
7 | host-VIR5T | ap-east-1 | ap-east-1a | cpu_utilization | 14.017491 | 2020-12-08 19:18:47.600 |
8 | host-G042D | us-east-1 | us-east-1c | cpu_utilization | 60.199068 | 2020-12-08 19:18:47.600 |
9 | host-8EBHm | us-west-2 | us-west-2c | cpu_utilization | 96.631624 | 2020-12-08 19:18:47.600 |
Querying MEMORY_UTILIZATION¶
[8]:
wr.timestream.query("""
SELECT
hostname, region, az, measure_name, measure_value::double, time
FROM "sampleDB"."sampleTable"
WHERE measure_name = 'memory_utilization'
ORDER BY time DESC
LIMIT 10
""")
[8]:
hostname | region | az | measure_name | measure_value::double | time | |
---|---|---|---|---|---|---|
0 | host-7c897 | us-west-2 | us-west-2b | memory_utilization | 63.427726 | 2020-12-08 19:18:47.600 |
1 | host-2z8tn | eu-north-1 | eu-north-1c | memory_utilization | 41.071368 | 2020-12-08 19:18:47.600 |
2 | host-J3Q4z | us-east-1 | us-east-1b | memory_utilization | 23.944388 | 2020-12-08 19:18:47.600 |
3 | host-mjrQb | us-east-1 | us-east-1b | memory_utilization | 69.173431 | 2020-12-08 19:18:47.600 |
4 | host-AyWSI | us-east-1 | us-east-1c | memory_utilization | 75.591467 | 2020-12-08 19:18:47.600 |
5 | host-Axf0g | us-west-2 | us-west-2a | memory_utilization | 29.720739 | 2020-12-08 19:18:47.600 |
6 | host-ilMBa | us-east-2 | us-east-2b | memory_utilization | 71.544134 | 2020-12-08 19:18:47.600 |
7 | host-CWdXX | us-west-2 | us-west-2c | memory_utilization | 79.792799 | 2020-12-08 19:18:47.600 |
8 | host-8EBHm | us-west-2 | us-west-2c | memory_utilization | 66.082554 | 2020-12-08 19:18:47.600 |
9 | host-dRIJj | us-east-1 | us-east-1c | memory_utilization | 86.748960 | 2020-12-08 19:18:47.600 |
Deleting resources¶
[9]:
wr.timestream.delete_table("sampleDB", "sampleTable")
wr.timestream.delete_database("sampleDB")
28 - Amazon DynamoDB¶
Writing Data¶
[23]:
from datetime import datetime
from decimal import Decimal
from pathlib import Path
import awswrangler as wr
import pandas as pd
from boto3.dynamodb.conditions import Attr, Key
Writing DataFrame¶
[27]:
table_name = "movies"
df = pd.DataFrame({
"title": ["Titanic", "Snatch", "The Godfather"],
"year": [1997, 2000, 1972],
"genre": ["drama", "caper story", "crime"],
})
wr.dynamodb.put_df(df=df, table_name=table_name)
Writing CSV file¶
[3]:
filepath = Path("items.csv")
df.to_csv(filepath, index=False)
wr.dynamodb.put_csv(path=filepath, table_name=table_name)
filepath.unlink()
Writing JSON files¶
[4]:
filepath = Path("items.json")
df.to_json(filepath, orient="records")
wr.dynamodb.put_json(path="items.json", table_name=table_name)
filepath.unlink()
Writing list of items¶
[5]:
items = df.to_dict(orient="records")
wr.dynamodb.put_items(items=items, table_name=table_name)
Reading Data¶
Read Items¶
[ ]:
# Limit Read to 5 items
wr.dynamodb.read_items(table_name=table_name, max_items_evaluated=5)
# Limit Read to Key expression
wr.dynamodb.read_items(
table_name=table_name,
key_condition_expression=(Key("title").eq("Snatch") & Key("year").eq(2000))
)
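The Attr helper imported above can be used for non-key, server-side filters; a minimal sketch, assuming read_items accepts a filter_expression and is explicitly allowed to fall back to a full table scan:
[ ]:
# Filter on a non-key attribute (still scans the table server-side)
wr.dynamodb.read_items(
    table_name=table_name,
    filter_expression=Attr("genre").eq("drama"),
    allow_full_scan=True,
)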
Read PartiQL¶
[29]:
wr.dynamodb.read_partiql_query(
query=f"SELECT * FROM {table_name} WHERE title=? AND year=?",
parameters=["Snatch", 2000],
)
[29]:
year | genre | title | |
---|---|---|---|
0 | 2000 | caper story | Snatch |
Executing statements¶
[29]:
title = "The Lord of the Rings: The Fellowship of the Ring"
year = datetime.now().year
genre = "epic"
rating = Decimal('9.9')
plot = "The fate of Middle-earth hangs in the balance as Frodo and eight companions begin their journey to Mount Doom in the land of Mordor."
# Insert items
wr.dynamodb.execute_statement(
statement=f"INSERT INTO {table_name} VALUE {{'title': ?, 'year': ?, 'genre': ?, 'info': ?}}",
parameters=[title, year, genre, {"plot": plot, "rating": rating}],
)
# Select items
wr.dynamodb.execute_statement(
statement=f"SELECT * FROM \"{table_name}\" WHERE title=? AND year=?",
parameters=[title, year],
)
# Update items
wr.dynamodb.execute_statement(
statement=f"UPDATE \"{table_name}\" SET info.rating=? WHERE title=? AND year=?",
parameters=[Decimal(10), title, year],
)
# Delete items
wr.dynamodb.execute_statement(
statement=f"DELETE FROM \"{table_name}\" WHERE title=? AND year=?",
parameters=[title, year],
)
[29]:
[]
Deleting items¶
[6]:
wr.dynamodb.delete_items(items=items, table_name=table_name)
29 - S3 Select¶
AWS SDK for pandas supports Amazon S3 Select, enabling applications to use SQL statements to query and filter the contents of a single S3 object. It works on objects stored in CSV, JSON or Apache Parquet, including compressed and large files of several TBs.
With S3 Select, the query workload is delegated to Amazon S3, leading to lower latency and cost, and to higher performance (up to 400% improvement). This is in comparison with other awswrangler operations such as read_parquet
, where the S3 object is downloaded and filtered on the client side.
This feature has a number of limitations however:
The maximum length of a record in the input or result is 1 MB
The maximum uncompressed row group size is 256 MB (Parquet only)
It can only emit nested data in JSON format
Certain SQL operations are not supported (e.g. ORDER BY)
Read multiple Parquet files from an S3 prefix¶
[1]:
import awswrangler as wr
df = wr.s3.select_query(
sql="SELECT * FROM s3object s where s.\"star_rating\" >= 5",
path="s3://amazon-reviews-pds/parquet/product_category=Gift_Card/",
input_serialization="Parquet",
input_serialization_params={},
)
df.loc[:, df.columns != "product_title"].head()
[1]:
marketplace | customer_id | review_id | product_id | product_parent | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | review_date | year | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | US | 52670295 | RGPOFKORD8RTU | B0002CZPPG | 867256265 | 5 | 105 | 107 | N | N | Excellent Gift Idea | I wonder if the other reviewer actually read t... | 2005-02-08 | 2005 |
1 | US | 29964102 | R2U8X8V5KPB4J3 | B00H5BMF00 | 373287760 | 5 | 0 | 0 | N | Y | Five Stars | convenience is the name of the game. | 2015-05-03 | 2015 |
2 | US | 25173351 | R15XV3LXUMLTXL | B00PG40CO4 | 137115061 | 5 | 0 | 0 | N | Y | Birthday Gift | This gift card was handled with accuracy in de... | 2015-05-03 | 2015 |
3 | US | 12516181 | R3G6G7H8TX4H0T | B0002CZPPG | 867256265 | 5 | 6 | 6 | N | N | Love 'em. | Gotta love these iTunes Prepaid Card thingys. ... | 2005-10-15 | 2005 |
4 | US | 38355314 | R2NJ7WNBU16YTQ | B00B2TFSO6 | 89375983 | 5 | 0 | 0 | N | Y | Five Stars | perfect | 2015-05-03 | 2015 |
Read full CSV file¶
[5]:
df = wr.s3.select_query(
sql="SELECT * FROM s3object",
path="s3://humor-detection-pds/Humorous.csv",
input_serialization="CSV",
input_serialization_params={
"FileHeaderInfo": "Use",
"RecordDelimiter": "\r\n",
},
scan_range_chunk_size=1024*1024*32, # override range of bytes to query, by default 1Mb
use_threads=True,
)
df.head()
[5]:
question | product_description | image_url | label | |
---|---|---|---|---|
0 | Will the volca sample get me a girlfriend? | Korg Amplifier Part VOLCASAMPLE | http://ecx.images-amazon.com/images/I/81I1XZea... | 1 |
1 | Can u communicate with spirits even on Saturday? | Winning Moves Games Classic Ouija | http://ecx.images-amazon.com/images/I/81kcYEG5... | 1 |
2 | I won't get hunted right? | Winning Moves Games Classic Ouija | http://ecx.images-amazon.com/images/I/81kcYEG5... | 1 |
3 | I have a few questions.. Can you get possessed... | Winning Moves Games Classic Ouija | http://ecx.images-amazon.com/images/I/81kcYEG5... | 1 |
4 | Has anyone asked where the treasure is? What w... | Winning Moves Games Classic Ouija | http://ecx.images-amazon.com/images/I/81kcYEG5... | 1 |
Filter JSON file¶
[3]:
wr.s3.select_query(
sql="SELECT * FROM s3object[*] s where s.\"family_name\" = \'Biden\'",
path="s3://awsglue-datasets/examples/us-legislators/all/persons.json",
input_serialization="JSON",
input_serialization_params={
"Type": "Document",
},
)
[3]:
family_name | contact_details | name | links | gender | image | identifiers | other_names | sort_name | images | given_name | birth_date | id | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | Biden | [{'type': 'twitter', 'value': 'joebiden'}] | Joseph Biden, Jr. | [{'note': 'Wikipedia (ace)', 'url': 'https://a... | male | https://theunitedstates.io/images/congress/ori... | [{'identifier': 'B000444', 'scheme': 'bioguide... | [{'lang': None, 'name': 'Joe Biden', 'note': '... | Biden, Joseph | [{'url': 'https://theunitedstates.io/images/co... | Joseph | 1942-11-20 | 64239edf-8e06-4d2d-acc0-33d96bc79774 |
30 - Data Api¶
The Data API simplifies access to Amazon Redshift and RDS by removing the need to manage database connections and credentials. Instead, you can execute SQL commands against an Amazon Redshift cluster or Amazon Aurora cluster by simply invoking an HTTPS API endpoint provided by the Data API. It takes care of managing database connections and returning data. Since the Data API leverages IAM user credentials or database credentials stored in AWS Secrets Manager, you don't need to pass credentials in API calls.
Connect to the cluster¶
[ ]:
con_redshift = wr.data_api.redshift.connect(
cluster_id="aws-sdk-pandas-1xn5lqxrdxrv3",
database="test_redshift",
secret_arn="arn:aws:secretsmanager:us-east-1:111111111111:secret:aws-sdk-pandas/redshift-ewn43d"
)
con_redshift_serverless = wr.data_api.redshift.connect(
workgroup_name="aws-sdk-pandas",
database="test_redshift",
secret_arn="arn:aws:secretsmanager:us-east-1:111111111111:secret:aws-sdk-pandas/redshift-f3en4w"
)
con_mysql = wr.data_api.rds.connect(
resource_arn="arn:aws:rds:us-east-1:111111111111:cluster:mysql-serverless-cluster-wrangler",
database="test_rds",
secret_arn="arn:aws:secretsmanager:us-east-1:111111111111:secret:aws-sdk-pandas/mysql-23df3"
)
Read from database¶
[ ]:
df = wr.data_api.redshift.read_sql_query(
sql="SELECT * FROM public.test_table",
con=con_redshift,
)
df = wr.data_api.rds.read_sql_query(
sql="SELECT * FROM test.test_table",
con=con_mysql,
)
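The serverless workgroup connection created above can be used in exactly the same way; a minimal sketch assuming the same test table exists in that workgroup:
[ ]:
df = wr.data_api.redshift.read_sql_query(
    sql="SELECT * FROM public.test_table",
    con=con_redshift_serverless,
)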
31 - OpenSearch¶
1. Initialize¶
[ ]:
# Install the optional modules first
!pip install 'awswrangler[opensearch]'
[1]:
import awswrangler as wr
Connect to your Amazon OpenSearch domain¶
[2]:
client = wr.opensearch.connect(
host='OPENSEARCH-ENDPOINT',
# username='FGAC-USERNAME(OPTIONAL)',
# password='FGAC-PASSWORD(OPTIONAL)'
)
client.info()
Enter your bucket name¶
[3]:
bucket = 'BUCKET'
Initialize sample data¶
[4]:
sf_restaurants_inspections = [
{
"inspection_id": "24936_20160609",
"business_address": "315 California St",
"business_city": "San Francisco",
"business_id": "24936",
"business_location": {"lon": -122.400152, "lat": 37.793199},
"business_name": "San Francisco Soup Company",
"business_postal_code": "94104",
"business_state": "CA",
"inspection_date": "2016-06-09T00:00:00.000",
"inspection_score": 77,
"inspection_type": "Routine - Unscheduled",
"risk_category": "Low Risk",
"violation_description": "Improper food labeling or menu misrepresentation",
"violation_id": "24936_20160609_103141",
},
{
"inspection_id": "60354_20161123",
"business_address": "10 Mason St",
"business_city": "San Francisco",
"business_id": "60354",
"business_location": {"lon": -122.409061, "lat": 37.783527},
"business_name": "Soup Unlimited",
"business_postal_code": "94102",
"business_state": "CA",
"inspection_date": "2016-11-23T00:00:00.000",
"inspection_type": "Routine",
"inspection_score": 95,
},
{
"inspection_id": "1797_20160705",
"business_address": "2872 24th St",
"business_city": "San Francisco",
"business_id": "1797",
"business_location": {"lon": -122.409752, "lat": 37.752807},
"business_name": "TIO CHILOS GRILL",
"business_postal_code": "94110",
"business_state": "CA",
"inspection_date": "2016-07-05T00:00:00.000",
"inspection_score": 90,
"inspection_type": "Routine - Unscheduled",
"risk_category": "Low Risk",
"violation_description": "Unclean nonfood contact surfaces",
"violation_id": "1797_20160705_103142",
},
{
"inspection_id": "66198_20160527",
"business_address": "1661 Tennessee St Suite 3B",
"business_city": "San Francisco Whard Restaurant",
"business_id": "66198",
"business_location": {"lon": -122.388478, "lat": 37.75072},
"business_name": "San Francisco Restaurant",
"business_postal_code": "94107",
"business_state": "CA",
"inspection_date": "2016-05-27T00:00:00.000",
"inspection_type": "Routine",
"inspection_score": 56,
},
{
"inspection_id": "5794_20160907",
"business_address": "2162 24th Ave",
"business_city": "San Francisco",
"business_id": "5794",
"business_location": {"lon": -122.481299, "lat": 37.747228},
"business_name": "Soup House",
"business_phone_number": "+14155752700",
"business_postal_code": "94116",
"business_state": "CA",
"inspection_date": "2016-09-07T00:00:00.000",
"inspection_score": 96,
"inspection_type": "Routine - Unscheduled",
"risk_category": "Low Risk",
"violation_description": "Unapproved or unmaintained equipment or utensils",
"violation_id": "5794_20160907_103144",
},
# duplicate record
{
"inspection_id": "5794_20160907",
"business_address": "2162 24th Ave",
"business_city": "San Francisco",
"business_id": "5794",
"business_location": {"lon": -122.481299, "lat": 37.747228},
"business_name": "Soup-or-Salad",
"business_phone_number": "+14155752700",
"business_postal_code": "94116",
"business_state": "CA",
"inspection_date": "2016-09-07T00:00:00.000",
"inspection_score": 96,
"inspection_type": "Routine - Unscheduled",
"risk_category": "Low Risk",
"violation_description": "Unapproved or unmaintained equipment or utensils",
"violation_id": "5794_20160907_103144",
},
]
2. Indexing (load)¶
Index documents (no Pandas)¶
[5]:
# index documents w/o providing keys (_id is auto-generated)
wr.opensearch.index_documents(
client,
documents=sf_restaurants_inspections,
index="sf_restaurants_inspections"
)
Indexing: 100% (6/6)|####################################|Elapsed Time: 0:00:01
[5]:
{'success': 6, 'errors': []}
[6]:
# read all documents. There are 6 documents in total
wr.opensearch.search(
client,
index="sf_restaurants_inspections",
_source=["inspection_id", "business_name", "business_location"]
)
[6]:
_id | business_name | inspection_id | business_location.lon | business_location.lat | |
---|---|---|---|---|---|
0 | 663dd72d-0da4-495b-b0ae-ed000105ae73 | TIO CHILOS GRILL | 1797_20160705 | -122.409752 | 37.752807 |
1 | ff2f50f6-5415-4706-9bcb-af7c5eb0afa3 | Soup House | 5794_20160907 | -122.481299 | 37.747228 |
2 | b9e8f6a2-8fd1-4660-b041-2997a1a80984 | San Francisco Soup Company | 24936_20160609 | -122.400152 | 37.793199 |
3 | 56b352e6-102b-4eff-8296-7e1fb2459bab | Soup Unlimited | 60354_20161123 | -122.409061 | 37.783527 |
4 | 6fec5411-f79a-48e4-be7b-e0e44d5ebbab | San Francisco Restaurant | 66198_20160527 | -122.388478 | 37.750720 |
5 | 7ba4fb17-f9a9-49da-b90e-8b3553d6d97c | Soup-or-Salad | 5794_20160907 | -122.481299 | 37.747228 |
Index JSON file¶
[ ]:
import pandas as pd
df = pd.DataFrame(sf_restaurants_inspections)
path = f"s3://{bucket}/json/sf_restaurants_inspections.json"
wr.s3.to_json(df, path, orient='records', lines=True)
[8]:
# index the JSON file, providing document keys
wr.opensearch.index_json(
client,
path=path, # path can be s3 or local
index="sf_restaurants_inspections_dedup",
id_keys=["inspection_id"] # can be multiple fields. arg applicable to all index_* functions
)
Indexing: 100% (6/6)|####################################|Elapsed Time: 0:00:00
[8]:
{'success': 6, 'errors': []}
[9]:
# now there are no duplicates. There are 5 documents in total
wr.opensearch.search(
client,
index="sf_restaurants_inspections_dedup",
_source=["inspection_id", "business_name", "business_location"]
)
[9]:
_id | business_name | inspection_id | business_location.lon | business_location.lat | |
---|---|---|---|---|---|
0 | 24936_20160609 | San Francisco Soup Company | 24936_20160609 | -122.400152 | 37.793199 |
1 | 66198_20160527 | San Francisco Restaurant | 66198_20160527 | -122.388478 | 37.750720 |
2 | 5794_20160907 | Soup-or-Salad | 5794_20160907 | -122.481299 | 37.747228 |
3 | 60354_20161123 | Soup Unlimited | 60354_20161123 | -122.409061 | 37.783527 |
4 | 1797_20160705 | TIO CHILOS GRILL | 1797_20160705 | -122.409752 | 37.752807 |
Index CSV¶
[11]:
wr.opensearch.index_csv(
client,
index="nyc_restaurants_inspections_sample",
path='https://data.cityofnewyork.us/api/views/43nn-pn8j/rows.csv?accessType=DOWNLOAD', # index_csv supports local, s3 and url path
id_keys=["CAMIS"],
pandas_kwargs={'na_filter': True, 'nrows': 1000}, # pandas.read_csv() args - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
bulk_size=500 # modify based on your cluster size
)
Indexing: 100% (1000/1000)|##############################|Elapsed Time: 0:00:00
[11]:
{'success': 1000, 'errors': []}
[12]:
wr.opensearch.search(
client,
index="nyc_restaurants_inspections_sample",
size=5
)
[12]:
_id | CAMIS | DBA | BORO | BUILDING | STREET | ZIPCODE | PHONE | CUISINE DESCRIPTION | INSPECTION DATE | ... | RECORD DATE | INSPECTION TYPE | Latitude | Longitude | Community Board | Council District | Census Tract | BIN | BBL | NTA | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 41610426 | 41610426 | GLOW THAI RESTAURANT | Brooklyn | 7107 | 3 AVENUE | 11209.0 | 7187481920 | Thai | 02/26/2020 | ... | 10/04/2021 | Cycle Inspection / Re-inspection | 40.633865 | -74.026798 | 310.0 | 43.0 | 6800.0 | 3146519.0 | 3.058910e+09 | BK31 |
1 | 40811162 | 40811162 | CARMINE'S | Manhattan | 2450 | BROADWAY | 10024.0 | 2123622200 | Italian | 05/28/2019 | ... | 10/04/2021 | Cycle Inspection / Initial Inspection | 40.791168 | -73.974308 | 107.0 | 6.0 | 17900.0 | 1033560.0 | 1.012380e+09 | MN12 |
2 | 50012113 | 50012113 | TANG | Queens | 196-50 | NORTHERN BOULEVARD | 11358.0 | 7182797080 | Korean | 08/16/2018 | ... | 10/04/2021 | Cycle Inspection / Initial Inspection | 40.757850 | -73.784593 | 411.0 | 19.0 | 145101.0 | 4124565.0 | 4.055200e+09 | QN48 |
3 | 50014618 | 50014618 | TOTTO RAMEN | Manhattan | 248 | EAST 52 STREET | 10022.0 | 2124210052 | Japanese | 08/20/2018 | ... | 10/04/2021 | Cycle Inspection / Re-inspection | 40.756596 | -73.968749 | 106.0 | 4.0 | 9800.0 | 1038490.0 | 1.013250e+09 | MN19 |
4 | 50045782 | 50045782 | OLLIE'S CHINESE RESTAURANT | Manhattan | 2705 | BROADWAY | 10025.0 | 2129323300 | Chinese | 10/21/2019 | ... | 10/04/2021 | Cycle Inspection / Re-inspection | 40.799318 | -73.968440 | 107.0 | 6.0 | 19100.0 | 1056562.0 | 1.018750e+09 | MN12 |
5 rows × 27 columns
3. Search¶
Search results are returned as a Pandas DataFrame.
3.1 Search by DSL¶
[13]:
# add a search query. search all soup businesses
wr.opensearch.search(
client,
index="sf_restaurants_inspections",
_source=["inspection_id", "business_name", "business_location"],
filter_path=["hits.hits._id","hits.hits._source"],
search_body={
"query": {
"match": {
"business_name": "soup"
}
}
}
)
[13]:
_id | business_name | inspection_id | business_location.lon | business_location.lat | |
---|---|---|---|---|---|
0 | ff2f50f6-5415-4706-9bcb-af7c5eb0afa3 | Soup House | 5794_20160907 | -122.481299 | 37.747228 |
1 | 7ba4fb17-f9a9-49da-b90e-8b3553d6d97c | Soup-or-Salad | 5794_20160907 | -122.481299 | 37.747228 |
2 | b9e8f6a2-8fd1-4660-b041-2997a1a80984 | San Francisco Soup Company | 24936_20160609 | -122.400152 | 37.793199 |
3 | 56b352e6-102b-4eff-8296-7e1fb2459bab | Soup Unlimited | 60354_20161123 | -122.409061 | 37.783527 |
3.2 Search by SQL¶
[14]:
wr.opensearch.search_by_sql(
client,
sql_query="""SELECT business_name, inspection_score
FROM sf_restaurants_inspections_dedup
WHERE business_name LIKE '%soup%'
ORDER BY inspection_score DESC LIMIT 5"""
)
[14]:
_index | _type | _id | _score | business_name | inspection_score | |
---|---|---|---|---|---|---|
0 | sf_restaurants_inspections_dedup | _doc | 5794_20160907 | None | Soup-or-Salad | 96 |
1 | sf_restaurants_inspections_dedup | _doc | 60354_20161123 | None | Soup Unlimited | 95 |
2 | sf_restaurants_inspections_dedup | _doc | 24936_20160609 | None | San Francisco Soup Company | 77 |
4. Delete Indices¶
[15]:
wr.opensearch.delete_index(
client=client,
index="sf_restaurants_inspections"
)
[15]:
{'acknowledged': True}
5. Bonus - Prepare data and index from DataFrame¶
For this exercise we’ll use DOHMH New York City Restaurant Inspection Results dataset
[16]:
import pandas as pd
[17]:
df = pd.read_csv('https://data.cityofnewyork.us/api/views/43nn-pn8j/rows.csv?accessType=DOWNLOAD')
Prepare the data for indexing¶
[18]:
# convert field names to snake_case
df.columns = [col.lower().replace(' ', '_') for col in df.columns]
# convert lon/lat to OpenSearch geo_point
df['business_location'] = "POINT (" + df.longitude.fillna('0').astype(str) + " " + df.latitude.fillna('0').astype(str) + ")"
Create index with mapping¶
[19]:
# delete index if exists
wr.opensearch.delete_index(
client=client,
index="nyc_restaurants"
)
# use dynamic_template to map date fields
# define business_location as geo_point
wr.opensearch.create_index(
client=client,
index="nyc_restaurants_inspections",
mappings={
"dynamic_templates" : [
{
"dates" : {
"match" : "*date",
"mapping" : {
"type" : "date",
"format" : 'MM/dd/yyyy'
}
}
}
],
"properties": {
"business_location": {
"type": "geo_point"
}
}
}
)
[19]:
{'acknowledged': True,
'shards_acknowledged': True,
'index': 'nyc_restaurants_inspections'}
Index dataframe¶
[20]:
wr.opensearch.index_df(
client,
df=df,
index="nyc_restaurants_inspections",
id_keys=["camis"],
bulk_size=1000
)
Indexing: 100% (382655/382655)|##########################|Elapsed Time: 0:04:15
[20]:
{'success': 382655, 'errors': []}
Execute geo query¶
Sort restaurants by distance from Times-Square¶
[21]:
wr.opensearch.search(
client,
index="nyc_restaurants_inspections",
filter_path=["hits.hits._source"],
size=100,
search_body={
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance": {
"business_location": { # Times-Square - https://geojson.io/#map=16/40.7563/-73.9862
"lat": 40.75613228383523,
"lon": -73.9865791797638
},
"order": "asc"
}
}
]
}
)
[21]:
camis | dba | boro | building | street | zipcode | phone | cuisine_description | inspection_date | action | ... | inspection_type | latitude | longitude | community_board | council_district | census_tract | bin | bbl | nta | business_location | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 41551304 | THE COUNTER | Manhattan | 7 | TIMES SQUARE | 10036.0 | 2129976801 | American | 12/22/2016 | Violations were cited in the following area(s). | ... | Cycle Inspection / Initial Inspection | 40.755908 | -73.986681 | 105.0 | 3.0 | 11300.0 | 1086069.0 | 1.009940e+09 | MN17 | POINT (-73.986680953809 40.755907817312) |
1 | 50055665 | ANN INC CAFE | Manhattan | 7 | TIMES SQUARE | 10036.0 | 2125413287 | American | 12/11/2019 | Violations were cited in the following area(s). | ... | Cycle Inspection / Initial Inspection | 40.755908 | -73.986681 | 105.0 | 3.0 | 11300.0 | 1086069.0 | 1.009940e+09 | MN17 | POINT (-73.986680953809 40.755907817312) |
2 | 50049552 | ERNST AND YOUNG | Manhattan | 5 | TIMES SQ | 10036.0 | 2127739994 | Coffee/Tea | 11/30/2018 | Violations were cited in the following area(s). | ... | Cycle Inspection / Initial Inspection | 40.755702 | -73.987208 | 105.0 | 3.0 | 11300.0 | 1024656.0 | 1.010130e+09 | MN17 | POINT (-73.987207980138 40.755702020307) |
3 | 50014078 | RED LOBSTER | Manhattan | 5 | TIMES SQ | 10036.0 | 2127306706 | Seafood | 10/03/2017 | Violations were cited in the following area(s). | ... | Cycle Inspection / Initial Inspection | 40.755702 | -73.987208 | 105.0 | 3.0 | 11300.0 | 1024656.0 | 1.010130e+09 | MN17 | POINT (-73.987207980138 40.755702020307) |
4 | 50015171 | NEW AMSTERDAM THEATER | Manhattan | 214 | WEST 42 STREET | 10036.0 | 2125825472 | American | 06/26/2018 | Violations were cited in the following area(s). | ... | Cycle Inspection / Re-inspection | 40.756317 | -73.987652 | 105.0 | 3.0 | 11300.0 | 1024660.0 | 1.010130e+09 | MN17 | POINT (-73.987651832547 40.756316895053) |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
95 | 41552060 | PROSKAUER ROSE | Manhattan | 11 | TIMES SQUARE | 10036.0 | 2129695493 | American | 08/11/2017 | Violations were cited in the following area(s). | ... | Administrative Miscellaneous / Initial Inspection | 40.756891 | -73.990023 | 105.0 | 3.0 | 11300.0 | 1087978.0 | 1.010138e+09 | MN17 | POINT (-73.990023200823 40.756890780426) |
96 | 41242148 | GABBY O'HARA'S | Manhattan | 123 | WEST 39 STREET | 10018.0 | 2122788984 | Irish | 07/30/2019 | Violations were cited in the following area(s). | ... | Cycle Inspection / Re-inspection | 40.753405 | -73.986602 | 105.0 | 4.0 | 11300.0 | 1080611.0 | 1.008150e+09 | MN17 | POINT (-73.986602050292 40.753404587174) |
97 | 50095860 | THE TIMES EATERY | Manhattan | 680 | 8 AVENUE | 10036.0 | 6463867787 | American | 02/28/2020 | Violations were cited in the following area(s). | ... | Pre-permit (Operational) / Initial Inspection | 40.757991 | -73.989218 | 105.0 | 3.0 | 11900.0 | 1024703.0 | 1.010150e+09 | MN17 | POINT (-73.989218092096 40.757991356019) |
98 | 50072861 | ITSU | Manhattan | 530 | 7 AVENUE | 10018.0 | 9176393645 | Asian/Asian Fusion | 09/10/2018 | Violations were cited in the following area(s). | ... | Pre-permit (Operational) / Initial Inspection | 40.753844 | -73.988551 | 105.0 | 3.0 | 11300.0 | 1014485.0 | 1.007880e+09 | MN17 | POINT (-73.988551029682 40.753843959794) |
99 | 50068109 | LUKE'S LOBSTER | Manhattan | 1407 | BROADWAY | 10018.0 | 9174759192 | Seafood | 09/06/2017 | Violations were cited in the following area(s). | ... | Pre-permit (Operational) / Initial Inspection | 40.753432 | -73.987151 | 105.0 | 3.0 | 11300.0 | 1015265.0 | 1.008140e+09 | MN17 | POINT (-73.98715066791 40.753432097521) |
100 rows × 27 columns
32 - AWS Lake Formation - Glue Governed tables¶
This tutorial assumes that your IAM user/role has the required Lake Formation permissions to create and read AWS Glue Governed tables¶
Table of Contents¶
1. Read Governed table¶
1.1 Read PartiQL query¶
[ ]:
import awswrangler as wr
database = "gov_db" # Assumes a Glue database registered with Lake Formation exists in the account
table = "gov_table" # Assumes a Governed table exists in the account
catalog_id = "111111111111" # AWS Account Id
# Note 1: If a transaction_id is not specified, a new transaction is started
df = wr.lakeformation.read_sql_query(
sql=f"SELECT * FROM {table};",
database=database,
catalog_id=catalog_id
)
1.1.1 Read within transaction¶
[ ]:
transaction_id = wr.lakeformation.start_transaction(read_only=True)
df = wr.lakeformation.read_sql_query(
sql=f"SELECT * FROM {table};",
database=database,
transaction_id=transaction_id
)
1.1.2 Read within query as of time¶
[ ]:
import calendar
import time
query_as_of_time = calendar.timegm(time.gmtime())
df = wr.lakeformation.read_sql_query(
sql=f"SELECT * FROM {table} WHERE id=:id; AND name=:name;",
database=database,
query_as_of_time=query_as_of_time,
params={"id": 1, "name": "Ayoub"}
)
1.2 Read full table¶
[ ]:
df = wr.lakeformation.read_sql_table(
table=table,
database=database
)
2. Write Governed table¶
2.1 Create a new Governed table¶
Enter your bucket name:¶
[ ]:
import getpass
bucket = getpass.getpass()
If a Governed table does not exist, it can be created by passing an S3 path argument. Make sure your IAM user/role has sufficient permissions on the Lake Formation database.
2.1.1 CSV table¶
[ ]:
import pandas as pd
table = "gov_table_csv"
df = pd.DataFrame({
"col": [1, 2, 3],
"col2": ["A", "A", "B"],
"col3": [None, "test", None]
})
# Note 1: If a transaction_id is not specified, a new transaction is started
# Note 2: When creating a new Governed table, `table_type="GOVERNED"` must be specified. Otherwise the default is to create an EXTERNAL_TABLE
wr.s3.to_csv(
df=df,
path=f"s3://{bucket}/{database}/{table}/", # S3 path
dataset=True,
database=database,
table=table,
glue_table_settings={
"table_type": "GOVERNED",
},
)
2.1.2 Parquet table¶
[ ]:
table = "gov_table_parquet"
df = pd.DataFrame({"c0": [0, None]}, dtype="Int64")
wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/{database}/{table}/",
dataset=True,
database=database,
table=table,
glue_table_settings=wr.typing.GlueTableSettings(
table_type="GOVERNED",
description="c0",
parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))},
columns_comments={"c0": "0"},
)
)
2.2 Overwrite operations¶
2.2.1 Overwrite¶
[ ]:
df = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16")
wr.s3.to_parquet(
df=df,
dataset=True,
mode="overwrite",
database=database,
table=table,
glue_table_settings=wr.typing.GlueTableSettings(
description="c1",
parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))},
columns_comments={"c1": "1"}
),
)
2.2.2 Append¶
[ ]:
df = pd.DataFrame({"c1": [None, 2, None]}, dtype="Int8")
wr.s3.to_parquet(
df=df,
dataset=True,
mode="append",
database=database,
table=table,
description="c1",
parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index) * 2)},
columns_comments={"c1": "1"}
)
2.2.3 Create partitioned Governed table¶
[ ]:
table = "gov_table_parquet_partitioned"
df = pd.DataFrame({"c0": ["foo", None], "c1": [0, 1]})
wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/{database}/{table}/",
dataset=True,
database=database,
table=table,
partition_cols=["c1"],
glue_table_settings=wr.typing.GlueTableSettings(
table_type="GOVERNED",
description="c0+c1",
parameters={"num_cols": "2", "num_rows": "2"},
columns_comments={"c0": "zero", "c1": "one"},
),
)
2.2.4 Overwrite partitions¶
[ ]:
df = pd.DataFrame({"c0": [None, None], "c1": [0, 2]})
wr.s3.to_parquet(
df=df,
dataset=True,
mode="overwrite_partitions",
database=database,
table=table,
partition_cols=["c1"],
description="c0+c1",
parameters={"num_cols": "2", "num_rows": "3"},
columns_comments={"c0": "zero", "c1": "one"}
)
3. Multiple read/write operations within a transaction¶
[ ]:
read_table = "gov_table_parquet"
write_table = "gov_table_multi_parquet"
transaction_id = wr.lakeformation.start_transaction(read_only=False)
df = pd.DataFrame({"c0": [0, None]}, dtype="Int64")
wr.s3.to_parquet(
df=df,
path=f"s3://{bucket}/{database}/{write_table}_1",
dataset=True,
database=database,
table=f"{write_table}_1",
glue_table_settings={
"table_type": "GOVERNED",
"transaction_id": transaction_id,
},
)
df2 = wr.lakeformation.read_sql_table(
table=read_table,
database=database,
transaction_id=transaction_id,
use_threads=True
)
df3 = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16")
wr.s3.to_parquet(
df=df2,
path=f"s3://{bucket}/{database}/{write_table}_2",
dataset=True,
mode="append",
database=database,
table=f"{write_table}_2",
glue_table_settings={
"table_type": "GOVERNED",
"transaction_id": transaction_id,
},
)
wr.lakeformation.commit_transaction(transaction_id=transaction_id)
33 - Amazon Neptune¶
Note: to be able to use SPARQL you must either install SPARQLWrapper or install AWS SDK for pandas with the sparql extra:
[ ]:
!pip install 'awswrangler[gremlin, opencypher, sparql]'
Initialize¶
The first step to using AWS SDK for pandas with Amazon Neptune is to import the library and create a client connection.
Note: Connecting to Amazon Neptune requires that the application you are running has access to the Private VPC where Neptune is located. Without this access you will not be able to connect using AWS SDK for pandas.
[ ]:
import awswrangler as wr
import pandas as pd
url='<INSERT CLUSTER ENDPOINT>' # The Neptune Cluster endpoint
iam_enabled = False # Set to True/False based on the configuration of your cluster
neptune_port = 8182 # Set to the Neptune Cluster Port, Default is 8182
client = wr.neptune.connect(url, neptune_port, iam_enabled=iam_enabled)
Return the status of the cluster¶
[ ]:
print(client.status())
Retrieve Data from Neptune using AWS SDK for pandas¶
AWS SDK for pandas supports querying Amazon Neptune using TinkerPop Gremlin and openCypher for property graph data or SPARQL for RDF data.
Gremlin¶
[ ]:
query = "g.E().project('source', 'target').by(outV().id()).by(inV().id()).limit(5)"
df = wr.neptune.execute_gremlin(client, query)
display(df.head(5))
SPARQL¶
[ ]:
query = """
PREFIX foaf: <https://xmlns.com/foaf/0.1/>
PREFIX ex: <https://www.example.com/>
SELECT ?firstName WHERE { ex:JaneDoe foaf:knows ?person . ?person foaf:firstName ?firstName }"""
df = wr.neptune.execute_sparql(client, query)
display(df.head(5))
openCypher¶
[ ]:
query = "MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 5"
df = wr.neptune.execute_opencypher(client, query)
display(df.head(5))
Saving Data using AWS SDK for pandas¶
AWS SDK for pandas supports saving Pandas DataFrames into Amazon Neptune using either a property graph or RDF data model.
Property Graph¶
If writing to a property graph, then DataFrames for vertices and edges must be written separately. DataFrames for vertices must have a ~label column with the label and an ~id column with the vertex id. If the ~id column does not exist, the specified id does not exist, or the value is empty, then a new vertex will be added. If no ~label column exists, then the write will be treated as an update of the element with the specified ~id value.
DataFrames for edges must have ~id, ~label, ~to, and ~from columns. If the ~id column does not exist, the specified id does not exist, or the value is empty, then a new edge will be added. If no ~label, ~to, or ~from column exists, an exception will be thrown.
Add Vertices/Nodes¶
[ ]:
import uuid
import random
import string
def _create_dummy_vertex():
data = dict()
data["~id"] = uuid.uuid4()
data["~label"] = "foo"
data["int"] = random.randint(0, 1000)
data["str"] = "".join(random.choice(string.ascii_lowercase) for i in range(10))
data["list"] = [random.randint(0, 1000), random.randint(0, 1000)]
return data
data = [_create_dummy_vertex(), _create_dummy_vertex(), _create_dummy_vertex()]
df = pd.DataFrame(data)
res = wr.neptune.to_property_graph(client, df)
query = f"MATCH (s) WHERE id(s)='{data[0]['~id']}' RETURN s"
df = wr.neptune.execute_opencypher(client, query)
display(df)
Add Edges¶
[ ]:
import uuid
import random
import string
def _create_dummy_edge():
data = dict()
data["~id"] = uuid.uuid4()
data["~label"] = "bar"
data["~to"] = uuid.uuid4()
data["~from"] = uuid.uuid4()
data["int"] = random.randint(0, 1000)
data["str"] = "".join(random.choice(string.ascii_lowercase) for i in range(10))
return data
data = [_create_dummy_edge(), _create_dummy_edge(), _create_dummy_edge()]
df = pd.DataFrame(data)
res = wr.neptune.to_property_graph(client, df)
query = f"MATCH (s)-[r]->(d) WHERE id(r)='{data[0]['~id']}' RETURN r"
df = wr.neptune.execute_opencypher(client, query)
display(df)
Update Existing Nodes¶
[ ]:
idval=uuid.uuid4()
wr.neptune.execute_gremlin(client, f"g.addV().property(T.id, '{str(idval)}')")
query = f"MATCH (s) WHERE id(s)='{idval}' RETURN s"
df = wr.neptune.execute_opencypher(client, query)
print("Before")
display(df)
data = [{"~id": idval, "age": 50}]
df = pd.DataFrame(data)
res = wr.neptune.to_property_graph(client, df)
df = wr.neptune.execute_opencypher(client, query)
print("After")
display(df)
Setting cardinality based on the header¶
If you would like to save data using single cardinality, you can append (single) to the column header and set use_header_cardinality=True (the default). For example, a column named name(single) will save the name property as single cardinality. You can disable this by setting use_header_cardinality=False.
[ ]:
data = [_create_dummy_vertex()]
df = pd.DataFrame(data)
# Adding (single) to the column name in the DataFrame will cause it to write that property as `single` cardinality
df.rename(columns={"int": "int(single)"}, inplace=True)
res = wr.neptune.to_property_graph(client, df, use_header_cardinality=True)
# This can be disabled by setting `use_header_cardinality = False`
df.rename(columns={"int": "int(single)"}, inplace=True)
res = wr.neptune.to_property_graph(client, df, use_header_cardinality=False)
RDF¶
The DataFrame must consist of triples, with the column names for the subject, predicate, and object specified. If none are provided, then s, p, and o are used by default. If you want to add data to a named graph, then you will also need a graph column, which defaults to g.
Write Triples¶
[ ]:
def _create_dummy_triple():
data = dict()
data["s"] = "http://example.com/resources/foo"
data["p"] = uuid.uuid4()
data["o"] = random.randint(0, 1000)
return data
data = [_create_dummy_triple(), _create_dummy_triple(), _create_dummy_triple()]
df = pd.DataFrame(data)
res = wr.neptune.to_rdf_graph(client, df)
query = """
PREFIX foo: <http://example.com/resources/>
SELECT ?o WHERE { <foo:foo> <" + str(data[0]['p']) + "> ?o .}"""
df = wr.neptune.execute_sparql(client, query)
display(df)
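If your DataFrame uses other column names, they can be mapped when writing. The sketch below assumes that to_rdf_graph exposes subject_column, predicate_column and object_column arguments for this mapping; treat these names as assumptions and check the API reference before relying on them.
[ ]:
# Sketch only: the column-mapping argument names below are assumptions, not confirmed by this tutorial
df = pd.DataFrame([
{"subj": "http://example.com/resources/foo", "pred": str(uuid.uuid4()), "obj": random.randint(0, 1000)},
])
res = wr.neptune.to_rdf_graph(
client,
df,
subject_column="subj",
predicate_column="pred",
object_column="obj",
)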
Write Quads¶
[ ]:
def _create_dummy_quad():
data = _create_dummy_triple()
data["g"] = "bar"
return data
data = [_create_dummy_quad(), _create_dummy_quad(), _create_dummy_quad()]
df = pd.DataFrame(data)
res = wr.neptune.to_rdf_graph(client, df)
query = """
PREFIX foo: <http://example.com/resources/>
SELECT ?o WHERE { <foo:foo> <" + str(data[0]['p']) + "> ?o .}"""
df = wr.neptune.execute_sparql(client, query)
display(df)
Flatten DataFrames¶
One of the complexities of working with graph result sets in a row/column paradigm such as Pandas is that graph queries very often return complex, nested objects. To make these results easier to use in a tabular format, we have added a method to flatten the returned Pandas DataFrame.
Flattening the DataFrame¶
[ ]:
client = wr.neptune.connect(url, 8182, iam_enabled=False)
query = "MATCH (n) RETURN n LIMIT 1"
df = wr.neptune.execute_opencypher(client, query)
print("Original")
display(df)
df_new=wr.neptune.flatten_nested_df(df)
print("Flattened")
display(df_new)
Removing the prefixing of the parent column name¶
[ ]:
df_new=wr.neptune.flatten_nested_df(df, include_prefix=False)
display(df_new)
Specifying the column header separator¶
[ ]:
df_new=wr.neptune.flatten_nested_df(df, separator='|')
display(df_new)
Putting it into a workflow¶
[ ]:
!pip install igraph networkx
Running PageRank using NetworkX¶
[ ]:
import networkx as nx
# Retrieve Data from neptune
client = wr.neptune.connect(url, 8182, iam_enabled=False)
query = "MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 100"
df = wr.neptune.execute_opencypher(client, query)
# Run PageRank
G=nx.from_pandas_edgelist(df, edge_attr=True)
pg = nx.pagerank(G)
# Save values back into Neptune
rows=[]
for k in pg.keys():
rows.append({'~id': k, 'pageRank_nx(single)': pg[k]})
pg_df=pd.DataFrame(rows, columns=['~id','pageRank_nx(single)'])
res = wr.neptune.to_property_graph(client, pg_df, use_header_cardinality=True)
# Retrieve newly saved data
query = "MATCH (n:airport) WHERE n.pageRank_nx IS NOT NULL RETURN n.code, n.pageRank_nx ORDER BY n.pageRank_nx DESC LIMIT 5"
df = wr.neptune.execute_opencypher(client, query)
display(df)
Running PageRank using iGraph¶
[ ]:
import igraph as ig
# Retrieve Data from neptune
client = wr.neptune.connect(url, 8182, iam_enabled=False)
query = "MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 100"
df = wr.neptune.execute_opencypher(client, query)
# Run PageRank
g = ig.Graph.TupleList(df.itertuples(index=False), directed=True, weights=False)
pg = g.pagerank()
# Save values back into Neptune
rows=[]
for idx, v in enumerate(g.vs):
rows.append({'~id': v['name'], 'pageRank_ig(single)': pg[idx]})
pg_df=pd.DataFrame(rows, columns=['~id','pageRank_ig(single)'])
res = wr.neptune.to_property_graph(client, pg_df, use_header_cardinality=True)
# Retrieve newly saved data
query = "MATCH (n:airport) WHERE n.pageRank_ig IS NOT NULL RETURN n.code, n.pageRank_ig ORDER BY n.pageRank_ig DESC LIMIT 5"
df = wr.neptune.execute_opencypher(client, query)
display(df)
Bulk Load¶
Data can be written using the Neptune Bulk Loader by way of S3. The Bulk Loader is fast and optimized for large datasets.
For details on the IAM permissions needed to set this up, see here.
[ ]:
df = pd.DataFrame([_create_dummy_edge() for _ in range(1000)])
wr.neptune.bulk_load(
client=client,
df=df,
path="s3://my-bucket/stage-files/",
iam_role="arn:aws:iam::XXX:role/XXX",
)
Alternatively, if the data is already on S3 in CSV format, you can use the neptune.bulk_load_from_files function. This is also useful if the data is written to S3 as a byproduct of an AWS Athena command, as the example below will show.
[ ]:
sql = """
SELECT
<col_id> AS "~id"
, <label_id> AS "~label"
, *
FROM <database>.<table>
"""
wr.athena.start_query_execution(
sql=sql,
s3_output="s3://my-bucket/stage-files-athena/",
wait=True,
)
wr.neptune.bulk_load_from_files(
client=client,
path="s3://my-bucket/stage-files-athena/",
iam_role="arn:aws:iam::XXX:role/XXX",
)
Both the bulk_load and bulk_load_from_files functions are suitable at scale. The latter simply invokes the Neptune Bulk Loader on existing data in S3. The former, however, involves writing CSV data to S3. With ray and modin installed, this operation can also be distributed across multiple workers in a Ray cluster.
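As a rough sketch of that distributed path (reusing the client, the _create_dummy_edge helper and the placeholder bucket and IAM role from the cells above; the extras and row count are illustrative assumptions), the call itself does not change:
[ ]:
# Sketch only: with the modin/ray extras installed (pip install "awswrangler[modin,ray]"),
# importing awswrangler starts a Ray instance and the CSV staging step of bulk_load runs in parallel
import awswrangler as wr
import modin.pandas as pd
df = pd.DataFrame([_create_dummy_edge() for _ in range(100000)])
wr.neptune.bulk_load(
client=client,
df=df,
path="s3://my-bucket/stage-files/",
iam_role="arn:aws:iam::XXX:role/XXX",
)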
34 - Distributing Calls Using Ray¶
AWS SDK for pandas supports distribution of specific calls using ray and modin.
When enabled, data loading methods return modin dataframes instead of pandas dataframes. Modin provides seamless integration and compatibility with existing pandas code, with the benefit of distributing operations across your Ray instance and operating at a much larger scale.
[1]:
!pip install "awswrangler[modin,ray,redshift]"
Importing awswrangler when ray and modin are installed will automatically initialize a local Ray instance.
[2]:
import awswrangler as wr
import modin.pandas as pd
print(f"Execution Engine: {wr.engine.get()}")
print(f"Memory Format: {wr.memory_format.get()}")
2022-10-24 14:59:36,287 INFO worker.py:1518 -- Started a local Ray instance.
Execution Engine: EngineEnum.RAY
Memory Format: MemoryFormatEnum.MODIN
Read data at scale¶
Data is read using all cores on a single machine or multiple nodes on a cluster
[3]:
df = wr.s3.read_parquet(path="s3://amazon-reviews-pds/parquet/product_category=Furniture/")
df.head(5)
Read progress: 100%|██████████| 10/10 [01:10<00:00, 7.03s/it]
UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1
[3]:
marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | review_date | year | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | US | 35680291 | R34O1VWWYVAU9A | B000MWFEV6 | 406798096 | Baxton Studio Full Leather Storage Bench Ottom... | 5 | 1 | 1 | N | Y | High quality and roomy | I bought this bench as a storage necessity as ... | 2009-05-17 | 2009 |
1 | US | 21000590 | RU1I9NHALXPW5 | B004C1RULU | 239421036 | Alera Fraze Series Leather High-Back Swivel/Ti... | 3 | 8 | 9 | N | Y | Do not judge the chair on the first day alone. | Received this chair really fast because I had ... | 2012-06-29 | 2012 |
2 | US | 12140069 | R2O8R9CLCUQTB8 | B000GFWQDI | 297104356 | Matching Cherry Printer Stand with Casters and... | 5 | 4 | 4 | N | Y | Printer stand made into printer / PC stand | I wanted to get my pc's off the floor and off ... | 2009-05-17 | 2009 |
3 | US | 23755701 | R12FOIKUUXPHBZ | B0055DOI50 | 39731200 | Marquette Bed | 5 | 6 | 6 | N | Y | Excellent Value!! | Great quality for the price. This bed is easy ... | 2012-06-29 | 2012 |
4 | US | 50735969 | RK0XUO7P40TK9 | B0026RH3X2 | 751769063 | Cape Craftsman Shutter 2-Door Cabinet | 3 | 12 | 12 | N | N | Nice, but not best quality | I love the design of this cabinet! It's a very... | 2009-05-17 | 2009 |
The data type is a modin DataFrame
[4]:
type(df)
[4]:
modin.pandas.dataframe.DataFrame
However, this type is interoperable with standard pandas calls:
[5]:
filtered_df = df[df.helpful_votes > 10]
excluded_columns = ["product_title", "review_headline", "review_body"]
filtered_df = filtered_df.loc[:, ~filtered_df.columns.isin(excluded_columns)]
Enter your bucket name:
[6]:
bucket = "BUCKET_NAME"
Write data at scale¶
The write operation is parallelized, leading to significant speed-ups
[7]:
result = wr.s3.to_parquet(
filtered_df,
path=f"s3://{bucket}/amazon-reviews/",
dataset=True,
dtype={"review_date": "timestamp"},
)
print(f"Data has been written to {len(result['paths'])} files")
Write Progress: 100%|██████████| 10/10 [00:21<00:00, 2.14s/it]
Data has been written to 10 files
Copy to Redshift at scale…¶
Data is first staged in S3 then a COPY command is executed against the Redshift cluster to load it. Both operations are distributed: S3 write with Ray and COPY in the Redshift cluster
[8]:
# Connect to the Redshift instance
con = wr.redshift.connect("aws-sdk-pandas-redshift")
path = f"s3://{bucket}/stage/"
iam_role = "IAM_ROLE"
schema = "public"
table = "amazon_reviews"
wr.redshift.copy(
df=filtered_df,
path=path,
con=con,
schema=schema,
table=table,
mode="overwrite",
iam_role=iam_role,
max_rows_by_file=None,
)
Repartition: 100%|██████████| 1/1 [00:00<00:00, 1.42it/s]
Write Progress: 100%|██████████| 1/1 [00:06<00:00, 6.19s/it]
… and UNLOAD it back¶
Parallel calls can also be leveraged when reading from the cluster. The UNLOAD command distributes query processing in Redshift to dump files in S3 which are then read in parallel into a dataframe
[9]:
wr.redshift.unload(
sql=f"SELECT * FROM {schema}.{table} where star_rating = 5",
con=con,
iam_role=iam_role,
path=path,
keep_files=True,
)
2022-10-20 11:20:02,369 WARNING read_api.py:291 -- ⚠️ The number of blocks in this dataset (2) limits its parallelism to 2 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Read progress: 100%|██████████| 2/2 [00:01<00:00, 1.41it/s]
[9]:
marketplace | customer_id | review_id | product_id | product_parent | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_date | year | |
---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | US | 23875938 | RC5BC3HYUV324 | B000EPKLFA | 878266274 | 5 | 15 | 17 | N | Y | 2009-07-12 | 2009 |
1 | US | 22174246 | R3MFRIKP6HMH0W | B001NJ4J6I | 394928248 | 5 | 20 | 23 | N | Y | 2009-07-19 | 2009 |
2 | US | 52886745 | R1T9C0QELFI939 | B0012ZNNR4 | 364197484 | 5 | 32 | 33 | N | N | 2009-07-24 | 2009 |
3 | US | 14527742 | R2CIP31EO2GXDK | B000M5Z98G | 199037166 | 5 | 12 | 12 | N | Y | 2009-08-23 | 2009 |
4 | US | 41393002 | R29IOXB832QR6L | B0071HBVYE | 956030824 | 5 | 16 | 16 | N | Y | 2012-07-12 | 2012 |
... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
16022 | US | 20481704 | R2KV325KBKDKL8 | B00G701H5E | 703622282 | 5 | 16 | 16 | N | N | 2014-11-06 | 2014 |
16023 | US | 37023256 | R1FJT6UF7KM8GV | B005VY8U8Y | 220718418 | 5 | 23 | 25 | N | Y | 2014-11-08 | 2014 |
16024 | US | 24286944 | R1RSIZBY4Z3PF2 | B00LNCDGKU | 934098561 | 5 | 47 | 49 | N | Y | 2014-11-14 | 2014 |
16025 | US | 15276457 | R31YFDIUQ2HI2X | B005KFHWPG | 310427061 | 5 | 19 | 20 | N | Y | 2014-11-15 | 2014 |
16026 | US | 52215985 | R11U6K1OIDEUKH | B00NEJ4Y4M | 22567782 | 5 | 62 | 67 | Y | N | 2014-11-16 | 2014 |
16027 rows x 12 columns
Find a needle in a haystack with S3 Select¶
[10]:
# Run S3 Select query against all objects in the category for a given customer ID
wr.s3.select_query(
sql="SELECT * FROM s3object s where s.\"customer_id\" = '51624146'",
path="s3://amazon-reviews-pds/parquet/product_category=Office_Products/*.parquet",
input_serialization="Parquet",
input_serialization_params={},
scan_range_chunk_size=32*1024*1024,
)
UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1
[10]:
marketplace | customer_id | review_id | product_id | product_parent | product_title | star_rating | helpful_votes | total_votes | vine | verified_purchase | review_headline | review_body | review_date | year | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | US | 51624146 | RU9SWH8SHOBBS | B001ERDENS | 658861629 | LINKYO Compatible Toner Cartridge Replacement ... | 5 | 0 | 0 | N | Y | Perfect fit for my HP LaserJet M1522 nf | I will never buy "official" toner cart... | 2013-07-12 | 2013 |
1 | US | 51624146 | RAO9QADXC9TUH | B00GJQA4TG | 184072656 | SuperChalks White Liquid Chalk Marker Pens 4-P... | 4 | 0 | 0 | N | Y | Smooth flowing "ink, " but these markers left ... | Smooth flowing "ink," but these marker... | 2014-10-06 | 2014 |
2 | US | 51624146 | R1D94CA7TKY9DU | B000MK647G | 396184528 | Fax Toner Cartridge for Brother IntelliFax 575... | 5 | 0 | 0 | N | Y | Came quickly, works great | I bought four of these for my office. Just kno... | 2014-03-26 | 2014 |
35 - Distributing Calls on Ray Remote Cluster¶
AWS SDK for pandas supports distribution of specific calls on a cluster of EC2s using ray.
Note that this tutorial creates a cluster of EC2 nodes which will incur a charge in your account. Please make sure to delete the cluster at the end.
Install the library¶
[ ]:
!pip install "awswrangler[modin,ray]"
Configure and Build Ray Cluster on AWS¶
Build Prerequisite Infrastructure¶
Click on the link below to provision an AWS CloudFormation stack. It builds a security group and IAM instance profile for the Ray Cluster to use. A valid CIDR range (encompassing your local machine IP) and a VPC ID are required.
Configure Ray Cluster Configuration¶
Start with a cluster configuration file (YAML).
[ ]:
!touch config.yml
Replace all values to match your desired region, account number and name of resources deployed by the above CloudFormation Stack.
A limited set of AWS regions is currently supported (Python 3.8 and above). Find the corresponding Ray AMI IDs here. The example configuration below uses the AMI for us-east-1.
Then edit the config.yml file with your custom configuration.
[ ]:
cluster_name: pandas-sdk-cluster
min_workers: 2
max_workers: 2
provider:
type: aws
region: us-east-1 # Change AWS region as necessary
availability_zone: us-east-1a,us-east-1b,us-east-1c # Change as necessary
security_group:
GroupName: ray-cluster
cache_stopped_nodes: False
available_node_types:
ray.head.default:
node_config:
InstanceType: m4.xlarge
IamInstanceProfile:
# Replace with your account id and profile name if you did not use the default value
Arn: arn:aws:iam::{ACCOUNT ID}:instance-profile/ray-cluster
# Replace ImageId if using a different region / python version
ImageId: ami-0ea510fcb67686b48
TagSpecifications: # Optional tags
- ResourceType: "instance"
Tags:
- Key: Platform
Value: "ray"
ray.worker.default:
min_workers: 2
max_workers: 2
node_config:
InstanceType: m4.xlarge
IamInstanceProfile:
# Replace with your account id and profile name if you did not use the default value
Arn: arn:aws:iam::{ACCOUNT ID}:instance-profile/ray-cluster
# Replace ImageId if using a different region / python version
ImageId: ami-0ea510fcb67686b48
TagSpecifications: # Optional tags
- ResourceType: "instance"
Tags:
- Key: Platform
Value: "ray"
setup_commands:
- pip install "awswrangler[modin,ray]==3.0.0"
Provision Ray Cluster¶
The command below creates a Ray cluster in your account based on the aforementioned config file. It consists of one head node and 2 workers (m4.xlarge EC2 instances). The command takes a few minutes to complete.
[ ]:
!ray up -y config.yml
Once the cluster is up and running, we set the RAY_ADDRESS environment variable to the head node's Ray cluster address.
[ ]:
import os, subprocess
head_node_ip = subprocess.check_output(['ray', 'get-head-ip', 'config.yml']).decode("utf-8").split("\n")[-2]
os.environ['RAY_ADDRESS'] = f"ray://{head_node_ip}:10001"
As a result, awswrangler API calls now run on the cluster, not on your local machine. The SDK detects the required dependencies for its distributed mode and parallelizes supported methods on the cluster.
[ ]:
import awswrangler as wr
import modin.pandas as pd
print(f"Execution engine: {wr.engine.get()}")
print(f"Memory format: {wr.memory_format.get()}")
Enter bucket Name
[ ]:
bucket = "BUCKET_NAME"
Read & write some data at scale on the cluster¶
[ ]:
# Read last 3 months of Taxi parquet compressed data (400 MB)
df = wr.s3.read_parquet(path="s3://ursa-labs-taxi-data/2018/1*.parquet")
df["month"] = df["pickup_at"].dt.month
# Write it back to S3 partitioned by month
path=f"s3://{bucket}/taxi-data/"
database = "ray_test"
wr.catalog.create_database(name=database, exist_ok=True)
table = "nyc_taxi"
wr.s3.to_parquet(
df=df,
path=path,
dataset=True,
database=database,
table=table,
partition_cols=["month"],
)
Read it back via Athena UNLOAD¶
The UNLOAD command distributes query processing in Athena to dump results in S3 which are then read in parallel into a dataframe
[ ]:
unload_path = f"s3://{bucket}/unload/nyc_taxi/"
# Athena UNLOAD requires that the S3 path is empty
# Note that s3.delete_objects is also a distributed call
wr.s3.delete_objects(unload_path)
wr.athena.read_sql_query(
f"SELECT * FROM {table}",
database=database,
ctas_approach=False,
unload_approach=True,
s3_output=unload_path,
)
The EC2 cluster must be terminated or it will incur a charge.
[ ]:
!ray down -y ./config.yml
36 - Distributing Calls on Glue Interactive sessions¶
AWS SDK for pandas is pre-loaded into AWS Glue interactive sessions with Ray kernel, making it by far the easiest way to experiment with the library at scale.
In AWS Glue Studio, choose Jupyter Notebook to create an AWS Glue interactive session:

Then select Ray as the kernel. The IAM role must trust the AWS Glue service principal.

Once the notebook is up and running you can import the library. Since we are running on AWS Glue with Ray, AWS SDK for pandas will automatically use the existing Ray cluster with no extra configuration needed.
Install the library¶
[ ]:
!pip install "awswrangler[modin]"
[1]:
import awswrangler as wr
Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.
Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::977422593089:role/AWSGlueMantaTests
Trying to create a Glue session for the kernel.
Worker Type: Z.2X
Number of Workers: 5
Session ID: 309824f0-bad7-49d0-a2b4-e1b8c7368c5f
Job Type: glueray
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session 309824f0-bad7-49d0-a2b4-e1b8c7368c5f to get into ready status...
Session 309824f0-bad7-49d0-a2b4-e1b8c7368c5f has been created.
2022-11-21 16:24:03,136 INFO worker.py:1329 -- Connecting to existing Ray cluster at address: 2600:1f10:4674:6822:5b63:3324:984:3152:6379...
2022-11-21 16:24:03,144 INFO worker.py:1511 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
[3]:
df = wr.s3.read_csv(path="s3://nyc-tlc/csv_backup/yellow_tripdata_2021-0*.csv")
Read progress: 100%|##########| 9/9 [00:10<00:00, 1.15s/it]
UserWarning: When using a pre-initialized Ray cluster, please ensure that the runtime env sets environment variable __MODIN_AUTOIMPORT_PANDAS__ to 1
[4]:
df.head()
VendorID tpep_pickup_datetime ... total_amount congestion_surcharge
0 1.0 2021-01-01 00:30:10 ... 11.80 2.5
1 1.0 2021-01-01 00:51:20 ... 4.30 0.0
2 1.0 2021-01-01 00:43:30 ... 51.95 0.0
3 1.0 2021-01-01 00:15:48 ... 36.35 0.0
4 2.0 2021-01-01 00:31:49 ... 24.36 2.5
[5 rows x 18 columns]
To avoid incurring a charge, make sure to delete the Jupyter Notebook when you are done experimenting.
37 - Glue Data Quality¶
AWS Glue Data Quality helps you evaluate and monitor the quality of your data.
Create test data¶
First, let’s start by creating test data, writing it to S3, and registering it in the Glue Data Catalog.
[ ]:
import awswrangler as wr
import pandas as pd
glue_database = "aws_sdk_pandas"
glue_table = "my_glue_table"
path = "s3://BUCKET_NAME/my_glue_table/"
df = pd.DataFrame({"c0": [0, 1, 2], "c1": [0, 1, 2], "c2": [0, 0, 0]})
wr.s3.to_parquet(df, path, dataset=True, database=glue_database, table=glue_table, partition_cols=["c2"])
Start with recommended data quality rules¶
AWS Glue Data Quality can recommend a set of data quality rules so you can get started quickly.
Note: Running Glue Data Quality recommendation and evaluation tasks requires an IAM role. This role must trust the Glue principal and allow permissions to various resources including the Glue table and the S3 bucket where your data is stored. Moreover, data quality IAM actions must be granted. To find out more, check Authorization.
[7]:
first_ruleset = "ruleset_1"
iam_role_arn = "arn:aws:iam::..." # IAM role assumed by the Glue Data Quality job to access resources
df_recommended_ruleset = wr.data_quality.create_recommendation_ruleset( # Creates a recommended ruleset
name=first_ruleset,
database=glue_database,
table=glue_table,
iam_role_arn=iam_role_arn,
number_of_workers=2,
)
df_recommended_ruleset
[7]:
rule_type | parameter | expression | |
---|---|---|---|
0 | RowCount | None | between 1 and 6 |
1 | IsComplete | "c0" | None |
2 | Uniqueness | "c0" | > 0.95 |
3 | ColumnValues | "c0" | <= 2 |
4 | IsComplete | "c1" | None |
5 | Uniqueness | "c1" | > 0.95 |
6 | ColumnValues | "c1" | <= 2 |
7 | IsComplete | "c2" | None |
8 | ColumnValues | "c2" | in ["0"] |
Update the recommended rules¶
Recommended rulesets are not perfect and you are likely to modify them or create your own.
[17]:
# Append and update rules (pandas 2.0 removed DataFrame.append, so pd.concat is used)
df_updated_ruleset = pd.concat(
[df_recommended_ruleset, pd.DataFrame([{"rule_type": "Uniqueness", "parameter": '"c2"', "expression": "> 0.95"}])],
ignore_index=True,
)
df_updated_ruleset.at[8, "expression"] = "in [0, 1, 2]"
# Update the existing ruleset (upsert)
wr.data_quality.update_ruleset(
name=first_ruleset,
df_rules=df_updated_ruleset,
mode="upsert", # update existing or insert new rules to the ruleset
)
wr.data_quality.get_ruleset(name=first_ruleset)
[17]:
rule_type | parameter | expression | |
---|---|---|---|
0 | RowCount | None | between 1 and 6 |
1 | IsComplete | "c0" | None |
2 | Uniqueness | "c0" | > 0.95 |
3 | ColumnValues | "c0" | <= 2 |
4 | IsComplete | "c1" | None |
5 | Uniqueness | "c1" | > 0.95 |
6 | ColumnValues | "c1" | <= 2 |
7 | IsComplete | "c2" | None |
8 | ColumnValues | "c2" | in [0, 1, 2] |
9 | Uniqueness | "c2" | > 0.95 |
Run a data quality task¶
The ruleset can now be evaluated against the data. A cluster with 2 workers is used for the run. It returns a report with PASS/FAIL results for each rule.
[20]:
wr.data_quality.evaluate_ruleset(
name=first_ruleset,
iam_role_arn=iam_role_arn,
number_of_workers=2,
)
[20]:
Name | Description | Result | ResultId | EvaluationMessage | |
---|---|---|---|---|---|
0 | Rule_1 | RowCount between 1 and 6 | PASS | dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 | NaN |
1 | Rule_2 | IsComplete "c0" | PASS | dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 | NaN |
2 | Rule_3 | Uniqueness "c0" > 0.95 | PASS | dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 | NaN |
3 | Rule_4 | ColumnValues "c0" <= 2 | PASS | dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 | NaN |
4 | Rule_5 | IsComplete "c1" | PASS | dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 | NaN |
5 | Rule_6 | Uniqueness "c1" > 0.95 | PASS | dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 | NaN |
6 | Rule_7 | ColumnValues "c1" <= 2 | PASS | dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 | NaN |
7 | Rule_8 | IsComplete "c2" | PASS | dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 | NaN |
8 | Rule_9 | ColumnValues "c2" in [0,1,2] | PASS | dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 | NaN |
9 | Rule_10 | Uniqueness "c2" > 0.95 | FAIL | dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 | Value: 0.0 does not meet the constraint requir... |
Create ruleset from Data Quality Definition Language definition¶
The Data Quality Definition Language (DQDL) is a domain specific language that you can use to define Data Quality rules. For the full syntax reference, see DQDL.
[21]:
second_ruleset = "ruleset_2"
dqdl_rules = (
"Rules = ["
"RowCount between 1 and 6,"
'IsComplete "c0",'
'Uniqueness "c0" > 0.95,'
'ColumnValues "c0" <= 2,'
'IsComplete "c1",'
'Uniqueness "c1" > 0.95,'
'ColumnValues "c1" <= 2,'
'IsComplete "c2",'
'ColumnValues "c2" <= 1'
"]"
)
wr.data_quality.create_ruleset(
name=second_ruleset,
database=glue_database,
table=glue_table,
dqdl_rules=dqdl_rules,
)
Create or update a ruleset from a data frame¶
AWS SDK for pandas also enables you to create or update a ruleset from a pandas data frame.
[24]:
third_ruleset = "ruleset_3"
df_rules = pd.DataFrame({
"rule_type": ["RowCount", "ColumnCorrelation", "Uniqueness"],
"parameter": [None, '"c0" "c1"', '"c0"'],
"expression": ["between 2 and 8", "> 0.8", "> 0.95"],
})
wr.data_quality.create_ruleset(
name=third_ruleset,
df_rules=df_rules,
database=glue_database,
table=glue_table,
)
wr.data_quality.get_ruleset(name=third_ruleset)
[24]:
rule_type | parameter | expression | |
---|---|---|---|
0 | RowCount | None | between 2 and 8 |
1 | ColumnCorrelation | "c0" "c1" | > 0.8 |
2 | Uniqueness | "c0" | > 0.95 |
Get multiple rulesets¶
[25]:
wr.data_quality.get_ruleset(name=[first_ruleset, second_ruleset, third_ruleset])
[25]:
rule_type | parameter | expression | ruleset | |
---|---|---|---|---|
0 | RowCount | None | between 1 and 6 | ruleset_1 |
1 | IsComplete | "c0" | None | ruleset_1 |
2 | Uniqueness | "c0" | > 0.95 | ruleset_1 |
3 | ColumnValues | "c0" | <= 2 | ruleset_1 |
4 | IsComplete | "c1" | None | ruleset_1 |
5 | Uniqueness | "c1" | > 0.95 | ruleset_1 |
6 | ColumnValues | "c1" | <= 2 | ruleset_1 |
7 | IsComplete | "c2" | None | ruleset_1 |
8 | ColumnValues | "c2" | in [0, 1, 2] | ruleset_1 |
9 | Uniqueness | "c2" | > 0.95 | ruleset_1 |
0 | RowCount | None | between 1 and 6 | ruleset_2 |
1 | IsComplete | "c0" | None | ruleset_2 |
2 | Uniqueness | "c0" | > 0.95 | ruleset_2 |
3 | ColumnValues | "c0" | <= 2 | ruleset_2 |
4 | IsComplete | "c1" | None | ruleset_2 |
5 | Uniqueness | "c1" | > 0.95 | ruleset_2 |
6 | ColumnValues | "c1" | <= 2 | ruleset_2 |
7 | IsComplete | "c2" | None | ruleset_2 |
8 | ColumnValues | "c2" | <= 1 | ruleset_2 |
0 | RowCount | None | between 2 and 8 | ruleset_3 |
1 | ColumnCorrelation | "c0" "c1" | > 0.8 | ruleset_3 |
2 | Uniqueness | "c0" | > 0.95 | ruleset_3 |
Evaluate Data Quality for a given partition¶
A data quality evaluation run can be limited to specific partition(s) by leveraging the pushDownPredicate expression in the additional_options argument.
[26]:
df = pd.DataFrame({"c0": [2, 0, 1], "c1": [1, 0, 2], "c2": [1, 1, 1]})
wr.s3.to_parquet(df, path, dataset=True, database=glue_database, table=glue_table, partition_cols=["c2"])
wr.data_quality.evaluate_ruleset(
name=third_ruleset,
iam_role_arn=iam_role_arn,
number_of_workers=2,
additional_options={
"pushDownPredicate": "(c2 == '1')",
},
)
[26]:
Name | Description | Result | ResultId | EvaluationMessage | |
---|---|---|---|---|---|
0 | Rule_1 | RowCount between 2 and 8 | PASS | dqresult-f676cfe0345aa93f492e3e3c3d6cf1ad99b84dc6 | NaN |
1 | Rule_2 | ColumnCorrelation "c0" "c1" > 0.8 | FAIL | dqresult-f676cfe0345aa93f492e3e3c3d6cf1ad99b84dc6 | Value: 0.5 does not meet the constraint requir... |
2 | Rule_3 | Uniqueness "c0" > 0.95 | PASS | dqresult-f676cfe0345aa93f492e3e3c3d6cf1ad99b84dc6 | NaN |
38 - OpenSearch Serverless¶
Amazon OpenSearch Serverless is an on-demand serverless configuration for Amazon OpenSearch Service.
Create collection¶
A collection in Amazon OpenSearch Serverless is a logical grouping of one or more indexes that represent an analytics workload.
Collections must have an assigned encryption policy, network policy, and a matching data access policy that grants permission to its resources.
[ ]:
# Install the optional modules first
!pip install 'awswrangler[opensearch]'
[1]:
import awswrangler as wr
[8]:
data_access_policy = [
{
"Rules": [
{
"ResourceType": "index",
"Resource": [
"index/my-collection/*",
],
"Permission": [
"aoss:*",
],
},
{
"ResourceType": "collection",
"Resource": [
"collection/my-collection",
],
"Permission": [
"aoss:*",
],
},
],
"Principal": [
wr.sts.get_current_identity_arn(),
],
}
]
AWS SDK for pandas can create default network and encryption policies based on the user input.
By default, the network policy allows public access to the collection, and the encryption policy encrypts the collection using an AWS-managed KMS key.
Create a collection, and a corresponding data, network, and access policies:
[10]:
collection = wr.opensearch.create_collection(
name="my-collection",
data_policy=data_access_policy,
)
collection_endpoint = collection["collectionEndpoint"]
The call will wait and exit when the collection and corresponding policies are created and active.
To create a collection encrypted with a customer-managed KMS key and attached to a VPC, provide the KMS key ARN and/or VPC endpoints:
[ ]:
kms_key_arn = "arn:aws:kms:..."
vpc_endpoint = "vpce-..."
collection = wr.opensearch.create_collection(
name="my-secure-collection",
data_policy=data_access_policy,
kms_key_arn=kms_key_arn,
vpc_endpoints=[vpc_endpoint],
)
Connect¶
Connect to the collection endpoint:
[12]:
client = wr.opensearch.connect(host=collection_endpoint)
Create index¶
To create an index, run:
[13]:
index="my-index-1"
wr.opensearch.create_index(
client=client,
index=index,
)
[13]:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'my-index-1'}
Index documents¶
To index documents:
[25]:
wr.opensearch.index_documents(
client,
documents=[{"_id": "1", "name": "John"}, {"_id": "2", "name": "George"}, {"_id": "3", "name": "Julia"}],
index=index,
)
Indexing: 100% (3/3)|####################################|Elapsed Time: 0:00:12
[25]:
{'success': 3, 'errors': []}
It is also possible to index Pandas data frames:
[26]:
import pandas as pd
df = pd.DataFrame(
[{"_id": "1", "name": "John", "tags": ["foo", "bar"]}, {"_id": "2", "name": "George", "tags": ["foo"]}]
)
wr.opensearch.index_df(
client,
df=df,
index="index-df",
)
Indexing: 100% (2/2)|####################################|Elapsed Time: 0:00:12
[26]:
{'success': 2, 'errors': []}
AWS SDK for pandas also supports indexing JSON and CSV documents. For more examples, refer to the 031 - OpenSearch tutorial.
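As a quick sketch (the S3 path below is a placeholder), indexing a JSON Lines file into the serverless collection works the same way as against a managed domain:
[ ]:
# Minimal sketch: index a JSON Lines file stored on S3 (placeholder path) into the collection
wr.opensearch.index_json(
client,
path="s3://my-bucket/json/my-documents.json",
index="index-json",
)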
Search¶
Search using search DSL:
[27]:
wr.opensearch.search(
client,
index=index,
search_body={
"query": {
"match": {
"name": "Julia"
}
}
}
)
[27]:
_id | name | |
---|---|---|
0 | 3 | Julia |
Delete index¶
To delete an index, run:
[ ]:
wr.opensearch.delete_index(
client=client,
index=index
)
39 - Athena Iceberg¶
Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue catalog for their metastore. More details in the Athena User Guide.
Create Iceberg table¶
[50]:
import getpass
bucket_name = getpass.getpass()
[2]:
import awswrangler as wr
glue_database = "aws_sdk_pandas"
glue_table = "iceberg_test"
path = f"s3://{bucket_name}/iceberg_test/"
temp_path = f"s3://{bucket_name}/iceberg_test_temp/"
# Cleanup table before create
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)
[2]:
True
Create table & insert data¶
It is possible to insert a Pandas DataFrame into an Iceberg table using wr.athena.to_iceberg
. If the table does not exist, it will be created:
[ ]:
import pandas as pd
df = pd.DataFrame({"id": [1, 2, 3], "name": ["John", "Lily", "Richard"]})
wr.athena.to_iceberg(
df=df,
database=glue_database,
table=glue_table,
table_location=path,
temp_path=temp_path,
)
Alternatively, data can also be inserted by directly running INSERT INTO ... VALUES
:
[53]:
wr.athena.start_query_execution(
sql=f"INSERT INTO {glue_table} VALUES (1,'John'), (2, 'Lily'), (3, 'Richard')",
database=glue_database,
wait=True,
)
[53]:
{'QueryExecutionId': 'e339fcd2-9db1-43ac-bb9e-9730e6395b51',
'Query': "INSERT INTO iceberg_test VALUES (1,'John'), (2, 'Lily'), (3, 'Richard')",
'StatementType': 'DML',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/e339fcd2-9db1-43ac-bb9e-9730e6395b51'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 8, 612000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 11, 143000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2242,
'DataScannedInBytes': 0,
'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/e339fcd2-9db1-43ac-bb9e-9730e6395b51-manifest.csv',
'TotalExecutionTimeInMillis': 2531,
'QueryQueueTimeInMillis': 241,
'QueryPlanningTimeInMillis': 179,
'ServiceProcessingTimeInMillis': 48,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}
[54]:
wr.athena.start_query_execution(
sql=f"INSERT INTO {glue_table} VALUES (4,'Anne'), (5, 'Jacob'), (6, 'Leon')",
database=glue_database,
wait=True,
)
[54]:
{'QueryExecutionId': '922c8f02-4c00-4050-b4a7-7016809efa2b',
'Query': "INSERT INTO iceberg_test VALUES (4,'Anne'), (5, 'Jacob'), (6, 'Leon')",
'StatementType': 'DML',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/922c8f02-4c00-4050-b4a7-7016809efa2b'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 24, 582000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 27, 352000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2414,
'DataScannedInBytes': 0,
'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/922c8f02-4c00-4050-b4a7-7016809efa2b-manifest.csv',
'TotalExecutionTimeInMillis': 2770,
'QueryQueueTimeInMillis': 329,
'QueryPlanningTimeInMillis': 189,
'ServiceProcessingTimeInMillis': 27,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}
Query¶
[65]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[65]:
| | id | name |
|---|---|---|
| 0 | 1 | John |
| 1 | 4 | Anne |
| 2 | 2 | Lily |
| 3 | 3 | Richard |
| 4 | 5 | Jacob |
| 5 | 6 | Leon |
Read query metadata¶
In a SELECT query, you can use the following properties after table_name
to query Iceberg table metadata:
$files
Shows a table’s current data files
$manifests
Shows a table’s current file manifests
$history
Shows a table’s history
$partitions
Shows a table’s current partitions
[55]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$files"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[55]:
| | content | file_path | file_format | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | s3://.../iceberg_test/data/089a... | PARQUET | 3 | 360 | {1=48, 2=63} | {1=3, 2=3} | {1=0, 2=0} | {} | {1=1, 2=John} | {1=3, 2=Richard} | <NA> | NaN | NaN |
| 1 | 0 | s3://.../iceberg_test/data/5736... | PARQUET | 3 | 355 | {1=48, 2=61} | {1=3, 2=3} | {1=0, 2=0} | {} | {1=4, 2=Anne} | {1=6, 2=Leon} | <NA> | NaN | NaN |
[56]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$manifests"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[56]:
| | path | length | partition_spec_id | added_snapshot_id | added_data_files_count | added_rows_count | existing_data_files_count | existing_rows_count | deleted_data_files_count | deleted_rows_count | partitions |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | s3://.../iceberg_test/metadata/... | 6538 | 0 | 4379263637983206651 | 1 | 3 | 0 | 0 | 0 | 0 | [] |
| 1 | s3://.../iceberg_test/metadata/... | 6548 | 0 | 2934717851675145063 | 1 | 3 | 0 | 0 | 0 | 0 | [] |
[58]:
df = wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$history"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
# Save snapshot id
snapshot_id = df.snapshot_id[0]
df
[58]:
| | made_current_at | snapshot_id | parent_id | is_current_ancestor |
|---|---|---|---|---|
| 0 | 2023-03-16 09:40:10.438000+00:00 | 2934717851675145063 | <NA> | True |
| 1 | 2023-03-16 09:40:26.754000+00:00 | 4379263637983206651 | 2934717851675144704 | True |
[59]:
wr.athena.read_sql_query(
sql=f'SELECT * FROM "{glue_table}$partitions"',
database=glue_database,
ctas_approach=False,
unload_approach=False,
)
[59]:
| | record_count | file_count | total_size | data |
|---|---|---|---|---|
| 0 | 6 | 2 | 715 | {id={min=1, max=6, null_count=0, nan_count=nul... |
Time travel¶
[60]:
wr.athena.read_sql_query(
sql=f"SELECT * FROM {glue_table} FOR TIMESTAMP AS OF (current_timestamp - interval '5' second)",
database=glue_database,
)
[60]:
| | id | name |
|---|---|---|
| 0 | 1 | John |
| 1 | 4 | Anne |
| 2 | 2 | Lily |
| 3 | 3 | Richard |
| 4 | 5 | Jacob |
| 5 | 6 | Leon |
Version travel¶
[61]:
wr.athena.read_sql_query(
sql=f"SELECT * FROM {glue_table} FOR VERSION AS OF {snapshot_id}",
database=glue_database,
)
[61]:
| | id | name |
|---|---|---|
| 0 | 1 | John |
| 1 | 2 | Lily |
| 2 | 3 | Richard |
Optimize¶
The OPTIMIZE table_name REWRITE DATA
compaction action rewrites data files into a more optimized layout based on their size and the number of associated delete files. For syntax and table property details, see OPTIMIZE.
[62]:
wr.athena.start_query_execution(
sql=f"OPTIMIZE {glue_table} REWRITE DATA USING BIN_PACK",
database=glue_database,
wait=True,
)
[62]:
{'QueryExecutionId': '94666790-03ae-42d7-850a-fae99fa79a68',
'Query': 'OPTIMIZE iceberg_test REWRITE DATA USING BIN_PACK',
'StatementType': 'DDL',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/tables/94666790-03ae-42d7-850a-fae99fa79a68'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 49, 42, 857000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 49, 45, 655000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2622,
'DataScannedInBytes': 220,
'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/tables/94666790-03ae-42d7-850a-fae99fa79a68-manifest.csv',
'TotalExecutionTimeInMillis': 2798,
'QueryQueueTimeInMillis': 124,
'QueryPlanningTimeInMillis': 252,
'ServiceProcessingTimeInMillis': 52,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}
Vacuum¶
VACUUM
performs snapshot expiration and orphan file removal. These actions reduce metadata size and remove files not in the current table state that are also older than the retention period specified for the table. For syntax details, see VACUUM.
[64]:
wr.athena.start_query_execution(
sql=f"VACUUM {glue_table}",
database=glue_database,
wait=True,
)
[64]:
{'QueryExecutionId': '717a7de6-b873-49c7-b744-1b0b402f24c9',
'Query': 'VACUUM iceberg_test',
'StatementType': 'DML',
'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/717a7de6-b873-49c7-b744-1b0b402f24c9.csv'},
'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
'Status': {'State': 'SUCCEEDED',
'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 50, 41, 14000, tzinfo=tzlocal()),
'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 50, 43, 441000, tzinfo=tzlocal())},
'Statistics': {'EngineExecutionTimeInMillis': 2229,
'DataScannedInBytes': 0,
'TotalExecutionTimeInMillis': 2427,
'QueryQueueTimeInMillis': 153,
'QueryPlanningTimeInMillis': 30,
'ServiceProcessingTimeInMillis': 45,
'ResultReuseInformation': {'ReusedPreviousResult': False}},
'WorkGroup': 'primary',
'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
'EffectiveEngineVersion': 'Athena engine version 3'}}
Architectural Decision Records¶
A collection of records for “architecturally significant” decisions: those that affect the structure, non-functional characteristics, dependencies, interfaces, or construction techniques.
These decisions are made by the team which maintains AWS SDK for pandas. However, suggestions can be submitted by any contributor via issues or pull requests.
Note
You can also find all ADRs on GitHub.
1. Record architecture decisions¶
Date: 2023-03-08
Status¶
Accepted
Context¶
We need to record the architectural decisions made on this project.
Decision¶
We will use Architecture Decision Records, as described by Michael Nygard.
Consequences¶
See Michael Nygard’s article, linked above. For a lightweight ADR toolset, see Nat Pryce’s adr-tools.
2. Handling unsupported arguments in distributed mode¶
Date: 2023-03-09
Status¶
Accepted
Context¶
Many of the API functions allow the user to pass their own boto3
session, which will then be used by all the underlying boto3
calls. With distributed computing, one of the limitations we have is that we cannot pass the boto3
session to the worker nodes.
Boto3 sessions are not thread-safe, and therefore cannot be passed to Ray workers. The credentials behind a boto3
session cannot be sent to Ray workers either, since sending credentials over the network is considered a security risk.
This raises the question of what to do when, in distributed mode, the customer passes arguments that are normally supported, but aren’t supported in distributed mode.
Decision¶
When a user passes arguments that are unsupported by distributed mode, the function should fail immediately.
The main alternative to this approach would be to use a parameter such as a boto3
session where possible whenever one is passed. This could result in a situation where, when reading Parquet files from S3, listing the files uses the boto3
session while reading the Parquet files does not. The behavior would then be inconsistent, as one part of the function uses the extra parameters while the other part does not.
Another alternative would simply be to ignore the unsupported parameters, while potentially outputting a warning. The main issue with this approach is that if a customer tells our API functions to use certain parameters, they expect those parameters to be used. By ignoring them, the AWS SDK for pandas API would be doing something different from what the customer asked, without properly notifying them, and would thus lose the customer’s trust.
Consequences¶
In PR#2501, the validate_distributed_kwargs
annotation was introduced which can check for the presence of arguments that are unsupported in the distributed mode.
The annotation has also been applied for arguments such as s3_additional_kwargs
and version_id
when reading/writing data on S3.
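As an illustration only, such an annotation could be shaped roughly like the decorator below; the names, flag, and behaviour are hypothetical and do not reproduce the actual code from the PR:
import functools
from typing import Any, Callable, Sequence

# Stand-in flag for the real engine check; in the library the active engine
# is resolved at import time (see ADR 7 below).
DISTRIBUTED_MODE = True

def validate_distributed_kwargs(unsupported: Sequence[str]) -> Callable:
    """Fail fast when arguments unsupported in distributed mode are passed."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if DISTRIBUTED_MODE:
                offending = [k for k in unsupported if kwargs.get(k) is not None]
                if offending:
                    raise ValueError(
                        f"Arguments {offending} are not supported in distributed mode."
                    )
            return func(*args, **kwargs)
        return wrapper
    return decorator

@validate_distributed_kwargs(["boto3_session", "version_id"])
def read_parquet(path: str, boto3_session: Any = None, version_id: Any = None) -> None:
    ...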
3. Use TypedDict to group similar parameters¶
Date: 2023-03-10
Status¶
Accepted
Context¶
AWS SDK for pandas API methods contain many parameters which are related to a specific behaviour or setting. For example, methods which have an option to update the AWS Glue catalog, such as to_csv
and to_parquet
, contain a list of parameters that define the settings for the table in AWS Glue. These settings include the table description, column comments, the table type, etc.
As a consequence, some of our functions have grown to include dozens of parameters. When reading the function signatures, it can be unclear which parameters are related to which functionality. For example, it’s not immediately obvious that the parameter column_comments
in s3.to_parquet
only writes the column comments into the AWS Glue catalog, and not to S3.
Decision¶
Parameters that are related to similar functionality will be replaced by a single parameter of type TypedDict. This will allow us to reduce the amount of parameters for our API functions, and also make it clearer that certain parameters are only related to specific functionalities.
For example, parameters related to Athena cache settings will be extracted into a parameter of type AthenaCacheSettings
, parameters related to Ray settings will be extracted into RayReadParquetSettings
, etc.
The usage of TypedDict
allows the user to define the parameters as regular dictionaries with string keys, while empowering type checkers such as mypy
. Alternatively, implementations such as AthenaCacheSettings
can be instantiated as classes.
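A minimal sketch of the pattern follows; only max_cache_seconds is taken from the example further below, and the second field is included purely for illustration:
from typing import TypedDict

class AthenaCacheSettings(TypedDict, total=False):
    """Illustrative subset of the cache settings; fields other than
    max_cache_seconds are shown for illustration only."""
    max_cache_seconds: int
    max_cache_query_inspections: int

# Both forms produce a plain dict and are validated by type checkers:
as_dict: AthenaCacheSettings = {"max_cache_seconds": 900}
as_class = AthenaCacheSettings(max_cache_seconds=900)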
Alternatives¶
The main alternative that was considered was the idea of using dataclass
instead of TypedDict
. The advantage of this alternative would be that default values for parameters could be defined directly in the class signature, rather than needing to be defined in the function which uses the parameter.
On the other hand, the main issue with using dataclass
is that it would require the customer to figure out which class needs to be imported. With TypedDict
, this is just one of the options; the parameters can simply be passed as a typical Python dictionary.
This alternative was discussed in more detail as part of PR#1855.
Consequences¶
Subclasses of TypedDict
such as GlueCatalogParameters
, AthenaCacheSettings
, AthenaUNLOADSettings
, AthenaCTASSettings
and RaySettings
have been created. They are defined in the wrangler.typing
module.
These parameter groupings can be used in either of the following two ways:
wr.athena.read_sql_query(
    "SELECT * FROM ...",
    ctas_approach=True,
    athena_cache_settings={"max_cache_seconds": 900},
)
wr.athena.read_sql_query(
    "SELECT * FROM ...",
    ctas_approach=True,
    athena_cache_settings=wr.typing.AthenaCacheSettings(
        max_cache_seconds=900,
    ),
)
Many of our function signatures have been changed to take advantage of this refactor. Many of these are breaking changes which will be released as part of the next major version: 3.0.0.
4. AWS SDK for pandas does not alter IAM permissions¶
Date: 2023-03-15
Status¶
Accepted
Context¶
AWS SDK for pandas requires permissions to execute AWS API calls. Permissions are granted using AWS Identity and Access Management Policies that are attached to IAM entities - users or roles.
Decision¶
AWS SDK for pandas does not alter (create, update, delete) IAM permissions policies attached to the IAM entities.
Consequences¶
It is the user’s responsibility to ensure that the IAM entities they use to execute the calls have the required permissions.
5. Move dependencies to optional¶
Date: 2023-03-15
Status¶
Accepted
Context¶
AWS SDK for pandas relies on external dependencies in some of its modules. These include redshift-connector
, gremlinpython
and pymysql
to name a few.
In versions 2.x and below, most of these packages were set as required, meaning they were installed regardless of whether the user actually needed them. This has introduced two major risks and issues as the number of dependencies increased:
Security risk: Unused dependencies increase the attack surface to manage. Users must scan them and ensure that they are kept up to date even though they don’t need them
Dependency hell: Users must resolve dependencies for packages that they are not using. It can lead to dependency hell and prevent critical updates related to security patches and major bugs
Decision¶
A breaking change is introduced in version 3.x where the number of required dependencies is reduced to the most important ones, namely:
boto3
pandas
numpy
pyarrow
typing-extensions
Consequences¶
All other dependencies are moved to optional and must be installed by the user separately using pip install awswrangler[dependency]
. For instance, the command to use the Redshift APIs is pip install awswrangler[redshift]
. Failing to do so raises an exception informing the user that the package is missing and how to install it.
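The general shape of this behaviour can be sketched as a guarded import; this is an illustration of the pattern, not the library’s actual code:
def _require_redshift_connector():
    """Import the optional dependency or explain how to install it."""
    try:
        import redshift_connector  # optional: installed via awswrangler[redshift]
    except ImportError as exc:
        raise ImportError(
            "redshift_connector is required for this API. "
            "Install it with: pip install 'awswrangler[redshift]'"
        ) from exc
    return redshift_connector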
6. Deprecate wr.s3.merge_upsert_table¶
Date: 2023-03-15
Status¶
Accepted
Context¶
AWS SDK for pandas wr.s3.merge_upsert_table
is used to perform upsert (update else insert) onto an existing AWS Glue
Data Catalog table. It is a much simplified version of upsert functionality that is supported natively by Apache Hudi
and Athena Iceberg tables, and does not, for example, handle partitioned datasets.
Decision¶
To avoid a poor user experience, wr.s3.merge_upsert_table
is deprecated and will be removed in the 3.0 release.
Consequences¶
In PR#2076, wr.s3.merge_upsert_table
function was removed.
7. Design of engine and memory format¶
Date: 2023-03-16
Status¶
Accepted
Context¶
Ray and Modin are the two frameworks used to support running awswrangler
APIs at scale. Adding them to the codebase requires significant refactoring work. The original approach considered was to handle both distributed and non-distributed code within the same modules. This quickly turned out to be undesirable as it affected the readability, maintainability and scalability of the codebase.
Decision¶
Version 3.x of the library introduces two new constructs, engine
and memory_format
, which are designed to address the aforementioned shortcomings of the original approach, but also provide additional functionality.
Currently engine
takes one of two values: python
(default) or ray
, but additional engines could be onboarded in the future. The value is determined at import based on installed dependencies. The user can override this value with wr.engine.set("engine_name")
. Likewise, memory_format
can be set to pandas
(default) or modin
and overridden with wr.memory_format.set("memory_format_name")
.
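For example, assuming Ray and Modin are installed, the defaults detected at import time can be overridden explicitly:
import awswrangler as wr

# Override the engine and memory format detected at import time
wr.engine.set("ray")
wr.memory_format.set("modin")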
A custom dispatcher is used to register functions based on the execution and memory format values. For instance, if the ray
engine is detected at import, then methods distributed with Ray are used instead of the default AWS SDK for pandas code.
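The dispatching idea can be illustrated with the hypothetical registry below; it is a conceptual sketch, not the library’s actual dispatcher:
from typing import Callable, Dict

_REGISTRY: Dict[str, Dict[str, Callable]] = {"python": {}, "ray": {}}
_ACTIVE_ENGINE = "python"  # would be detected at import time

def register(engine: str, name: str) -> Callable:
    """Register an implementation of `name` for the given engine."""
    def decorator(func: Callable) -> Callable:
        _REGISTRY[engine][name] = func
        return func
    return decorator

def dispatch(name: str) -> Callable:
    """Return the implementation for the active engine, defaulting to python."""
    return _REGISTRY[_ACTIVE_ENGINE].get(name, _REGISTRY["python"][name])

@register("python", "read_parquet")
def _read_parquet_default(path: str) -> str:
    return f"read {path} with pandas"

@register("ray", "read_parquet")
def _read_parquet_distributed(path: str) -> str:
    return f"read {path} with Ray"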
Consequences¶
The good:
Clear separation of concerns: Distributed methods live outside non-distributed code, eliminating ugly if conditionals, allowing both to scale independently and making them easier to maintain in the future
Better dispatching: Adding a new engine/memory format is as simple as creating a new directory with its methods and registering them with the custom dispatcher based on the value of the engine or memory format
Custom engine/memory format classes: Give more flexibility than config when it comes to interacting with the engine and managing its state (initialising, registering, get/setting…)
The bad:
Managing state: Adding a custom dispatcher means that we must maintain its state. For instance, unregistering methods when a user sets a different engine (e.g. moving from ray to dask at execution time) is currently unsupported
Detecting the engine: Conditionals are simpler/easier when it comes to detecting an engine. With a custom dispatcher, the registration and dispatching process is more opaque/convoluted. For example, there is a higher risk of not realising that we are using a given engine vs another
The ugly:
Unused arguments: Each method registered with the dispatcher must accept the union of both non-distributed and distributed arguments, even though some would be unused. As the list of supported engines grows, so does the number of unused arguments. It also means that we must maintain the same list of arguments across the different versions of the method
8. Switching between PyArrow and Pandas based datasources for CSV/JSON I/O¶
Date: 2023-03-16
Status¶
Accepted
Context¶
The reading and writing operations for CSV/JSON data in AWS SDK for pandas make use of the underlying functions in Pandas. For example, wr.s3.read_csv
will open a stream of data from S3 and then invoke pandas.read_csv
. This allows the library to fully support all the arguments which are supported by the underlying Pandas functions. Functions such as wr.s3.read_csv
or wr.s3.to_json
accept a **kwargs
parameter which forwards all parameters to pandas.read_csv
and pandas.to_json
automatically.
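For example (the path and argument values below are placeholders), extra keyword arguments are forwarded verbatim to pandas.read_csv:
import awswrangler as wr

# `sep` and `skiprows` are not awswrangler-specific arguments; they are
# passed straight through to pandas.read_csv
df = wr.s3.read_csv(
    path="s3://my-bucket/my-path/file.csv",
    sep=";",
    skiprows=1,
)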
From version 3.0.0 onward, AWS SDK for pandas supports Ray and Modin. When those two libraries are installed, all aforementioned I/O functions will be distributed on a Ray cluster. In the background, this means that all the I/O functions for S3 are running as part of a custom Ray data source. Data is then returned in blocks, which form the Modin DataFrame.
The issue is that the Pandas I/O functions work very slowly in the Ray datasource compared with the equivalent I/O functions in PyArrow. Therefore, calling pyarrow.csv.read_csv
is significantly faster than calling pandas.read_csv
in the background.
However, the PyArrow I/O functions do not support the same set of parameters as the ones in Pandas. As a consequence, whereas the PyArrow functions offer greater performance, they come at the cost of feature parity between the non-distributed mode and the distributed mode.
For reference, loading 5 GiB of CSV data with the PyArrow functions took around 30 seconds, compared to 120 seconds with the Pandas functions in the same scenario. For writing back to S3, the speed-up is around 2x.
Decision¶
In order to maximize performance without losing feature parity, we implemented logic whereby, if the user passes a set of parameters which are all supported by PyArrow, the library uses PyArrow for reading/writing. If not, the library defaults to the slower Pandas functions, which support the full set of parameters.
The following example will illustrate the difference:
# This will be loaded by PyArrow, as `doublequote` is supported
wr.s3.read_csv(
path="s3://my-bucket/my-path/",
dataset=True,
doublequote=False,
)
# This will be loaded using the Pandas I/O functions, as `comment` is not supported by PyArrow
wr.s3.read_csv(
path="s3://my-bucket/my-path/",
dataset=True,
comment="#",
)
This logic is applied to the following functions:
wr.s3.read_csv
wr.s3.read_json
wr.s3.to_json
wr.s3.to_csv
Consequences¶
The logic of switching between using PyArrow or Pandas functions in background was implemented as part of #1699. It was later expanded to support more parameters in #2008 and #2019.
API Reference¶
Amazon S3¶
|
Copy a list of S3 objects to another S3 directory. |
|
Delete Amazon S3 objects from a received S3 prefix or list of S3 objects paths. |
|
Describe Amazon S3 objects from a received S3 prefix or list of S3 objects paths. |
|
Check if object exists on S3. |
|
Download file from a received S3 path to local file. |
|
Get bucket region name. |
|
List Amazon S3 buckets. |
|
List Amazon S3 objects from a prefix. |
|
List Amazon S3 objects from a prefix. |
|
Merge a source dataset into a target dataset. |
|
Read CSV file(s) from a received S3 prefix or list of S3 objects paths. |
|
Read EXCEL file(s) from a received S3 path. |
|
Read fixed-width formatted file(s) from a received S3 prefix or list of S3 objects paths. |
|
Read JSON file(s) from a received S3 prefix or list of S3 objects paths. |
|
Read Parquet file(s) from an S3 prefix or list of S3 objects paths. |
|
Read Apache Parquet file(s) metadata from an S3 prefix or list of S3 objects paths. |
|
Read Apache Parquet table registered in the AWS Glue Catalog. |
|
Load a Deltalake table data from an S3 path. |
|
Filter contents of Amazon S3 objects based on SQL statement. |
|
Get the size (ContentLength) in bytes of Amazon S3 objects from a received S3 prefix or list of S3 objects paths. |
|
Infer and store parquet metadata on AWS Glue Catalog. |
|
Write CSV file or dataset on Amazon S3. |
|
Write EXCEL file on Amazon S3. |
|
Write JSON file on Amazon S3. |
|
Write Parquet file or dataset on Amazon S3. |
|
Write a DataFrame to S3 as a DeltaLake table. |
|
Upload file from a local file to received S3 path. |
|
Wait for Amazon S3 objects to exist. |
|
Wait for Amazon S3 objects to not exist. |
AWS Glue Catalog¶
|
Add a column in an AWS Glue Catalog table. |
|
Add partitions (metadata) to a CSV Table in the AWS Glue Catalog. |
|
Add partitions (metadata) to a Parquet Table in the AWS Glue Catalog. |
|
Create a CSV Table (Metadata Only) in the AWS Glue Catalog. |
|
Create a database in AWS Glue Catalog. |
|
Create a JSON Table (Metadata Only) in the AWS Glue Catalog. |
|
Create a Parquet Table (Metadata Only) in the AWS Glue Catalog. |
|
Get a Pandas DataFrame with all listed databases. |
|
Delete a column in an AWS Glue Catalog table. |
|
Delete a database in AWS Glue Catalog. |
|
Delete specified partitions in an AWS Glue Catalog table. |
|
Delete all partitions in an AWS Glue Catalog table. |
|
Delete Glue table if exists. |
|
Check if the table exists. |
Drop all repeated columns (duplicated names). |
|
|
Extract columns and partitions types (Amazon Athena) from Pandas DataFrame. |
|
Get all columns comments. |
|
Get all partitions from a Table in the AWS Glue Catalog. |
|
Get an iterator of databases. |
|
Get all partitions from a Table in the AWS Glue Catalog. |
|
Get all partitions from a Table in the AWS Glue Catalog. |
|
Get table description. |
|
Get table's location on Glue catalog. |
|
Get total number of versions. |
|
Get all parameters. |
|
Get all columns and types from a table. |
|
Get all versions. |
|
Get an iterator of tables. |
|
Overwrite all existing parameters. |
|
Convert the column name to be compatible with Amazon Athena and the AWS Glue Catalog. |
|
Normalize all columns names to be compatible with Amazon Athena. |
|
Convert the table name to be compatible with Amazon Athena and the AWS Glue Catalog. |
|
Get Pandas DataFrame of tables filtered by a search string. |
|
Get table details as Pandas DataFrame. |
|
Get a DataFrame with tables filtered by a search term, prefix, suffix. |
|
Insert or Update the received parameters. |
Amazon Athena¶
|
Create the default Athena bucket if it doesn't exist. |
|
Create a new table populated with the results of a SELECT query. |
|
Generate the query that created a table (EXTERNAL_TABLE) or a view (VIRTUAL_TABLE). |
|
Get the data type of all columns queried. |
|
Fetch query execution details. |
|
From specified query execution IDs, return a DataFrame of query execution details. |
|
Get AWS Athena SQL query results as a Pandas DataFrame. |
|
Get the named query statement string from a query ID. |
|
Return information about the workgroup with the specified name. |
|
Fetch the list of query execution IDs run in the specified workgroup, or the primary workgroup if not specified. |
|
Execute any SQL query on AWS Athena and return the results as a Pandas DataFrame. |
|
Extract the full table from AWS Athena and return the results as a Pandas DataFrame. |
|
Run the Hive's metastore consistency check: 'MSCK REPAIR TABLE table;'. |
|
Start a SQL Query against AWS Athena. |
|
Stop a query execution. |
|
Insert into an Athena Iceberg table using INSERT INTO. |
|
Write query results from a SELECT statement to the specified data format using UNLOAD. |
|
Wait for the query end. |
AWS Lake Formation¶
|
Execute PartiQL query on AWS Glue Table (Transaction ID or time travel timestamp). |
|
Extract all rows from AWS Glue Table (Transaction ID or time travel timestamp). |
|
Cancel the specified transaction. |
|
Commit the specified transaction. |
|
Return the status of a single transaction. |
|
Indicate to the service that the specified transaction is still active and should not be canceled. |
|
Start a new transaction and return its transaction ID. |
|
Wait for the query to end. |
Amazon Redshift¶
|
Return a redshift_connector connection from a Glue Catalog or Secret Manager. |
|
Return a redshift_connector temporary connection (No password required). |
|
Load Pandas DataFrame as a Table on Amazon Redshift using parquet files on S3 as stage. |
|
Load Parquet files from S3 to a Table on Amazon Redshift (Through COPY command). |
|
Return a DataFrame corresponding to the result set of the query string. |
|
Return a DataFrame corresponding to the table. |
|
Write records stored in a DataFrame into Redshift. |
|
Load a Pandas DataFrame from an Amazon Redshift query result using Parquet files on S3 as a stage. |
|
Unload Parquet files on S3 from a Redshift query result (through the UNLOAD command). |
PostgreSQL¶
|
Return a pg8000 connection from a Glue Catalog Connection. |
Return a DataFrame corresponding to the result set of the query string. |
|
Return a DataFrame corresponding to the table. |
|
|
Write records stored in a DataFrame into PostgreSQL. |
MySQL¶
|
Return a pymysql connection from a Glue Catalog Connection or Secrets Manager. |
Return a DataFrame corresponding to the result set of the query string. |
|
Return a DataFrame corresponding to the table. |
|
|
Write records stored in a DataFrame into MySQL. |
Microsoft SQL Server¶
|
Return a pyodbc connection from a Glue Catalog Connection. |
Return a DataFrame corresponding to the result set of the query string. |
|
Return a DataFrame corresponding to the table. |
|
|
Write records stored in a DataFrame into Microsoft SQL Server. |
Oracle¶
|
Return an oracledb connection from a Glue Catalog Connection. |
Return a DataFrame corresponding to the result set of the query string.
|
Return a DataFrame corresponding to the table. |
|
|
Write records stored in a DataFrame into Oracle Database. |
Data API Redshift¶
|
Provides access to a Redshift cluster via the Data API. |
|
Create a Redshift Data API connection. |
|
Run an SQL query on a RedshiftDataApi connection and return the result as a DataFrame. |
Data API RDS¶
|
Provides access to the RDS Data API. |
|
Create an RDS Data API connection. |
|
Run an SQL query on an RdsDataApi connection and return the result as a DataFrame. |
AWS Glue Data Quality¶
|
Create recommendation Data Quality ruleset. |
|
Create Data Quality ruleset. |
|
Evaluate Data Quality ruleset. |
|
Get a Data Quality ruleset. |
|
Update Data Quality ruleset. |
OpenSearch¶
|
Create a secure connection to the specified Amazon OpenSearch domain. |
|
Create Amazon OpenSearch Serverless collection. |
|
Create an index. |
|
Delete an index. |
|
Index all documents from a CSV file to OpenSearch index. |
|
Index all documents to OpenSearch index. |
|
Index all documents from a DataFrame to OpenSearch index. |
|
Index all documents from JSON file to OpenSearch index. |
|
Return results matching query DSL as pandas DataFrame. |
|
Return results matching SQL query as pandas DataFrame. |
Amazon Neptune¶
|
Create a connection to a Neptune cluster. |
|
Return results of a Gremlin traversal as pandas DataFrame. |
|
Return results of a openCypher traversal as pandas DataFrame. |
|
Return results of a SPARQL query as pandas DataFrame. |
|
Flatten the lists and dictionaries of the input data frame. |
|
Write records stored in a DataFrame into Amazon Neptune. |
|
Write records stored in a DataFrame into Amazon Neptune. |
|
Write records into Amazon Neptune using the Neptune Bulk Loader. |
|
Load CSV files from S3 into Amazon Neptune using the Neptune Bulk Loader. |
DynamoDB¶
|
Delete all items in the specified DynamoDB table. |
|
Run a PartiQL statement against a DynamoDB table. |
|
Get DynamoDB table object for specified table name. |
|
Write all items from a CSV file to a DynamoDB table. |
|
Write all items from a DataFrame to a DynamoDB table. |
|
Insert all items into the specified DynamoDB table. |
|
Write all items from a JSON file to a DynamoDB table. |
|
Read items from given DynamoDB table. |
|
Read data from a DynamoDB table via a PartiQL query. |
Amazon Timestream¶
|
Batch load a Pandas DataFrame into an Amazon Timestream table. |
|
Batch load files from S3 into an Amazon Timestream table. |
|
Create a new Timestream database. |
|
Create a new Timestream table. |
|
Delete a given Timestream database. |
|
Delete a given Timestream table. |
|
List all databases in Timestream. |
|
List tables in Timestream. |
|
Run a query and retrieve the result as a Pandas DataFrame. |
|
Wait for the Timestream batch load task to complete. |
|
Store a Pandas DataFrame into an Amazon Timestream table. |
Amazon EMR¶
|
Build the Step structure (dictionary). |
|
Build the Step structure (dictionary). |
|
Create an EMR cluster with instance fleets configuration. |
|
Get the EMR cluster state. |
|
Get EMR step state. |
|
Update internal ECR credentials. |
|
Submit Spark Step. |
|
Submit a new job to the EMR cluster. |
|
Submit a list of steps. |
|
Terminate EMR cluster. |
Amazon CloudWatch Logs¶
|
Run a query against AWS CloudWatchLogs Insights and convert the results to a Pandas DataFrame. |
|
Run a query against AWS CloudWatchLogs Insights and wait for the results. |
|
Run a query against AWS CloudWatchLogs Insights. |
|
Wait for the query to end. |
|
List the log streams for the specified log group, return results as a Pandas DataFrame. |
|
List log events from the specified log group. |
Amazon QuickSight¶
|
Cancel an ongoing ingestion of data into SPICE. |
|
Create a QuickSight data source pointing to an Athena/Workgroup. |
|
Create a QuickSight dataset. |
|
Create and start a new SPICE ingestion on a dataset. |
|
Delete all dashboards. |
|
Delete all data sources. |
|
Delete all datasets. |
|
Delete all templates. |
|
Delete a dashboard. |
|
Delete a data source. |
|
Delete a dataset. |
|
Delete a template. |
|
Describe a QuickSight dashboard by name or ID. |
|
Describe a QuickSight data source by name or ID. |
|
Describe a QuickSight data source permissions by name or ID. |
|
Describe a QuickSight dataset by name or ID. |
|
Describe a QuickSight ingestion by ID. |
|
Get QuickSight dashboard ID given a name and fails if there is more than 1 ID associated with this name. |
|
Get QuickSight dashboard IDs given a name. |
|
Get QuickSight data source ARN given a name and fails if there is more than 1 ARN associated with this name. |
|
Get QuickSight Data source ARNs given a name. |
|
Get QuickSight data source ID given a name and fails if there is more than 1 ID associated with this name. |
|
Get QuickSight data source IDs given a name. |
|
Get QuickSight Dataset ID given a name and fails if there is more than 1 ID associated with this name. |
|
Get QuickSight dataset IDs given a name. |
|
Get QuickSight template ID given a name and fails if there is more than 1 ID associated with this name. |
|
Get QuickSight template IDs given a name. |
|
List dashboards in an AWS account. |
|
List all QuickSight Data sources summaries. |
|
List all QuickSight datasets summaries. |
|
List all QuickSight Groups. |
|
List all QuickSight Group memberships. |
|
List IAM policy assignments in the current Amazon QuickSight account. |
|
List all the IAM policy assignments. |
|
List the history of SPICE ingestions for a dataset. |
|
List all QuickSight templates. |
|
Return a list of all of the Amazon QuickSight users belonging to this account. |
|
List the Amazon QuickSight groups that an Amazon QuickSight user is a member of. |
AWS STS¶
|
Get Account ID. |
|
Get current user/role ARN. |
|
Get current user/role name. |
AWS Secrets Manager¶
|
Get secret value. |
|
Get JSON secret value. |
Amazon Chime¶
|
Send a message to an existing Chime chat room. |
Typing¶
Typed dictionary defining the settings for the Glue table. |
|
Typed dictionary defining the settings for using CTAS (Create Table As Statement). |
|
Typed dictionary defining the settings for using UNLOAD. |
|
Typed dictionary defining the settings for using cached Athena results. |
|
Typed dictionary defining the settings for Athena Partition Projection. |
|
Typed dictionary defining the settings for distributing calls using Ray. |
|
Typed dictionary defining the settings for distributing reading calls using Ray. |
|
Typed dictionary defining the dictionary returned by S3 write functions. |
Global Configurations¶
|
Reset one or all (if None is received) configuration values. |
Load all configurations on a Pandas DataFrame. |
Distributed - Ray¶
|
Connect to an existing Ray cluster or start one and connect to it. |
