An AWS Professional Service open source initiative | aws-proserve-opensource@amazon.com

AWS Data Wrangler is now AWS SDK for pandas (awswrangler). We’re changing the name we use when we talk about the library, but everything else will stay the same. You’ll still be able to install using pip install awswrangler and you won’t need to change any of your code. As part of this change, we’ve moved the library from AWS Labs to the main AWS GitHub organisation but, thanks to GitHub’s redirect feature, you’ll still be able to access the project by its old URLs until you update your bookmarks. Our documentation has also moved to aws-sdk-pandas.readthedocs.io, but old bookmarks will redirect to the new site.

Quick Start

>>> pip install awswrangler
import awswrangler as wr
import pandas as pd
from datetime import datetime

df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})

# Storing data on Data Lake
wr.s3.to_parquet(
    df=df,
    path="s3://bucket/dataset/",
    dataset=True,
    database="my_db",
    table="my_table"
)

# Retrieving the data directly from Amazon S3
df = wr.s3.read_parquet("s3://bucket/dataset/", dataset=True)

# Retrieving the data from Amazon Athena
df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db")

# Get a Redshift connection from the Glue Catalog and retrieve data from Redshift Spectrum
con = wr.redshift.connect("my-glue-connection")
df = wr.redshift.read_sql_query("SELECT * FROM external_schema.my_table", con=con)
con.close()

# Amazon Timestream Write
df = pd.DataFrame({
    "time": [datetime.now(), datetime.now()],
    "my_dimension": ["foo", "boo"],
    "measure": [1.0, 1.1],
})
rejected_records = wr.timestream.write(df,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_col="measure",
    dimensions_cols=["my_dimension"],
)

# Amazon Timestream Query
wr.timestream.query("""
SELECT time, measure_value::double, my_dimension
FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3
""")

Read The Docs

What is AWS SDK for pandas?

An AWS Professional Service open source Python initiative that extends the power of the pandas library to AWS, connecting DataFrames and AWS data-related services.

Easy integration with Athena, Glue, Redshift, Timestream, OpenSearch, Neptune, QuickSight, Chime, CloudWatch Logs, DynamoDB, EMR, Secrets Manager, PostgreSQL, MySQL, SQL Server and S3 (Parquet, CSV, JSON and Excel).

Built on top of other open-source projects like pandas, Apache Arrow and Boto3, it offers abstracted functions to run common ETL tasks such as loading and unloading data from data lakes, data warehouses and databases.

Check our tutorials or the list of functionalities.

Install

AWS SDK for pandas runs on Python 3.7, 3.8, 3.9 and 3.10, and on several platforms (AWS Lambda, AWS Glue Python Shell, EMR, EC2, on-premises, Amazon SageMaker, local, etc).

Some good practices to follow for the options below are:

  • Use new and isolated Virtual Environments for each project (venv).

  • On Notebooks, always restart your kernel after installations.

Note

If you want to use awswrangler to connect to Microsoft SQL Server or Oracle, some additional configuration is needed. Please have a look at the corresponding section below.

PyPI (pip)

>>> pip install awswrangler

Conda

>>> conda install -c conda-forge awswrangler

AWS Lambda Layer

Managed Layer

Note

There is a minimum delay of one week between a version release and the layers becoming available in the AWS Lambda console.

AWS SDK for pandas is available as an AWS Lambda Managed layer in all AWS commercial regions.

It can be accessed in the AWS Lambda console directly:

AWS Managed Lambda Layer

Or via its ARN: arn:aws:lambda:<region>:336392948345:layer:AWSSDKPandas-Python<python-version>:<layer-version>.

For example: arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python37:1.

The full list of ARNs is available here.
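As a minimal sketch (the function name, region, Python version and layer version below are placeholders), the managed layer can also be attached to an existing function programmatically. Note that update_function_configuration replaces the function's current layer list:

import boto3

# Placeholders: adjust the function name, region and layer ARN to your setup.
lambda_client = boto3.client("lambda", region_name="us-east-1")
layer_arn = "arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python38:1"

lambda_client.update_function_configuration(
    FunctionName="my-function",  # assumed existing function
    Layers=[layer_arn],          # replaces the current layer list
)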

Custom Layer

You can also create your own Lambda layer with these instructions:

1 - Go to GitHub’s release section and download the zipped layer for the desired version. Alternatively, you can download the zip from the public artifacts bucket.

2 - Go to the AWS Lambda console, open the layer section (left side) and click create layer.

3 - Set name and python version, upload your downloaded zip file and press create.

4 - Go to your Lambda function and select your new layer!
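The same steps can be scripted. Below is a minimal sketch (bucket, key, function and layer names are placeholders) that publishes the downloaded zip as a layer version and attaches it to an existing function:

import boto3

lambda_client = boto3.client("lambda")

# Assumes the zip downloaded in step 1 was uploaded to your own bucket.
response = lambda_client.publish_layer_version(
    LayerName="aws-sdk-pandas",
    Content={"S3Bucket": "my-bucket", "S3Key": "layers/awswrangler-layer-2.20.1-py3.8.zip"},
    CompatibleRuntimes=["python3.8"],
)

# Attach the new layer version to an existing function.
lambda_client.update_function_configuration(
    FunctionName="my-function",
    Layers=[response["LayerVersionArn"]],
)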

Serverless Application Repository (SAR)

AWS SDK for pandas layers are also available in the AWS Serverless Application Repository (SAR).

The app deploys the Lambda layer version in your own AWS account and region via a CloudFormation stack. This option provides the ability to use semantic versions (i.e. library version) instead of Lambda layer versions.

AWS SDK for pandas Layer Apps

App: aws-sdk-pandas-layer-py3-7
ARN: arn:aws:serverlessrepo:us-east-1:336392948345:applications/aws-sdk-pandas-layer-py3-7
Description: Layer for Python 3.7.x runtimes

App: aws-sdk-pandas-layer-py3-8
ARN: arn:aws:serverlessrepo:us-east-1:336392948345:applications/aws-sdk-pandas-layer-py3-8
Description: Layer for Python 3.8.x runtimes

App: aws-sdk-pandas-layer-py3-9
ARN: arn:aws:serverlessrepo:us-east-1:336392948345:applications/aws-sdk-pandas-layer-py3-9
Description: Layer for Python 3.9.x runtimes

Here is an example of how to create and use the AWS SDK for pandas Lambda layer in your CDK app:

from aws_cdk import core, aws_sam as sam, aws_lambda

class AWSSDKPandasApp(core.Construct):
  def __init__(self, scope: core.Construct, id_: str):
    super().__init__(scope, id_)

    aws_sdk_pandas_layer = sam.CfnApplication(
      self,
      "awssdkpandas-layer",
      location=sam.CfnApplication.ApplicationLocationProperty(
        application_id="arn:aws:serverlessrepo:us-east-1:336392948345:applications/aws-sdk-pandas-layer-py3-8",
        semantic_version="2.20.1",  # Get the latest version from https://github.com/aws/aws-sdk-pandas/releases
      ),
    )

    aws_sdk_pandas_layer_arn = aws_sdk_pandas_layer.get_att("Outputs.WranglerLayer38Arn").to_string()
    aws_sdk_pandas_layer_version = aws_lambda.LayerVersion.from_layer_version_arn(self, "awssdkpandas-layer-version", aws_sdk_pandas_layer_arn)

    aws_lambda.Function(
      self,
      "awssdkpandas-function",
      runtime=aws_lambda.Runtime.PYTHON_3_8,
      function_name="sample-awssdk-pandas-lambda-function",
      code=aws_lambda.Code.from_asset("./src/awssdk-pandas-lambda"),
      handler='lambda_function.lambda_handler',
      layers=[aws_sdk_pandas_layer_version]
    )

AWS Glue Python Shell Jobs

Note

The Glue Python Shell Python 3.9 runtime has version 2.15.1 of awswrangler baked in. If you need a different version, follow the instructions below:

1 - Go to GitHub’s release page and download the wheel file (.whl) related to the desired version. Alternatively, you can download the wheel from the public artifacts bucket.

2 - Upload the wheel file to the Amazon S3 location of your choice.

3 - Go to your Glue Python Shell job and point to the S3 wheel file in the Python library path field.
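If you prefer to configure the job through the API, here is a minimal sketch (job name, role and S3 paths are placeholders), assuming the console's Python library path field corresponds to the --extra-py-files default argument:

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="my-python-shell-job",    # placeholder
    Role="MyGlueServiceRole",      # placeholder IAM role
    Command={
        "Name": "pythonshell",
        "ScriptLocation": "s3://my-bucket/scripts/my_script.py",
        "PythonVersion": "3.9",
    },
    DefaultArguments={
        # Points to the wheel uploaded in step 2.
        "--extra-py-files": "s3://my-bucket/wheels/awswrangler-2.20.1-py3-none-any.whl",
    },
)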

Official Glue Python Shell Reference

AWS Glue PySpark Jobs

Note

AWS SDK for pandas has compiled dependencies (C/C++) so support is only available for Glue PySpark Jobs >= 2.0.

Go to your Glue PySpark job and create a new Job parameters key/value:

  • Key: --additional-python-modules

  • Value: pyarrow==2,awswrangler

To install a specific version, set the value for the above Job parameter as follows:

  • Value: cython==0.29.21,pg8000==1.21.0,pyarrow==2,pandas==1.3.0,awswrangler==2.20.1

Note

Pyarrow 3 is not currently supported in Glue PySpark Jobs, which is why an installation of pyarrow 2 is required.
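The same Job parameter can also be set through the API. A minimal sketch (job name, role, Glue version, worker settings and script location are placeholders):

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="my-pyspark-job",       # placeholder
    Role="MyGlueServiceRole",    # placeholder IAM role
    GlueVersion="3.0",           # any Glue PySpark version >= 2.0, per the note above
    WorkerType="G.1X",
    NumberOfWorkers=2,
    Command={"Name": "glueetl", "ScriptLocation": "s3://my-bucket/scripts/my_job.py"},
    DefaultArguments={
        "--additional-python-modules": "pyarrow==2,awswrangler==2.20.1",
    },
)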

Official Glue PySpark Reference

Public Artifacts

Lambda zipped layers and Python wheels are stored in a publicly accessible S3 bucket for all versions.

  • Bucket: aws-data-wrangler-public-artifacts

  • Prefix: releases/<version>/

    • Lambda layer: awswrangler-layer-<version>-py<py-version>.zip

    • Python wheel: awswrangler-<version>-py3-none-any.whl

For example: s3://aws-data-wrangler-public-artifacts/releases/2.20.1/awswrangler-layer-2.20.1-py3.8.zip
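For instance, the example artifact above can be fetched with boto3 (the local file name is up to you):

import boto3

s3 = boto3.client("s3")

s3.download_file(
    Bucket="aws-data-wrangler-public-artifacts",
    Key="releases/2.20.1/awswrangler-layer-2.20.1-py3.8.zip",
    Filename="awswrangler-layer-2.20.1-py3.8.zip",
)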

Amazon SageMaker Notebook

Run this command in any Python 3 notebook cell and then make sure to restart the kernel before importing the awswrangler package.

>>> !pip install awswrangler

Amazon SageMaker Notebook Lifecycle

Open the AWS SageMaker console, go to the lifecycle section and use the below snippet to configure AWS SDK for pandas for all compatible SageMaker kernels (Reference).

#!/bin/bash

set -e

# OVERVIEW
# This script installs a single pip package in all SageMaker conda environments, apart from the JupyterSystemEnv which
# is a system environment reserved for Jupyter.
# Note this may timeout if the package installations in all environments take longer than 5 mins, consider using
# "nohup" to run this as a background process in that case.

sudo -u ec2-user -i <<'EOF'

# PARAMETERS
PACKAGE=awswrangler

# Note that "base" is special environment name, include it there as well.
for env in base /home/ec2-user/anaconda3/envs/*; do
    env_name=$(basename "$env")
    # Skip the system environment reserved for Jupyter.
    if [ "$env_name" = 'JupyterSystemEnv' ]; then
        continue
    fi
    source /home/ec2-user/anaconda3/bin/activate "$env_name"
    nohup pip install --upgrade "$PACKAGE" &
    source /home/ec2-user/anaconda3/bin/deactivate
done
EOF

EMR Cluster

Despite not being a distributed library, AWS SDK for pandas can be used to complement Big Data pipelines.

  • Configure Python 3 as the default interpreter for PySpark on your cluster configuration [ONLY REQUIRED FOR EMR < 6]

    [
      {
         "Classification": "spark-env",
         "Configurations": [
           {
             "Classification": "export",
             "Properties": {
                "PYSPARK_PYTHON": "/usr/bin/python3"
              }
           }
        ]
      }
    ]
    
  • Keep one of the bootstrap scripts below on S3 and reference it on your cluster:

    • For EMR Release < 6

      #!/usr/bin/env bash
      set -ex
      
      sudo pip-3.6 install pyarrow==2 awswrangler
      
    • For EMR Release >= 6

      #!/usr/bin/env bash
      set -ex
      
      sudo pip install pyarrow==2 awswrangler
      

Note

Make sure to freeze the library version in the bootstrap for production environments (e.g. awswrangler==2.20.1)

Note

Pyarrow 3 is not currently supported in the default EMR image, which is why an installation of pyarrow 2 is required.
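A minimal sketch of how the spark-env classification and the bootstrap script could be wired together when creating a cluster directly with the EMR API (release label, roles, instance type, subnet and S3 path are placeholders):

import boto3

emr = boto3.client("emr")

emr.run_job_flow(
    Name="awswrangler-cluster",
    ReleaseLabel="emr-5.36.0",          # EMR < 6 needs the spark-env block below
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    Instances={
        "MasterInstanceType": "m5.xlarge",
        "InstanceCount": 1,
        "Ec2SubnetId": "subnet-xxxxxxxx",   # placeholder subnet
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    Configurations=[
        {
            "Classification": "spark-env",
            "Configurations": [
                {
                    "Classification": "export",
                    "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"},
                }
            ],
        }
    ],
    BootstrapActions=[
        {
            "Name": "install-awswrangler",
            "ScriptBootstrapAction": {"Path": "s3://my-bucket/bootstrap.sh"},
        }
    ],
)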

From Source

>>> git clone https://github.com/aws/aws-sdk-pandas.git
>>> cd aws-sdk-pandas
>>> pip install .

Notes for Microsoft SQL Server

awswrangler uses pyodbc for interacting with Microsoft SQL Server. To install this package you need the ODBC header files, which can be installed with the following commands (the first for Debian/Ubuntu, the second for Amazon Linux/RHEL):

>>> sudo apt install unixodbc-dev
>>> yum install unixODBC-devel

After installing these header files you can either just install pyodbc or awswrangler with the sqlserver extra, which will also install pyodbc:

>>> pip install pyodbc
>>> pip install awswrangler[sqlserver]

Finally, you also need the correct ODBC Driver for SQL Server. You can have a look at the documentation from Microsoft to see how it can be installed in your environment.

If you want to connect to Microsoft SQL Server from AWS Lambda, you can build a separate Layer including the needed ODBC drivers and pyodbc.

If you maintain your own environment, you need to take care of the above steps. Because of this limitation, usage in combination with Glue jobs is restricted and you need to rely on the functionality provided inside Glue itself.
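As a quick sanity check before connecting, a minimal sketch (the Glue connection name is a placeholder) that confirms the Microsoft driver is visible to pyodbc:

import pyodbc
import awswrangler as wr

# The Microsoft driver should show up here, e.g. 'ODBC Driver 17 for SQL Server'.
print(pyodbc.drivers())

con = wr.sqlserver.connect("my-sqlserver-glue-connection")  # placeholder connection name
df = wr.sqlserver.read_sql_query("SELECT 1 AS value", con=con)
con.close()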

Notes for Oracle Database

awswrangler uses the oracledb package for interacting with Oracle Database. To install this package you do not need the Oracle Client libraries unless you want to use Thick mode. You can have a look at the documentation from Oracle to see how they can be installed in your environment.

After installing these client libraries you can either just install oracledb or awswrangler with the oracle extra, which will also install oracledb:

>>> pip install oracledb
>>> pip install awswrangler[oracle]

If you maintain your own environment, you need to take care of the above steps. Because of this limitation, usage in combination with Glue jobs is restricted and you need to rely on the functionality provided inside Glue itself.
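A minimal sketch of a connection (the Glue connection name is a placeholder); python-oracledb runs in Thin mode by default, so the optional Thick-mode call is only needed if you installed the Oracle Client libraries:

import oracledb
import awswrangler as wr

# oracledb.init_oracle_client()  # optional: only for Thick mode with client libraries installed

con = wr.oracle.connect("my-oracle-glue-connection")  # placeholder connection name
df = wr.oracle.read_sql_query("SELECT 1 AS value FROM dual", con=con)
con.close()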

Notes for SPARQL support

To be able to use SPARQL, either install SPARQLWrapper on its own or install awswrangler with the sparql extra, which will also install SPARQLWrapper:

>>> pip install SPARQLWrapper
>>> pip install awswrangler[sparql]

Tutorials

Note

You can also find all Tutorial Notebooks on GitHub.

AWS SDK for pandas

1 - Introduction

What is AWS SDK for pandas?

An open-source Python package that extends the power of the pandas library to AWS, connecting DataFrames and AWS data-related services (Amazon Redshift, AWS Glue, Amazon Athena, Amazon Timestream, Amazon EMR, etc.).

Built on top of other open-source projects like pandas, Apache Arrow and Boto3, it offers abstracted functions to run common ETL tasks such as loading and unloading data from data lakes, data warehouses and databases.

Check our list of functionalities.

How to install?

awswrangler runs almost anywhere over Python 3.7, 3.8, 3.9 and 3.10, so there are several different ways to install it in the desired environment.

Some good practices for most of the above methods are:

  • Use new and individual Virtual Environments for each project (venv).

  • On Notebooks, always restart your kernel after installations.

Let’s Install it!

[ ]:
!pip install awswrangler

Restart your kernel after the installation!

[1]:
import awswrangler as wr

wr.__version__
[1]:
'2.0.0'

AWS SDK for pandas

2 - Sessions

How does awswrangler handle Sessions and AWS credentials?

Since version 1.0.0, awswrangler relies on boto3.Session() to manage AWS credentials and configurations.

awswrangler will not store any kind of state internally. Users are in charge of managing Sessions.

Most awswrangler functions receive the optional boto3_session argument. If None is received, the default boto3 Session will be used.

[1]:
import awswrangler as wr
import boto3

Using the default Boto3 Session

[2]:
wr.s3.does_object_exist("s3://noaa-ghcn-pds/fake")
[2]:
False

Customizing and using the default Boto3 Session

[3]:
boto3.setup_default_session(region_name="us-east-2")

wr.s3.does_object_exist("s3://noaa-ghcn-pds/fake")
[3]:
False

Using a new custom Boto3 Session

[4]:
my_session = boto3.Session(region_name="us-east-2")

wr.s3.does_object_exist("s3://noaa-ghcn-pds/fake", boto3_session=my_session)
[4]:
False
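Any other boto3 Session works the same way, for example one built from a local named profile (the profile name below is a placeholder):

import boto3
import awswrangler as wr

profile_session = boto3.Session(profile_name="my-profile", region_name="us-east-2")

wr.s3.does_object_exist("s3://noaa-ghcn-pds/fake", boto3_session=profile_session)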

AWS SDK for pandas

3 - Amazon S3

Table of Contents

[1]:
import awswrangler as wr
import pandas as pd
import boto3
import pytz
from datetime import datetime

df1 = pd.DataFrame({
    "id": [1, 2],
    "name": ["foo", "boo"]
})

df2 = pd.DataFrame({
    "id": [3],
    "name": ["bar"]
})

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()

1. CSV files

1.1 Writing CSV files
[3]:
path1 = f"s3://{bucket}/csv/file1.csv"
path2 = f"s3://{bucket}/csv/file2.csv"

wr.s3.to_csv(df1, path1, index=False)
wr.s3.to_csv(df2, path2, index=False)
1.2 Reading single CSV file
[4]:
wr.s3.read_csv([path1])
[4]:
id name
0 1 foo
1 2 boo
1.3 Reading multiple CSV files
1.3.1 Reading CSV by list
[5]:
wr.s3.read_csv([path1, path2])
[5]:
id name
0 1 foo
1 2 boo
2 3 bar
1.3.2 Reading CSV by prefix
[6]:
wr.s3.read_csv(f"s3://{bucket}/csv/")
[6]:
id name
0 1 foo
1 2 boo
2 3 bar

2. JSON files

2.1 Writing JSON files
[7]:
path1 = f"s3://{bucket}/json/file1.json"
path2 = f"s3://{bucket}/json/file2.json"

wr.s3.to_json(df1, path1)
wr.s3.to_json(df2, path2)
[7]:
['s3://woodadw-test/json/file2.json']
2.2 Reading single JSON file
[8]:
wr.s3.read_json([path1])
[8]:
id name
0 1 foo
1 2 boo
2.3 Reading multiple JSON files
2.3.1 Reading JSON by list
[9]:
wr.s3.read_json([path1, path2])
[9]:
id name
0 1 foo
1 2 boo
0 3 bar
2.3.2 Reading JSON by prefix
[10]:
wr.s3.read_json(f"s3://{bucket}/json/")
[10]:
id name
0 1 foo
1 2 boo
0 3 bar

3. Parquet files

For more complex features related to Parquet datasets, check tutorial number 4.

3.1 Writing Parquet files
[11]:
path1 = f"s3://{bucket}/parquet/file1.parquet"
path2 = f"s3://{bucket}/parquet/file2.parquet"

wr.s3.to_parquet(df1, path1)
wr.s3.to_parquet(df2, path2)
3.2 Reading single Parquet file
[12]:
wr.s3.read_parquet([path1])
[12]:
id name
0 1 foo
1 2 boo
3.3 Reading multiple Parquet files
3.3.1 Reading Parquet by list
[13]:
wr.s3.read_parquet([path1, path2])
[13]:
id name
0 1 foo
1 2 boo
2 3 bar
3.3.2 Reading Parquet by prefix
[14]:
wr.s3.read_parquet(f"s3://{bucket}/parquet/")
[14]:
id name
0 1 foo
1 2 boo
2 3 bar

4. Fixed-width formatted files (only read)

As of today, Pandas doesn’t implement a to_fwf functionality, so let’s manually write two files:

[15]:
content = "1  Herfelingen 27-12-18\n"\
          "2    Lambusart 14-06-18\n"\
          "3 Spormaggiore 15-04-18"
boto3.client("s3").put_object(Body=content, Bucket=bucket, Key="fwf/file1.txt")

content = "4    Buizingen 05-09-19\n"\
          "5   San Rafael 04-09-19"
boto3.client("s3").put_object(Body=content, Bucket=bucket, Key="fwf/file2.txt")

path1 = f"s3://{bucket}/fwf/file1.txt"
path2 = f"s3://{bucket}/fwf/file2.txt"
4.1 Reading single FWF file
[16]:
wr.s3.read_fwf([path1], names=["id", "name", "date"])
[16]:
id name date
0 1 Herfelingen 27-12-18
1 2 Lambusart 14-06-18
2 3 Spormaggiore 15-04-18
4.2 Reading multiple FWF files
4.2.1 Reading FWF by list
[17]:
wr.s3.read_fwf([path1, path2], names=["id", "name", "date"])
[17]:
id name date
0 1 Herfelingen 27-12-18
1 2 Lambusart 14-06-18
2 3 Spormaggiore 15-04-18
3 4 Buizingen 05-09-19
4 5 San Rafael 04-09-19
4.2.2 Reading FWF by prefix
[18]:
wr.s3.read_fwf(f"s3://{bucket}/fwf/", names=["id", "name", "date"])
[18]:
id name date
0 1 Herfelingen 27-12-18
1 2 Lambusart 14-06-18
2 3 Spormaggiore 15-04-18
3 4 Buizingen 05-09-19
4 5 San Rafael 04-09-19

5. Excel files

5.1 Writing Excel file
[19]:
path = f"s3://{bucket}/file0.xlsx"

wr.s3.to_excel(df1, path, index=False)
[19]:
's3://woodadw-test/file0.xlsx'
5.2 Reading Excel file
[20]:
wr.s3.read_excel(path)
[20]:
id name
0 1 foo
1 2 boo

6. Reading with lastModified filter

Specify the filter by LastModified date.

The filter needs to be specified as a datetime with a time zone.

Internally the path is listed first, and then the filter is applied.

The filter compares each object's LastModified attribute with the last_modified_begin and last_modified_end arguments.

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
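Equivalently, a timezone-aware datetime can be built with the standard library alone (a small sketch; the cells below use pytz):

from datetime import datetime, timezone

begin_utc = datetime(2020, 7, 31, 20, 30, tzinfo=timezone.utc)
end_utc = datetime(2021, 7, 31, 20, 30, tzinfo=timezone.utc)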

6.1 Define the Date time with UTC Timezone
[21]:
begin = datetime.strptime("20-07-31 20:30", "%y-%m-%d %H:%M")
end = datetime.strptime("21-07-31 20:30", "%y-%m-%d %H:%M")

begin_utc = pytz.utc.localize(begin)
end_utc = pytz.utc.localize(end)
6.2 Define the Date time and specify the Timezone
[22]:
begin = datetime.strptime("20-07-31 20:30", "%y-%m-%d %H:%M")
end = datetime.strptime("21-07-31 20:30", "%y-%m-%d %H:%M")

timezone = pytz.timezone("America/Los_Angeles")

begin_Los_Angeles = timezone.localize(begin)
end_Los_Angeles = timezone.localize(end)
6.3 Read json using the LastModified filters
[23]:
wr.s3.read_fwf(f"s3://{bucket}/fwf/", names=["id", "name", "date"], last_modified_begin=begin_utc, last_modified_end=end_utc)
wr.s3.read_json(f"s3://{bucket}/json/", last_modified_begin=begin_utc, last_modified_end=end_utc)
wr.s3.read_csv(f"s3://{bucket}/csv/", last_modified_begin=begin_utc, last_modified_end=end_utc)
wr.s3.read_parquet(f"s3://{bucket}/parquet/", last_modified_begin=begin_utc, last_modified_end=end_utc)

7. Download objects

Objects can be downloaded from S3 using either a path to a local file or a file-like object in binary mode.

7.1 Download object to a file path
[24]:
local_file_dir = getpass.getpass()
[25]:
import os

path1 = f"s3://{bucket}/csv/file1.csv"
local_file = os.path.join(local_file_dir, "file1.csv")
wr.s3.download(path=path1, local_file=local_file)

pd.read_csv(local_file)
[25]:
id name
0 1 foo
1 2 boo
7.2 Download object to a file-like object in binary mode
[26]:
path2 = f"s3://{bucket}/csv/file2.csv"
local_file = os.path.join(local_file_dir, "file2.csv")
with open(local_file, mode="wb") as local_f:
    wr.s3.download(path=path2, local_file=local_f)

pd.read_csv(local_file)
[26]:
id name
0 3 bar

8. Upload objects

Objects can be uploaded to S3 using either a path to a local file or a file-like object in binary mode.

8.1 Upload object from a file path
[27]:
local_file = os.path.join(local_file_dir, "file1.csv")
wr.s3.upload(local_file=local_file, path=path1)

wr.s3.read_csv(path1)
[27]:
id name
0 1 foo
1 2 boo
8.2 Upload object from a file-like object in binary mode
[28]:
local_file = os.path.join(local_file_dir, "file2.csv")
with open(local_file, "rb") as local_f:
    wr.s3.upload(local_file=local_f, path=path2)

wr.s3.read_csv(path2)
[28]:
id name
0 3 bar

9. Delete objects

[29]:
wr.s3.delete_objects(f"s3://{bucket}/")

AWS SDK for pandas

4 - Parquet Datasets

awswrangler has 3 different write modes to store Parquet Datasets on Amazon S3.

  • append (Default)

    Only adds new files without any delete.

  • overwrite

    Deletes everything in the target directory and then adds new files. If writing new files fails for any reason, old files are not restored.

  • overwrite_partitions (Partition Upsert)

    Only deletes the paths of partitions that should be updated and then writes the new partition files. It’s like a “partition upsert”.

[1]:
from datetime import date
import awswrangler as wr
import pandas as pd

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
 ············

Creating the Dataset

[3]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)]
})

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite"
)

wr.s3.read_parquet(path, dataset=True)
[3]:
id value date
0 1 foo 2020-01-01
1 2 boo 2020-01-02

Appending

[4]:
df = pd.DataFrame({
    "id": [3],
    "value": ["bar"],
    "date": [date(2020, 1, 3)]
})

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="append"
)

wr.s3.read_parquet(path, dataset=True)
[4]:
id value date
0 3 bar 2020-01-03
1 1 foo 2020-01-01
2 2 boo 2020-01-02

Overwriting

[5]:
wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite"
)

wr.s3.read_parquet(path, dataset=True)
[5]:
id value date
0 3 bar 2020-01-03

Creating a Partitioned Dataset

[6]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)]
})

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    partition_cols=["date"]
)

wr.s3.read_parquet(path, dataset=True)
[6]:
id value date
0 1 foo 2020-01-01
1 2 boo 2020-01-02

Upserting partitions (overwrite_partitions)

[7]:
df = pd.DataFrame({
    "id": [2, 3],
    "value": ["xoo", "bar"],
    "date": [date(2020, 1, 2), date(2020, 1, 3)]
})

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite_partitions",
    partition_cols=["date"]
)

wr.s3.read_parquet(path, dataset=True)
[7]:
id value date
0 1 foo 2020-01-01
1 2 xoo 2020-01-02
2 3 bar 2020-01-03

BONUS - Glue/Athena integration

[8]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)]
})

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="aws_sdk_pandas",
    table="my_table"
)

wr.athena.read_sql_query("SELECT * FROM my_table", database="aws_sdk_pandas")
[8]:
id value date
0 1 foo 2020-01-01
1 2 boo 2020-01-02

AWS SDK for pandas

5 - Glue Catalog

awswrangler makes heavy use of Glue Catalog to store metadata of tables and connections.

[1]:
import awswrangler as wr
import pandas as pd

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
 ············
Creating a Pandas DataFrame
[3]:
df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["shoes", "tshirt", "ball"],
    "price": [50.3, 10.5, 20.0],
    "in_stock": [True, True, False]
})
df
[3]:
id name price in_stock
0 1 shoes 50.3 True
1 2 tshirt 10.5 True
2 3 ball 20.0 False

Checking Glue Catalog Databases

[4]:
databases = wr.catalog.databases()
print(databases)
            Database                                   Description
0  aws_sdk_pandas  AWS SDK for pandas Test Arena - Glue Database
1            default                         Default Hive database
Create the database awswrangler_test if not exists
[5]:
if "awswrangler_test" not in databases.values:
    wr.catalog.create_database("awswrangler_test")
    print(wr.catalog.databases())
else:
    print("Database awswrangler_test already exists")
            Database                                   Description
0  aws_sdk_pandas  AWS SDK for pandas Test Arena - Glue Database
1   awswrangler_test
2            default                         Default Hive database

Checking the empty database

[6]:
wr.catalog.tables(database="awswrangler_test")
[6]:
Database Table Description Columns Partitions
Writing DataFrames to Data Lake (S3 + Parquet + Glue Catalog)
[7]:
desc = "This is my product table."

param = {
    "source": "Product Web Service",
    "class": "e-commerce"
}

comments = {
    "id": "Unique product ID.",
    "name": "Product name",
    "price": "Product price (dollar)",
    "in_stock": "Is this product availaible in the stock?"
}

res = wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/products/",
    dataset=True,
    database="awswrangler_test",
    table="products",
    mode="overwrite",
    description=desc,
    parameters=param,
    columns_comments=comments
)
Checking Glue Catalog (AWS Console)
Glue Console
Looking up the new table!
[8]:
wr.catalog.tables(name_contains="roduc")
[8]:
Database Table Description Columns Partitions
0 awswrangler_test products This is my product table. id, name, price, in_stock
[9]:
wr.catalog.tables(name_prefix="pro")
[9]:
Database Table Description Columns Partitions
0 awswrangler_test products This is my product table. id, name, price, in_stock
[10]:
wr.catalog.tables(name_suffix="ts")
[10]:
Database Table Description Columns Partitions
0 awswrangler_test products This is my product table. id, name, price, in_stock
[11]:
wr.catalog.tables(search_text="This is my")
[11]:
Database Table Description Columns Partitions
0 awswrangler_test products This is my product table. id, name, price, in_stock
Getting table details
[12]:
wr.catalog.table(database="awswrangler_test", table="products")
[12]:
Column Name Type Partition Comment
0 id bigint False Unique product ID.
1 name string False Product name
2 price double False Product price (dollar)
3 in_stock boolean False Is this product available in the stock?

Cleaning Up the Database

[13]:
for table in wr.catalog.get_tables(database="awswrangler_test"):
    wr.catalog.delete_table_if_exists(database="awswrangler_test", table=table["Name"])
Delete Database
[14]:
wr.catalog.delete_database('awswrangler_test')

AWS SDK for pandas

6 - Amazon Athena

awswrangler has three ways to run queries on Athena and fetch the result as a DataFrame:

  • ctas_approach=True (Default)

    Wraps the query with a CTAS and then reads the table data as parquet directly from s3.

    • PROS:

      • Faster for mid and big result sizes.

      • Can handle some level of nested types.

    • CONS:

      • Requires create/delete table permissions on Glue.

      • Does not support timestamp with time zone

      • Does not support columns with repeated names.

      • Does not support columns with undefined data types.

      • A temporary table will be created and then deleted immediately.

      • Does not support custom data_source/catalog_id.

  • unload_approach=True and ctas_approach=False

    Does an UNLOAD query on Athena and parses the Parquet result on S3.

    • PROS:

      • Faster for mid and big result sizes.

      • Can handle some level of nested types.

      • Does not modify Glue Data Catalog.

    • CONS:

      • Output S3 path must be empty.

      • Does not support timestamp with time zone

      • Does not support columns with repeated names.

      • Does not support columns with undefined data types.

  • ctas_approach=False

    Does a regular query on Athena and parses the regular CSV result on S3.

    • PROS:

      • Faster for small result sizes (less latency).

      • Does not require create/delete table permissions on Glue

      • Supports timestamp with time zone.

      • Supports custom data_source/catalog_id.

    • CONS:

      • Slower (but still faster than other libraries that use the regular Athena API).

      • Does not handle nested types at all.

[1]:
import awswrangler as wr

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"

Checking/Creating Glue Catalog Databases

[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")
Creating a Parquet Table from the NOAA CSV files

Reference

[ ]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/by_year/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  # Read 10 files from the 1890 decade (~1GB)

df
[ ]:
wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
)
[ ]:
wr.catalog.table(database="awswrangler_test", table="noaa")

Reading with ctas_approach=False

[ ]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False)

Default with ctas_approach=True - 13x faster (default)

[ ]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test")

Using categories to speed up and save memory - 24x faster

[ ]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", categories=["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"])

Reading with unload_approach=True

[ ]:
%%time

wr.athena.read_sql_query("SELECT * FROM noaa", database="awswrangler_test", ctas_approach=False, unload_approach=True, s3_output=f"s3://{bucket}/unload/")

Batching (Good for restricted memory environments)

[ ]:
%%time

dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=True  # Chunksize calculated automatically for ctas_approach.
)

for df in dfs:  # Batching
    print(len(df.index))
[ ]:
%%time

dfs = wr.athena.read_sql_query(
    "SELECT * FROM noaa",
    database="awswrangler_test",
    chunksize=100_000_000
)

for df in dfs:  # Batching
    print(len(df.index))

Cleaning Up S3

[ ]:
wr.s3.delete_objects(path)

Delete table

[ ]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa")

Delete Database

[ ]:
wr.catalog.delete_database('awswrangler_test')

AWS SDK for pandas

7 - Redshift, MySQL, PostgreSQL, SQL Server and Oracle

awswrangler’s Redshift, MySQL, PostgreSQL, SQL Server and Oracle modules have two basic functions in common that try to follow pandas conventions, but add more data type consistency.

[1]:
import awswrangler as wr
import pandas as pd

df = pd.DataFrame({
    "id": [1, 2],
    "name": ["foo", "boo"]
})

Connect using the Glue Catalog Connections

[2]:
con_redshift = wr.redshift.connect("aws-sdk-pandas-redshift")
con_mysql = wr.mysql.connect("aws-sdk-pandas-mysql")
con_postgresql = wr.postgresql.connect("aws-sdk-pandas-postgresql")
con_sqlserver = wr.sqlserver.connect("aws-sdk-pandas-sqlserver")
con_oracle = wr.oracle.connect("aws-sdk-pandas-oracle")

Raw SQL queries (No Pandas)

[3]:
with con_redshift.cursor() as cursor:
    for row in cursor.execute("SELECT 1"):
        print(row)
[1]

Loading data to Database

[4]:
wr.redshift.to_sql(df, con_redshift, schema="public", table="tutorial", mode="overwrite")
wr.mysql.to_sql(df, con_mysql, schema="test", table="tutorial", mode="overwrite")
wr.postgresql.to_sql(df, con_postgresql, schema="public", table="tutorial", mode="overwrite")
wr.sqlserver.to_sql(df, con_sqlserver, schema="dbo", table="tutorial", mode="overwrite")
wr.oracle.to_sql(df, con_oracle, schema="test", table="tutorial", mode="overwrite")

Unloading data from Database

[5]:
wr.redshift.read_sql_query("SELECT * FROM public.tutorial", con=con_redshift)
wr.mysql.read_sql_query("SELECT * FROM test.tutorial", con=con_mysql)
wr.postgresql.read_sql_query("SELECT * FROM public.tutorial", con=con_postgresql)
wr.sqlserver.read_sql_query("SELECT * FROM dbo.tutorial", con=con_sqlserver)
wr.oracle.read_sql_query("SELECT * FROM test.tutorial", con=con_oracle)
[5]:
id name
0 1 foo
1 2 boo
[6]:
con_redshift.close()
con_mysql.close()
con_postgresql.close()
con_sqlserver.close()
con_oracle.close()

AWS SDK for pandas

8 - Redshift - COPY & UNLOAD

Amazon Redshift has two SQL commands that help load and unload large amounts of data, staging it on Amazon S3:

1 - COPY

2 - UNLOAD

Let’s take a look at how awswrangler can use them.

[1]:
import awswrangler as wr

con = wr.redshift.connect("aws-sdk-pandas-redshift")

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/stage/"
 ···········································

Enter your IAM ROLE ARN:

[3]:
iam_role = getpass.getpass()
 ····················································································
Creating a DataFrame from the NOAA CSV files

Reference

[4]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/by_year/1897.csv",
    names=cols,
    parse_dates=["dt", "obs_time"])  # ~127MB, ~4MM rows

df
[4]:
id dt element value m_flag q_flag s_flag obs_time
0 AG000060590 1897-01-01 TMAX 170 NaN NaN E NaN
1 AG000060590 1897-01-01 TMIN -14 NaN NaN E NaN
2 AG000060590 1897-01-01 PRCP 0 NaN NaN E NaN
3 AGE00135039 1897-01-01 TMAX 140 NaN NaN E NaN
4 AGE00135039 1897-01-01 TMIN 40 NaN NaN E NaN
... ... ... ... ... ... ... ... ...
3923594 UZM00038457 1897-12-31 TMIN -145 NaN NaN r NaN
3923595 UZM00038457 1897-12-31 PRCP 4 NaN NaN r NaN
3923596 UZM00038457 1897-12-31 TAVG -95 NaN NaN r NaN
3923597 UZM00038618 1897-12-31 PRCP 66 NaN NaN r NaN
3923598 UZM00038618 1897-12-31 TAVG -45 NaN NaN r NaN

3923599 rows × 8 columns

Load and Unload with COPY and UNLOAD commands

Note: Please use an empty S3 path for the COPY command.

[5]:
%%time

wr.redshift.copy(
    df=df,
    path=path,
    con=con,
    schema="public",
    table="commands",
    mode="overwrite",
    iam_role=iam_role,
)
CPU times: user 2.78 s, sys: 293 ms, total: 3.08 s
Wall time: 20.7 s
[6]:
%%time

wr.redshift.unload(
    sql="SELECT * FROM public.commands",
    con=con,
    iam_role=iam_role,
    path=path,
    keep_files=True,
)
CPU times: user 10 s, sys: 1.14 s, total: 11.2 s
Wall time: 27.5 s
[6]:
id dt element value m_flag q_flag s_flag obs_time
0 AG000060590 1897-01-01 TMAX 170 <NA> <NA> E <NA>
1 AG000060590 1897-01-01 PRCP 0 <NA> <NA> E <NA>
2 AGE00135039 1897-01-01 TMIN 40 <NA> <NA> E <NA>
3 AGE00147705 1897-01-01 TMAX 164 <NA> <NA> E <NA>
4 AGE00147705 1897-01-01 PRCP 0 <NA> <NA> E <NA>
... ... ... ... ... ... ... ... ...
3923594 USW00094967 1897-12-31 TMAX -144 <NA> <NA> 6 <NA>
3923595 USW00094967 1897-12-31 PRCP 0 P <NA> 6 <NA>
3923596 UZM00038457 1897-12-31 TMAX -49 <NA> <NA> r <NA>
3923597 UZM00038457 1897-12-31 PRCP 4 <NA> <NA> r <NA>
3923598 UZM00038618 1897-12-31 PRCP 66 <NA> <NA> r <NA>

7847198 rows × 8 columns

[7]:
con.close()

AWS SDK for pandas

9 - Redshift - Append, Overwrite and Upsert

awswrangler’s copy/to_sql function has three different mode options for Redshift.

1 - append

2 - overwrite

3 - upsert

[2]:
import awswrangler as wr
import pandas as pd
from datetime import date

con = wr.redshift.connect("aws-sdk-pandas-redshift")

Enter your bucket name:

[3]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/stage/"
 ···········································

Enter your IAM ROLE ARN:

[4]:
iam_role = getpass.getpass()
 ····················································································
Creating the table (Overwriting if it exists)
[10]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)]
})

wr.redshift.copy(
    df=df,
    path=path,
    con=con,
    schema="public",
    table="my_table",
    mode="overwrite",
    iam_role=iam_role,
    primary_keys=["id"]
)

wr.redshift.read_sql_table(table="my_table", schema="public", con=con)
[10]:
id value date
0 2 boo 2020-01-02
1 1 foo 2020-01-01

Appending

[11]:
df = pd.DataFrame({
    "id": [3],
    "value": ["bar"],
    "date": [date(2020, 1, 3)]
})

wr.redshift.copy(
    df=df,
    path=path,
    con=con,
    schema="public",
    table="my_table",
    mode="append",
    iam_role=iam_role,
    primary_keys=["id"]
)

wr.redshift.read_sql_table(table="my_table", schema="public", con=con)
[11]:
id value date
0 1 foo 2020-01-01
1 2 boo 2020-01-02
2 3 bar 2020-01-03

Upserting

[12]:
df = pd.DataFrame({
    "id": [2, 3],
    "value": ["xoo", "bar"],
    "date": [date(2020, 1, 2), date(2020, 1, 3)]
})

wr.redshift.copy(
    df=df,
    path=path,
    con=con,
    schema="public",
    table="my_table",
    mode="upsert",
    iam_role=iam_role,
    primary_keys=["id"]
)

wr.redshift.read_sql_table(table="my_table", schema="public", con=con)
[12]:
id value date
0 1 foo 2020-01-01
1 2 xoo 2020-01-02
2 3 bar 2020-01-03

Cleaning Up

[13]:
with con.cursor() as cursor:
    cursor.execute("DROP TABLE public.my_table")
con.close()

AWS SDK for pandas

10 - Parquet Crawler

awswrangler can extract only the metadata from Parquet files and Partitions and then add it to the Glue Catalog.

[1]:
import awswrangler as wr

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
 ············
Creating a Parquet Table from the NOAA CSV files

Reference

[3]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/by_year/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  # Read 10 files from the 1890 decade (~1GB)

df
[3]:
id dt element value m_flag q_flag s_flag obs_time
0 AGE00135039 1890-01-01 TMAX 160 NaN NaN E NaN
1 AGE00135039 1890-01-01 TMIN 30 NaN NaN E NaN
2 AGE00135039 1890-01-01 PRCP 45 NaN NaN E NaN
3 AGE00147705 1890-01-01 TMAX 140 NaN NaN E NaN
4 AGE00147705 1890-01-01 TMIN 74 NaN NaN E NaN
... ... ... ... ... ... ... ... ...
29249753 UZM00038457 1899-12-31 PRCP 16 NaN NaN r NaN
29249754 UZM00038457 1899-12-31 TAVG -73 NaN NaN r NaN
29249755 UZM00038618 1899-12-31 TMIN -76 NaN NaN r NaN
29249756 UZM00038618 1899-12-31 PRCP 0 NaN NaN r NaN
29249757 UZM00038618 1899-12-31 TAVG -60 NaN NaN r NaN

29249758 rows × 8 columns

[4]:
df["year"] = df["dt"].dt.year

df.head(3)
[4]:
id dt element value m_flag q_flag s_flag obs_time year
0 AGE00135039 1890-01-01 TMAX 160 NaN NaN E NaN 1890
1 AGE00135039 1890-01-01 TMIN 30 NaN NaN E NaN 1890
2 AGE00135039 1890-01-01 PRCP 45 NaN NaN E NaN 1890
[5]:
res = wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    partition_cols=["year"],
)
[6]:
[ x.split("data/", 1)[1] for x in wr.s3.list_objects(path)]
[6]:
['year=1890/06a519afcf8e48c9b08c8908f30adcfe.snappy.parquet',
 'year=1891/5a99c28dbef54008bfc770c946099e02.snappy.parquet',
 'year=1892/9b1ea5d1cfad40f78c920f93540ca8ec.snappy.parquet',
 'year=1893/92259b49c134401eaf772506ee802af6.snappy.parquet',
 'year=1894/c734469ffff944f69dc277c630064a16.snappy.parquet',
 'year=1895/cf7ccde86aaf4d138f86c379c0817aa6.snappy.parquet',
 'year=1896/ce02f4c2c554438786b766b33db451b6.snappy.parquet',
 'year=1897/e04de04ad3c444deadcc9c410ab97ca1.snappy.parquet',
 'year=1898/acb0e02878f04b56a6200f4b5a97be0e.snappy.parquet',
 'year=1899/a269bdbb0f6a48faac55f3bcfef7df7a.snappy.parquet']

Crawling!

[7]:
%%time

res = wr.s3.store_parquet_metadata(
    path=path,
    database="awswrangler_test",
    table="crawler",
    dataset=True,
    mode="overwrite",
    dtype={"year": "int"}
)
CPU times: user 1.81 s, sys: 528 ms, total: 2.33 s
Wall time: 3.21 s

Checking

[8]:
wr.catalog.table(database="awswrangler_test", table="crawler")
[8]:
Column Name Type Partition Comment
0 id string False
1 dt timestamp False
2 element string False
3 value bigint False
4 m_flag string False
5 q_flag string False
6 s_flag string False
7 obs_time string False
8 year int True
[9]:
%%time

wr.athena.read_sql_query("SELECT * FROM crawler WHERE year=1890", database="awswrangler_test")
CPU times: user 3.52 s, sys: 811 ms, total: 4.33 s
Wall time: 9.6 s
[9]:
id dt element value m_flag q_flag s_flag obs_time year
0 USC00195145 1890-01-01 TMIN -28 <NA> <NA> 6 <NA> 1890
1 USC00196770 1890-01-01 PRCP 0 P <NA> 6 <NA> 1890
2 USC00196770 1890-01-01 SNOW 0 <NA> <NA> 6 <NA> 1890
3 USC00196915 1890-01-01 PRCP 0 P <NA> 6 <NA> 1890
4 USC00196915 1890-01-01 SNOW 0 <NA> <NA> 6 <NA> 1890
... ... ... ... ... ... ... ... ... ...
6139 ASN00022006 1890-12-03 PRCP 0 <NA> <NA> a <NA> 1890
6140 ASN00022007 1890-12-03 PRCP 0 <NA> <NA> a <NA> 1890
6141 ASN00022008 1890-12-03 PRCP 0 <NA> <NA> a <NA> 1890
6142 ASN00022009 1890-12-03 PRCP 0 <NA> <NA> a <NA> 1890
6143 ASN00022011 1890-12-03 PRCP 0 <NA> <NA> a <NA> 1890

1276246 rows × 9 columns

Cleaning Up S3

[10]:
wr.s3.delete_objects(path)

Cleaning Up the Database

[11]:
for table in wr.catalog.get_tables(database="awswrangler_test"):
    wr.catalog.delete_table_if_exists(database="awswrangler_test", table=table["Name"])

AWS SDK for pandas

11 - CSV Datasets

awswrangler has 3 different write modes to store CSV Datasets on Amazon S3.

  • append (Default)

    Only adds new files without any delete.

  • overwrite

    Deletes everything in the target directory and then adds new files.

  • overwrite_partitions (Partition Upsert)

    Only deletes the paths of partitions that should be updated and then writes the new partition files. It’s like a “partition upsert”.

[1]:
from datetime import date
import awswrangler as wr
import pandas as pd

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
 ············

Checking/Creating Glue Catalog Databases

[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")

Creating the Dataset

[4]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)]
})

wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="csv_dataset"
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[4]:
id value date
0 1 foo 2020-01-01
1 2 boo 2020-01-02

Appending

[5]:
df = pd.DataFrame({
    "id": [3],
    "value": ["bar"],
    "date": [date(2020, 1, 3)]
})

wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    mode="append",
    database="awswrangler_test",
    table="csv_dataset"
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[5]:
id value date
0 3 bar 2020-01-03
1 1 foo 2020-01-01
2 2 boo 2020-01-02

Overwriting

[6]:
wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="csv_dataset"
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[6]:
id value date
0 3 bar 2020-01-03

Creating a Partitioned Dataset

[7]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)]
})

wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="csv_dataset",
    partition_cols=["date"]
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[7]:
id value date
0 2 boo 2020-01-02
1 1 foo 2020-01-01

Upserting partitions (overwrite_partitions)

[8]:

df = pd.DataFrame({ "id": [2, 3], "value": ["xoo", "bar"], "date": [date(2020, 1, 2), date(2020, 1, 3)] }) wr.s3.to_csv( df=df, path=path, index=False, dataset=True, mode="overwrite_partitions", database="awswrangler_test", table="csv_dataset", partition_cols=["date"] ) wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[8]:
id value date
0 1 foo 2020-01-01
1 2 xoo 2020-01-02
0 3 bar 2020-01-03

BONUS - Glue/Athena integration

[9]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)]
})

wr.s3.to_csv(
    df=df,
    path=path,
    dataset=True,
    index=False,
    mode="overwrite",
    database="aws_sdk_pandas",
    table="my_table",
    compression="gzip"
)

wr.athena.read_sql_query("SELECT * FROM my_table", database="aws_sdk_pandas")
[9]:
id value date
0 1 foo 2020-01-01
1 2 boo 2020-01-02

AWS SDK for pandas

12 - CSV Crawler

awswrangler can extract only the metadata from a Pandas DataFrame and then add it to the Glue Catalog as a table.

[1]:
import awswrangler as wr
from datetime import datetime
import pandas as pd

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/csv_crawler/"
 ············
Creating a Pandas DataFrame
[3]:
ts = lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f")  # noqa
dt = lambda x: datetime.strptime(x, "%Y-%m-%d").date()  # noqa

df = pd.DataFrame(
    {
        "id": [1, 2, 3],
        "string": ["foo", None, "boo"],
        "float": [1.0, None, 2.0],
        "date": [dt("2020-01-01"), None, dt("2020-01-02")],
        "timestamp": [ts("2020-01-01 00:00:00.0"), None, ts("2020-01-02 00:00:01.0")],
        "bool": [True, None, False],
        "par0": [1, 1, 2],
        "par1": ["a", "b", "b"],
    }
)

df
[3]:
id string float date timestamp bool par0 par1
0 1 foo 1.0 2020-01-01 2020-01-01 00:00:00 True 1 a
1 2 None NaN None NaT None 1 b
2 3 boo 2.0 2020-01-02 2020-01-02 00:00:01 False 2 b
Extracting the metadata
[4]:
columns_types, partitions_types = wr.catalog.extract_athena_types(
    df=df,
    file_format="csv",
    index=False,
    partition_cols=["par0", "par1"]
)
[5]:
columns_types
[5]:
{'id': 'bigint',
 'string': 'string',
 'float': 'double',
 'date': 'date',
 'timestamp': 'timestamp',
 'bool': 'boolean'}
[6]:
partitions_types
[6]:
{'par0': 'bigint', 'par1': 'string'}

Creating the table

[7]:
wr.catalog.create_csv_table(
    table="csv_crawler",
    database="awswrangler_test",
    path=path,
    partitions_types=partitions_types,
    columns_types=columns_types,
)

Checking

[8]:
wr.catalog.table(database="awswrangler_test", table="csv_crawler")
[8]:
Column Name Type Partition Comment
0 id bigint False
1 string string False
2 float double False
3 date date False
4 timestamp timestamp False
5 bool boolean False
6 par0 bigint True
7 par1 string True

We can still use the extracted metadata to ensure data type consistency for new data

[9]:
df = pd.DataFrame(
    {
        "id": [1],
        "string": ["1"],
        "float": [1],
        "date": [ts("2020-01-01 00:00:00.0")],
        "timestamp": [dt("2020-01-02")],
        "bool": [1],
        "par0": [1],
        "par1": ["a"],
    }
)

df
[9]:
id string float date timestamp bool par0 par1
0 1 1 1 2020-01-01 2020-01-02 1 1 a
[10]:
res = wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    database="awswrangler_test",
    table="csv_crawler",
    partition_cols=["par0", "par1"],
    dtype=columns_types
)

You can also extract the metadata directly from the Catalog if you want

[11]:
dtype = wr.catalog.get_table_types(database="awswrangler_test", table="csv_crawler")
[12]:
res = wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    database="awswrangler_test",
    table="csv_crawler",
    partition_cols=["par0", "par1"],
    dtype=dtype
)

Checking out

[13]:
df = wr.athena.read_sql_table(database="awswrangler_test", table="csv_crawler")

df
[13]:
id string float date timestamp bool par0 par1
0 1 1 1.0 None 2020-01-02 True 1 a
1 1 1 1.0 None 2020-01-02 True 1 a
[14]:
df.dtypes
[14]:
id                    Int64
string               string
float               float64
date                 object
timestamp    datetime64[ns]
bool                boolean
par0                  Int64
par1                 string
dtype: object

Cleaning Up S3

[15]:
wr.s3.delete_objects(path)

Cleaning Up the Database

[16]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="csv_crawler")
[16]:
True

AWS SDK for pandas

13 - Merging Datasets on S3

awswrangler has 3 different copy modes to store Parquet Datasets on Amazon S3.

  • append (Default)

    Only adds new files without any delete.

  • overwrite

    Deletes everything in the target directory and then adds new files.

  • overwrite_partitions (Partition Upsert)

    Only deletes the paths of partitions that should be updated and then writes the new partition files. It’s like a “partition upsert”.

[1]:
from datetime import date
import awswrangler as wr
import pandas as pd

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path1 = f"s3://{bucket}/dataset1/"
path2 = f"s3://{bucket}/dataset2/"
 ············

Creating Dataset 1

[3]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
    "date": [date(2020, 1, 1), date(2020, 1, 2)]
})

wr.s3.to_parquet(
    df=df,
    path=path1,
    dataset=True,
    mode="overwrite",
    partition_cols=["date"]
)

wr.s3.read_parquet(path1, dataset=True)
[3]:
id value date
0 1 foo 2020-01-01
1 2 boo 2020-01-02

Creating Dataset 2

[4]:
df = pd.DataFrame({
    "id": [2, 3],
    "value": ["xoo", "bar"],
    "date": [date(2020, 1, 2), date(2020, 1, 3)]
})

dataset2_files = wr.s3.to_parquet(
    df=df,
    path=path2,
    dataset=True,
    mode="overwrite",
    partition_cols=["date"]
)["paths"]

wr.s3.read_parquet(path2, dataset=True)
[4]:
id value date
0 2 xoo 2020-01-02
1 3 bar 2020-01-03

Merging (Dataset 2 -> Dataset 1) (APPEND)

[5]:
wr.s3.merge_datasets(
    source_path=path2,
    target_path=path1,
    mode="append"
)

wr.s3.read_parquet(path1, dataset=True)
[5]:
id value date
0 1 foo 2020-01-01
1 2 xoo 2020-01-02
2 2 boo 2020-01-02
3 3 bar 2020-01-03

Merging (Dataset 2 -> Dataset 1) (OVERWRITE_PARTITIONS)

[6]:
wr.s3.merge_datasets(
    source_path=path2,
    target_path=path1,
    mode="overwrite_partitions"
)

wr.s3.read_parquet(path1, dataset=True)
[6]:
id value date
0 1 foo 2020-01-01
1 2 xoo 2020-01-02
2 3 bar 2020-01-03

Merging (Dataset 2 -> Dataset 1) (OVERWRITE)

[7]:
wr.s3.merge_datasets(
    source_path=path2,
    target_path=path1,
    mode="overwrite"
)

wr.s3.read_parquet(path1, dataset=True)
[7]:
id value date
0 2 xoo 2020-01-02
1 3 bar 2020-01-03

Cleaning Up

[8]:
wr.s3.delete_objects(path1)
wr.s3.delete_objects(path2)

AWS SDK for pandas

14 - Schema Evolution

awswrangler supports adding new columns on Parquet and CSV datasets (schema evolution).

[1]:
from datetime import date
import awswrangler as wr
import pandas as pd

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
 ···········································

Creating the Dataset

Parquet
[3]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
})

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="aws_sdk_pandas",
    table="my_table"
)

wr.s3.read_parquet(path, dataset=True)
[3]:
id value
0 1 foo
1 2 boo
CSV
[ ]:
df = pd.DataFrame({
    "id": [1, 2],
    "value": ["foo", "boo"],
})

wr.s3.to_csv(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="aws_sdk_pandas",
    table="my_table"
)

wr.s3.read_csv(path, dataset=True)
Schema Version 0 on Glue Catalog (AWS Console)
Glue Console

Appending with NEW COLUMNS

Parquet
[4]:
df = pd.DataFrame({
    "id": [3, 4],
    "value": ["bar", None],
    "date": [date(2020, 1, 3), date(2020, 1, 4)],
    "flag": [True, False]
})

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="append",
    database="aws_sdk_pandas",
    table="my_table",
    catalog_versioning=True  # Optional
)

wr.s3.read_parquet(path, dataset=True, validate_schema=False)
[4]:
id value date flag
0 3 bar 2020-01-03 True
1 4 None 2020-01-04 False
2 1 foo NaN NaN
3 2 boo NaN NaN
CSV

Note: for CSV datasets, due to column ordering, schema evolution is disabled by default. Enable it by passing the schema_evolution=True flag.

[ ]:
df = pd.DataFrame({
    "id": [3, 4],
    "value": ["bar", None],
    "date": [date(2020, 1, 3), date(2020, 1, 4)],
    "flag": [True, False]
})

wr.s3.to_csv(
    df=df,
    path=path,
    dataset=True,
    mode="append",
    database="aws_sdk_pandas",
    table="my_table",
    schema_evolution=True,
    catalog_versioning=True  # Optional
)

wr.s3.read_csv(path, dataset=True, validate_schema=False)
Schema Version 1 on Glue Catalog (AWS Console)
Glue Console

Reading from Athena

[5]:
wr.athena.read_sql_table(table="my_table", database="aws_sdk_pandas")
[5]:
id value date flag
0 3 bar 2020-01-03 True
1 4 None 2020-01-04 False
2 1 foo None <NA>
3 2 boo None <NA>

Cleaning Up

[6]:
wr.s3.delete_objects(path)
wr.catalog.delete_table_if_exists(table="my_table", database="aws_sdk_pandas")
[6]:
True

AWS SDK for pandas

15 - EMR

[1]:
import awswrangler as wr
import boto3

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
 ··········································

Enter your Subnet ID:

[8]:
subnet = getpass.getpass()
 ························

Creating EMR Cluster

[9]:
cluster_id = wr.emr.create_cluster(subnet)

Uploading our PySpark script to Amazon S3

[10]:
script = """
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("docker-awswrangler").getOrCreate()
sc = spark.sparkContext

print("Spark Initialized")
"""

_ = boto3.client("s3").put_object(
    Body=script,
    Bucket=bucket,
    Key="test.py"
)

Submit PySpark step

[11]:
step_id = wr.emr.submit_step(cluster_id, command=f"spark-submit s3://{bucket}/test.py")

Wait Step

[12]:
while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
    pass

Terminate Cluster

[13]:
wr.emr.terminate_cluster(cluster_id)

AWS SDK for pandas

16 - EMR & Docker

[ ]:
import awswrangler as wr
import boto3
import getpass

Enter your bucket name:

[2]:
bucket = getpass.getpass()
 ··········································

Enter your Subnet ID:

[3]:
subnet = getpass.getpass()
 ························

Build and Upload Docker Image to ECR repository

Replace the {ACCOUNT_ID} placeholder.

[ ]:
%%writefile Dockerfile

FROM amazoncorretto:8

RUN yum -y update
RUN yum -y install yum-utils
RUN yum -y groupinstall development

RUN yum list python3*
RUN yum -y install python3 python3-dev python3-pip python3-virtualenv

RUN python -V
RUN python3 -V

ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3

RUN pip3 install --upgrade pip
RUN pip3 install awswrangler

RUN python3 -c "import awswrangler as wr"
[ ]:
%%bash

docker build -t 'local/emr-wrangler' .
aws ecr create-repository --repository-name emr-wrangler
docker tag local/emr-wrangler {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler
eval $(aws ecr get-login --region us-east-1 --no-include-email)
docker push {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler

Creating EMR Cluster

[4]:
cluster_id = wr.emr.create_cluster(subnet, docker=True)

Refresh ECR credentials in the cluster (expiration time: 12h)

[5]:
wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f"s3://{bucket}/")
[5]:
's-1B0O45RWJL8CL'

Uploading application script to Amazon S3 (PySpark)

[7]:
script = """
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("docker-awswrangler").getOrCreate()
sc = spark.sparkContext

print("Spark Initialized")

import awswrangler as wr

print(f"awswrangler version: {wr.__version__}")
"""

boto3.client("s3").put_object(Body=script, Bucket=bucket, Key="test_docker.py")

Submit PySpark step

[8]:
DOCKER_IMAGE = f"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler"

step_id = wr.emr.submit_spark_step(
    cluster_id,
    f"s3://{bucket}/test_docker.py",
    docker_image=DOCKER_IMAGE
)

Wait Step

[ ]:
while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
    pass

Terminate Cluster

[ ]:
wr.emr.terminate_cluster(cluster_id)

Another example with custom configurations

[9]:
cluster_id = wr.emr.create_cluster(
    cluster_name="my-demo-cluster-v2",
    logging_s3_path=f"s3://{bucket}/emr-logs/",
    emr_release="emr-6.7.0",
    subnet_id=subnet,
    emr_ec2_role="EMR_EC2_DefaultRole",
    emr_role="EMR_DefaultRole",
    instance_type_master="m5.2xlarge",
    instance_type_core="m5.2xlarge",
    instance_ebs_size_master=50,
    instance_ebs_size_core=50,
    instance_num_on_demand_master=0,
    instance_num_on_demand_core=0,
    instance_num_spot_master=1,
    instance_num_spot_core=2,
    spot_bid_percentage_of_on_demand_master=100,
    spot_bid_percentage_of_on_demand_core=100,
    spot_provisioning_timeout_master=5,
    spot_provisioning_timeout_core=5,
    spot_timeout_to_on_demand_master=False,
    spot_timeout_to_on_demand_core=False,
    python3=True,
    docker=True,
    spark_glue_catalog=True,
    hive_glue_catalog=True,
    presto_glue_catalog=True,
    debugging=True,
    applications=["Hadoop", "Spark", "Hive", "Zeppelin", "Livy"],
    visible_to_all_users=True,
    maximize_resource_allocation=True,
    keep_cluster_alive_when_no_steps=True,
    termination_protected=False,
    spark_pyarrow=True
)

wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f"s3://{bucket}/emr/")

DOCKER_IMAGE = f"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler"

step_id = wr.emr.submit_spark_step(
    cluster_id,
    f"s3://{bucket}/test_docker.py",
    docker_image=DOCKER_IMAGE
)
[ ]:
while wr.emr.get_step_state(cluster_id, step_id) != "COMPLETED":
    pass

wr.emr.terminate_cluster(cluster_id)

AWS SDK for pandas

17 - Partition Projection

https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html

[1]:
import awswrangler as wr
import pandas as pd
from datetime import datetime
import getpass

Enter your bucket name:

[2]:
bucket = getpass.getpass()
 ···········································

Integer projection

[3]:
df = pd.DataFrame({
    "value": [1, 2, 3],
    "year": [2019, 2020, 2021],
    "month": [10, 11, 12],
    "day": [25, 26, 27]
})

df
[3]:
value year month day
0 1 2019 10 25
1 2 2020 11 26
2 3 2021 12 27
[4]:
wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/table_integer/",
    dataset=True,
    partition_cols=["year", "month", "day"],
    database="default",
    table="table_integer",
    projection_enabled=True,
    projection_types={
        "year": "integer",
        "month": "integer",
        "day": "integer"
    },
    projection_ranges={
        "year": "2000,2025",
        "month": "1,12",
        "day": "1,31"
    },
)
[5]:
wr.athena.read_sql_query(f"SELECT * FROM table_integer", database="default")
[5]:
value year month day
0 3 2021 12 27
1 2 2020 11 26
2 1 2019 10 25

Enum projection

[6]:
df = pd.DataFrame({
    "value": [1, 2, 3],
    "city": ["São Paulo", "Tokio", "Seattle"],
})

df
[6]:
value city
0 1 São Paulo
1 2 Tokio
2 3 Seattle
[7]:
wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/table_enum/",
    dataset=True,
    partition_cols=["city"],
    database="default",
    table="table_enum",
    projection_enabled=True,
    projection_types={
        "city": "enum",
    },
    projection_values={
        "city": "São Paulo,Tokio,Seattle"
    },
)
[8]:
wr.athena.read_sql_query(f"SELECT * FROM table_enum", database="default")
[8]:
value city
0 1 São Paulo
1 3 Seattle
2 2 Tokio

Date projection

[9]:
ts = lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S")
dt = lambda x: datetime.strptime(x, "%Y-%m-%d").date()

df = pd.DataFrame({
    "value": [1, 2, 3],
    "dt": [dt("2020-01-01"), dt("2020-01-02"), dt("2020-01-03")],
    "ts": [ts("2020-01-01 00:00:00"), ts("2020-01-01 00:00:01"), ts("2020-01-01 00:00:02")],
})

df
[9]:
value dt ts
0 1 2020-01-01 2020-01-01 00:00:00
1 2 2020-01-02 2020-01-01 00:00:01
2 3 2020-01-03 2020-01-01 00:00:02
[10]:
wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/table_date/",
    dataset=True,
    partition_cols=["dt", "ts"],
    database="default",
    table="table_date",
    projection_enabled=True,
    projection_types={
        "dt": "date",
        "ts": "date",
    },
    projection_ranges={
        "dt": "2020-01-01,2020-01-03",
        "ts": "2020-01-01 00:00:00,2020-01-01 00:00:02"
    },
)
[11]:
wr.athena.read_sql_query(f"SELECT * FROM table_date", database="default")
[11]:
value dt ts
0 1 2020-01-01 2020-01-01 00:00:00
1 2 2020-01-02 2020-01-01 00:00:01
2 3 2020-01-03 2020-01-01 00:00:02

Injected projection

[12]:
df = pd.DataFrame({
    "value": [1, 2, 3],
    "uuid": ["761e2488-a078-11ea-bb37-0242ac130002", "b89ed095-8179-4635-9537-88592c0f6bc3", "87adc586-ce88-4f0a-b1c8-bf8e00d32249"],
})

df
[12]:
value uuid
0 1 761e2488-a078-11ea-bb37-0242ac130002
1 2 b89ed095-8179-4635-9537-88592c0f6bc3
2 3 87adc586-ce88-4f0a-b1c8-bf8e00d32249
[13]:
wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/table_injected/",
    dataset=True,
    partition_cols=["uuid"],
    database="default",
    table="table_injected",
    projection_enabled=True,
    projection_types={
        "uuid": "injected",
    }
)
[14]:
wr.athena.read_sql_query(
    sql=f"SELECT * FROM table_injected WHERE uuid='b89ed095-8179-4635-9537-88592c0f6bc3'",
    database="default"
)
[14]:
value uuid
0 2 b89ed095-8179-4635-9537-88592c0f6bc3

Cleaning Up

[15]:
wr.s3.delete_objects(f"s3://{bucket}/table_integer/")
wr.s3.delete_objects(f"s3://{bucket}/table_enum/")
wr.s3.delete_objects(f"s3://{bucket}/table_date/")
wr.s3.delete_objects(f"s3://{bucket}/table_injected/")
[16]:
wr.catalog.delete_table_if_exists(table="table_integer", database="default")
wr.catalog.delete_table_if_exists(table="table_enum", database="default")
wr.catalog.delete_table_if_exists(table="table_date", database="default")
wr.catalog.delete_table_if_exists(table="table_injected", database="default")
[ ]:

AWS SDK for pandas

18 - QuickSight

For this tutorial we will use the public AWS COVID-19 data lake.

References:

Please install the CloudFormation template referenced above to get access to the public data lake.

P.S. To be able to access the public data lake, you must explicitly allow QuickSight to access the related external bucket.

[1]:
import awswrangler as wr
from time import sleep

List users of QuickSight account

[2]:
[{"username": user["UserName"], "role": user["Role"]} for user in wr.quicksight.list_users('default')]
[2]:
[{'username': 'dev', 'role': 'ADMIN'}]
[3]:
wr.catalog.databases()
[3]:
Database Description
0 aws_sdk_pandas AWS SDK for pandas Test Arena - Glue Database
1 awswrangler_test
2 covid-19
3 default Default Hive database
[4]:
wr.catalog.tables(database="covid-19")
[4]:
Database Table Description Columns Partitions
0 covid-19 alleninstitute_comprehend_medical Comprehend Medical results run against Allen I... paper_id, date, dx_name, test_name, procedure_...
1 covid-19 alleninstitute_metadata Metadata on papers pulled from the Allen Insti... cord_uid, sha, source_x, title, doi, pmcid, pu...
2 covid-19 country_codes Lookup table for country codes country, alpha-2 code, alpha-3 code, numeric c...
3 covid-19 county_populations Lookup table for population for each county ba... id, id2, county, state, population estimate 2018
4 covid-19 covid_knowledge_graph_edges AWS Knowledge Graph for COVID-19 data id, label, from, to, score
5 covid-19 covid_knowledge_graph_nodes_author AWS Knowledge Graph for COVID-19 data id, label, first, last, full_name
6 covid-19 covid_knowledge_graph_nodes_concept AWS Knowledge Graph for COVID-19 data id, label, entity, concept
7 covid-19 covid_knowledge_graph_nodes_institution AWS Knowledge Graph for COVID-19 data id, label, institution, country, settlement
8 covid-19 covid_knowledge_graph_nodes_paper AWS Knowledge Graph for COVID-19 data id, label, doi, sha_code, publish_time, source...
9 covid-19 covid_knowledge_graph_nodes_topic AWS Knowledge Graph for COVID-19 data id, label, topic, topic_num
10 covid-19 covid_testing_states_daily USA total test daily trend by state. Sourced ... date, state, positive, negative, pending, hosp...
11 covid-19 covid_testing_us_daily USA total test daily trend. Sourced from covi... date, states, positive, negative, posneg, pend...
12 covid-19 covid_testing_us_total USA total tests. Sourced from covidtracking.c... positive, negative, posneg, hospitalized, deat...
13 covid-19 covidcast_data CMU Delphi's COVID-19 Surveillance Data data_source, signal, geo_type, time_value, geo...
14 covid-19 covidcast_metadata CMU Delphi's COVID-19 Surveillance Metadata data_source, signal, time_type, geo_type, min_...
15 covid-19 enigma_jhu Johns Hopkins University Consolidated data on ... fips, admin2, province_state, country_region, ...
16 covid-19 enigma_jhu_timeseries Johns Hopkins University data on COVID-19 case... uid, fips, iso2, iso3, code3, admin2, latitude...
17 covid-19 hospital_beds Data on hospital beds and their utilization in... objectid, hospital_name, hospital_type, hq_add...
18 covid-19 nytimes_counties Data on COVID-19 cases from NY Times at US cou... date, county, state, fips, cases, deaths
19 covid-19 nytimes_states Data on COVID-19 cases from NY Times at US sta... date, state, fips, cases, deaths
20 covid-19 prediction_models_county_predictions County-level Predictions Data. Sourced from Yu... countyfips, countyname, statename, severity_co...
21 covid-19 prediction_models_severity_index Severity Index models. Sourced from Yu Group a... severity_1-day, severity_2-day, severity_3-day...
22 covid-19 tableau_covid_datahub COVID-19 data that has been gathered and unifi... country_short_name, country_alpha_3_code, coun...
23 covid-19 tableau_jhu Johns Hopkins University data on COVID-19 case... case_type, cases, difference, date, country_re...
24 covid-19 us_state_abbreviations Lookup table for US state abbreviations state, abbreviation
25 covid-19 world_cases_deaths_testing Data on confirmed cases, deaths, and testing. ... iso_code, location, date, total_cases, new_cas...

Create a QuickSight data source. Note: the data source stores the connection information.

[5]:
wr.quicksight.create_athena_data_source(
    name="covid-19",
    workgroup="primary",
    allowed_to_manage=["dev"]
)
[6]:
wr.catalog.tables(database="covid-19", name_contains="nyt")
[6]:
Database Table Description Columns Partitions
0 covid-19 nytimes_counties Data on COVID-19 cases from NY Times at US cou... date, county, state, fips, cases, deaths
1 covid-19 nytimes_states Data on COVID-19 cases from NY Times at US sta... date, state, fips, cases, deaths
[7]:
wr.athena.read_sql_query("SELECT * FROM nytimes_counties limit 10", database="covid-19", ctas_approach=False)
[7]:
date county state fips cases deaths
0 2020-01-21 Snohomish Washington 53061 1 0
1 2020-01-22 Snohomish Washington 53061 1 0
2 2020-01-23 Snohomish Washington 53061 1 0
3 2020-01-24 Cook Illinois 17031 1 0
4 2020-01-24 Snohomish Washington 53061 1 0
5 2020-01-25 Orange California 06059 1 0
6 2020-01-25 Cook Illinois 17031 1 0
7 2020-01-25 Snohomish Washington 53061 1 0
8 2020-01-26 Maricopa Arizona 04013 1 0
9 2020-01-26 Los Angeles California 06037 1 0
[8]:
sql = """
SELECT
  j.*,
  co.Population,
  co.county AS county2,
  hb.*
FROM
  (
    SELECT
      date,
      county,
      state,
      fips,
      cases as confirmed,
      deaths
    FROM "covid-19".nytimes_counties
  ) j
  LEFT OUTER JOIN (
    SELECT
      DISTINCT county,
      state,
      "population estimate 2018" AS Population
    FROM
      "covid-19".county_populations
    WHERE
      state IN (
        SELECT
          DISTINCT state
        FROM
          "covid-19".nytimes_counties
      )
      AND county IN (
        SELECT
          DISTINCT county as county
        FROM "covid-19".nytimes_counties
      )
  ) co ON co.county = j.county
  AND co.state = j.state
  LEFT OUTER JOIN (
    SELECT
      count(objectid) as Hospital,
      fips as hospital_fips,
      sum(num_licensed_beds) as licensed_beds,
      sum(num_staffed_beds) as staffed_beds,
      sum(num_icu_beds) as icu_beds,
      avg(bed_utilization) as bed_utilization,
      sum(
        potential_increase_in_bed_capac
      ) as potential_increase_bed_capacity
    FROM "covid-19".hospital_beds
    WHERE
      fips in (
        SELECT
          DISTINCT fips
        FROM
          "covid-19".nytimes_counties
      )
    GROUP BY
      2
  ) hb ON hb.hospital_fips = j.fips
"""

wr.athena.read_sql_query(sql, database="covid-19", ctas_approach=False)
[8]:
date county state fips confirmed deaths population county2 Hospital hospital_fips licensed_beds staffed_beds icu_beds bed_utilization potential_increase_bed_capacity
0 2020-04-12 Park Montana 30067 7 0 16736 Park 0 30067 25 25 4 0.432548 0
1 2020-04-12 Ravalli Montana 30081 3 0 43172 Ravalli 0 30081 25 25 5 0.567781 0
2 2020-04-12 Silver Bow Montana 30093 11 0 34993 Silver Bow 0 30093 98 71 11 0.551457 27
3 2020-04-12 Clay Nebraska 31035 2 0 6214 Clay <NA> <NA> <NA> <NA> <NA> NaN <NA>
4 2020-04-12 Cuming Nebraska 31039 2 0 8940 Cuming 0 31039 25 25 4 0.204493 0
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
227684 2020-06-11 Hockley Texas 48219 28 1 22980 Hockley 0 48219 48 48 8 0.120605 0
227685 2020-06-11 Hudspeth Texas 48229 11 0 4795 Hudspeth <NA> <NA> <NA> <NA> <NA> NaN <NA>
227686 2020-06-11 Jones Texas 48253 633 0 19817 Jones 0 48253 45 7 1 0.718591 38
227687 2020-06-11 La Salle Texas 48283 4 0 7531 La Salle <NA> <NA> <NA> <NA> <NA> NaN <NA>
227688 2020-06-11 Limestone Texas 48293 36 1 23519 Limestone 0 48293 78 69 9 0.163940 9

227689 rows × 15 columns

Create Dataset with custom SQL option

[9]:
wr.quicksight.create_athena_dataset(
    name="covid19-nytimes-usa",
    sql=sql,
    sql_name='CustomSQL',
    data_source_name="covid-19",
    import_mode='SPICE',
    allowed_to_manage=["dev"]
)
[10]:
ingestion_id = wr.quicksight.create_ingestion("covid19-nytimes-usa")

Wait ingestion

[11]:
while wr.quicksight.describe_ingestion(ingestion_id=ingestion_id, dataset_name="covid19-nytimes-usa")["IngestionStatus"] not in ["COMPLETED", "FAILED"]:
    sleep(1)

Describe last ingestion

[12]:
wr.quicksight.describe_ingestion(ingestion_id=ingestion_id, dataset_name="covid19-nytimes-usa")["RowInfo"]
[12]:
{'RowsIngested': 227689, 'RowsDropped': 0}

List all ingestions

[13]:
[{"time": user["CreatedTime"], "source": user["RequestSource"]} for user in wr.quicksight.list_ingestions("covid19-nytimes-usa")]
[13]:
[{'time': datetime.datetime(2020, 6, 12, 15, 13, 46, 996000, tzinfo=tzlocal()),
  'source': 'MANUAL'},
 {'time': datetime.datetime(2020, 6, 12, 15, 13, 42, 344000, tzinfo=tzlocal()),
  'source': 'MANUAL'}]

Create new dataset from a table directly

[14]:
wr.quicksight.create_athena_dataset(
    name="covid-19-tableau_jhu",
    table="tableau_jhu",
    data_source_name="covid-19",
    database="covid-19",
    import_mode='DIRECT_QUERY',
    rename_columns={
        "cases": "Count_of_Cases",
        "combined_key": "County"
    },
    cast_columns_types={
        "Count_of_Cases": "INTEGER"
    },
    tag_columns={
        "combined_key": [{"ColumnGeographicRole": "COUNTY"}]
    },
    allowed_to_manage=["dev"]
)

Cleaning up

[15]:
wr.quicksight.delete_data_source("covid-19")
wr.quicksight.delete_dataset("covid19-nytimes-usa")
wr.quicksight.delete_dataset("covid-19-tableau_jhu")

AWS SDK for pandas

19 - Amazon Athena Cache

awswrangler has a cache strategy that is disabled by default and can be enabled by passing a max_cache_seconds value greater than 0. This cache strategy for Amazon Athena can help you decrease query times and costs.

When read_sql_query is called, instead of just running the query, awswrangler first checks whether the same query has been run before. If it has, and that last run happened within max_cache_seconds (a parameter of read_sql_query), the previous results are returned, provided they are still available in S3. We have seen this speed up queries by more than 100x, and the potential gain grows with the cost of the original query.

The detailed approach is:

  • When read_sql_query is called with max_cache_seconds > 0 (it defaults to 0), we check the last queries run by the same workgroup (the most we can get without pagination).

  • By default the last 50 queries are checked, but this can be customized through the max_cache_query_inspections argument.

  • Those queries are sorted by CompletionDateTime, descending.

  • For each of them, we check whether its CompletionDateTime is still within the max_cache_seconds window. If so, we check whether the query string is the same as the current one (with some heuristics to guarantee coverage over both ctas_approach settings). If it matches and its results are still on S3, they are returned instead of re-running the query.

  • If anything goes wrong during the cache resolution phase, the logic falls back to the usual read_sql_query path.

P.S. The ``cache scope is bounded to the current workgroup``, so you will be able to reuse the query results of colleagues running in the same environment.
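
The same cache parameters can also be set once through global configuration instead of on every call. A minimal sketch (assuming a ``default`` database exists), using the config keys covered in tutorial 21 - Global Configurations:

[ ]:
import awswrangler as wr

# Global cache settings: every subsequent read_sql_query call will use them
wr.config.max_cache_seconds = 900
wr.config.max_cache_query_inspections = 500

df = wr.athena.read_sql_query("SELECT 1 AS foo", database="default")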

[1]:
import awswrangler as wr

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
 ···········································

Checking/Creating Glue Catalog Databases

[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")
Creating a Parquet Table from the NOAA’s CSV files

Reference

[4]:
cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/by_year/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  # Read 10 files from the 1890 decade (~1GB)

df
[4]:
id dt element value m_flag q_flag s_flag obs_time
0 AGE00135039 1890-01-01 TMAX 160 NaN NaN E NaN
1 AGE00135039 1890-01-01 TMIN 30 NaN NaN E NaN
2 AGE00135039 1890-01-01 PRCP 45 NaN NaN E NaN
3 AGE00147705 1890-01-01 TMAX 140 NaN NaN E NaN
4 AGE00147705 1890-01-01 TMIN 74 NaN NaN E NaN
... ... ... ... ... ... ... ... ...
29240014 UZM00038457 1899-12-31 PRCP 16 NaN NaN r NaN
29240015 UZM00038457 1899-12-31 TAVG -73 NaN NaN r NaN
29240016 UZM00038618 1899-12-31 TMIN -76 NaN NaN r NaN
29240017 UZM00038618 1899-12-31 PRCP 0 NaN NaN r NaN
29240018 UZM00038618 1899-12-31 TAVG -60 NaN NaN r NaN

29240019 rows × 8 columns

[5]:
wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
)
[6]:
wr.catalog.table(database="awswrangler_test", table="noaa")
[6]:
Column Name Type Partition Comment
0 id string False
1 dt timestamp False
2 element string False
3 value bigint False
4 m_flag string False
5 q_flag string False
6 s_flag string False
7 obs_time string False

The test query

The more computational resources a query needs, the more the cache helps. That is why we demonstrate it with this long-running query.

[7]:
query = """
SELECT
    n1.element,
    count(1) as cnt
FROM
    noaa n1
JOIN
    noaa n2
ON
    n1.id = n2.id
GROUP BY
    n1.element
"""

First execution…

[8]:
%%time

wr.athena.read_sql_query(query, database="awswrangler_test")
CPU times: user 5.31 s, sys: 232 ms, total: 5.54 s
Wall time: 6min 42s
[8]:
element cnt
0 WDMV 49755046
1 SNWD 5089486328
2 DATN 10817510
3 DAPR 102579666
4 MDTN 10817510
5 WT03 71184687
6 WT09 584412
7 TOBS 146984266
8 DASF 7764526
9 WT04 9648963
10 WT18 92635444
11 WT01 87526136
12 WT16 323354156
13 PRCP 71238907298
14 SNOW 21950890838
15 WT06 307339
16 TAVG 2340863803
17 TMIN 41450979633
18 MDTX 11210687
19 WT07 4486872
20 WT10 137873
21 EVAP 970404
22 WT14 8073701
23 DATX 11210687
24 WT08 33933005
25 WT05 8211491
26 TMAX 39876132467
27 MDPR 114320989
28 WT11 22212890
29 DWPR 69005655
30 MDSF 12004843

Second execution with CACHE (400x faster)

[9]:
%%time

wr.athena.read_sql_query(query, database="awswrangler_test", max_cache_seconds=900)
CPU times: user 493 ms, sys: 34.9 ms, total: 528 ms
Wall time: 975 ms
[9]:
element cnt
0 WDMV 49755046
1 SNWD 5089486328
2 DATN 10817510
3 DAPR 102579666
4 MDTN 10817510
5 WT03 71184687
6 WT09 584412
7 TOBS 146984266
8 DASF 7764526
9 WT04 9648963
10 WT18 92635444
11 WT01 87526136
12 WT16 323354156
13 PRCP 71238907298
14 SNOW 21950890838
15 WT06 307339
16 TAVG 2340863803
17 TMIN 41450979633
18 MDTX 11210687
19 WT07 4486872
20 WT10 137873
21 EVAP 970404
22 WT14 8073701
23 DATX 11210687
24 WT08 33933005
25 WT05 8211491
26 TMAX 39876132467
27 MDPR 114320989
28 WT11 22212890
29 DWPR 69005655
30 MDSF 12004843

Allowing awswrangler to inspect up to 500 historical queries to find the same result to reuse.

[10]:
%%time

wr.athena.read_sql_query(query, database="awswrangler_test", max_cache_seconds=900, max_cache_query_inspections=500)
CPU times: user 504 ms, sys: 44 ms, total: 548 ms
Wall time: 1.19 s
[10]:
element cnt
0 WDMV 49755046
1 SNWD 5089486328
2 DATN 10817510
3 DAPR 102579666
4 MDTN 10817510
5 WT03 71184687
6 WT09 584412
7 TOBS 146984266
8 DASF 7764526
9 WT04 9648963
10 WT18 92635444
11 WT01 87526136
12 WT16 323354156
13 PRCP 71238907298
14 SNOW 21950890838
15 WT06 307339
16 TAVG 2340863803
17 TMIN 41450979633
18 MDTX 11210687
19 WT07 4486872
20 WT10 137873
21 EVAP 970404
22 WT14 8073701
23 DATX 11210687
24 WT08 33933005
25 WT05 8211491
26 TMAX 39876132467
27 MDPR 114320989
28 WT11 22212890
29 DWPR 69005655
30 MDSF 12004843

Cleaning Up S3

[11]:
wr.s3.delete_objects(path)

Delete table

[12]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="noaa")
[12]:
True

Delete Database

[13]:
wr.catalog.delete_database('awswrangler_test')

AWS SDK for pandas

20 - Spark Table Interoperability

awswrangler has no difficulty inserting into, overwriting or otherwise interacting with a table created by Apache Spark.

But if you want to do the opposite (Spark interacting with a table created by awswrangler), be aware that awswrangler follows the Hive format, so you must be explicit when using Spark's saveAsTable method:

[ ]:
spark_df.write.format("hive").saveAsTable("database.table")

Or just move forward using the insertInto alternative:

[ ]:
spark_df.write.insertInto("database.table")
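
In the other direction, a table that Spark has written can be read back by awswrangler without any special handling. A minimal sketch, assuming the placeholder names database and table used above and that the table is registered in the Glue Catalog:

[ ]:
import awswrangler as wr

# Read the Spark-written table back through Amazon Athena
df = wr.athena.read_sql_table(table="table", database="database")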

AWS SDK for pandas

21 - Global Configurations

awswrangler has two ways to set global configurations that will override the regular default arguments configured in function signatures.

  • Environment variables

  • wr.config

P.S. Check the function API doc to see if your function has some argument that can be configured through Global configurations.

P.P.S. One exception to the above-mentioned rules is the ``botocore_config`` property. It cannot be set through environment variables, only via ``wr.config``. It will be used as the ``botocore.config.Config`` for all underlying ``boto3`` calls. The default config is ``botocore.config.Config(retries={"max_attempts": 5}, connect_timeout=10, max_pool_connections=10)``. If you only want to change the retry behavior, you can use the environment variables ``AWS_MAX_ATTEMPTS`` and ``AWS_RETRY_MODE`` instead (see the Boto3 documentation).
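
As a sketch of the retry-only route mentioned above (the values are illustrative, not the defaults):

[ ]:
import os

# Standard botocore retry settings, picked up by every underlying boto3 client
os.environ["AWS_MAX_ATTEMPTS"] = "5"
os.environ["AWS_RETRY_MODE"] = "adaptive"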

Environment Variables

[1]:
%env WR_DATABASE=default
%env WR_CTAS_APPROACH=False
%env WR_MAX_CACHE_SECONDS=900
%env WR_MAX_CACHE_QUERY_INSPECTIONS=500
%env WR_MAX_REMOTE_CACHE_ENTRIES=50
%env WR_MAX_LOCAL_CACHE_ENTRIES=100
env: WR_DATABASE=default
env: WR_CTAS_APPROACH=False
env: WR_MAX_CACHE_SECONDS=900
env: WR_MAX_CACHE_QUERY_INSPECTIONS=500
env: WR_MAX_REMOTE_CACHE_ENTRIES=50
env: WR_MAX_LOCAL_CACHE_ENTRIES=100
[1]:
import awswrangler as wr
import botocore
[3]:
wr.athena.read_sql_query("SELECT 1 AS FOO")
[3]:
foo
0 1

Resetting

[4]:
# Specific
wr.config.reset("database")
# All
wr.config.reset()

wr.config

[5]:
wr.config.database = "default"
wr.config.ctas_approach = False
wr.config.max_cache_seconds = 900
wr.config.max_cache_query_inspections = 500
wr.config.max_remote_cache_entries = 50
wr.config.max_local_cache_entries = 100
# Set botocore.config.Config that will be used for all boto3 calls
wr.config.botocore_config = botocore.config.Config(
    retries={"max_attempts": 10},
    connect_timeout=20,
    max_pool_connections=20
)
[6]:
wr.athena.read_sql_query("SELECT 1 AS FOO")
[6]:
foo
0 1

Visualizing

[2]:
wr.config
[2]:
name Env. Variable type nullable enforced configured value
0 catalog_id WR_CATALOG_ID <class 'str'> True False False None
1 concurrent_partitioning WR_CONCURRENT_PARTITIONING <class 'bool'> False False False None
2 ctas_approach WR_CTAS_APPROACH <class 'bool'> False False False None
3 database WR_DATABASE <class 'str'> True False False None
4 max_cache_query_inspections WR_MAX_CACHE_QUERY_INSPECTIONS <class 'int'> False False False None
5 max_cache_seconds WR_MAX_CACHE_SECONDS <class 'int'> False False False None
6 max_remote_cache_entries WR_MAX_REMOTE_CACHE_ENTRIES <class 'int'> False False False None
7 max_local_cache_entries WR_MAX_LOCAL_CACHE_ENTRIES <class 'int'> False False False None
8 s3_block_size WR_S3_BLOCK_SIZE <class 'int'> False True False None
9 workgroup WR_WORKGROUP <class 'str'> False True False None
10 chunksize WR_CHUNKSIZE <class 'int'> False True False None
11 s3_endpoint_url WR_S3_ENDPOINT_URL <class 'str'> True True True None
12 athena_endpoint_url WR_ATHENA_ENDPOINT_URL <class 'str'> True True True None
13 sts_endpoint_url WR_STS_ENDPOINT_URL <class 'str'> True True True None
14 glue_endpoint_url WR_GLUE_ENDPOINT_URL <class 'str'> True True True None
15 redshift_endpoint_url WR_REDSHIFT_ENDPOINT_URL <class 'str'> True True True None
16 kms_endpoint_url WR_KMS_ENDPOINT_URL <class 'str'> True True True None
17 emr_endpoint_url WR_EMR_ENDPOINT_URL <class 'str'> True True True None
18 lakeformation_endpoint_url WR_LAKEFORMATION_ENDPOINT_URL <class 'str'> True True True None
19 dynamodb_endpoint_url WR_DYNAMODB_ENDPOINT_URL <class 'str'> True True True None
20 secretsmanager_endpoint_url WR_SECRETSMANAGER_ENDPOINT_URL <class 'str'> True True True None
21 botocore_config WR_BOTOCORE_CONFIG <class 'botocore.config.Config'> True False True None
22 verify WR_VERIFY <class 'str'> True False True None

AWS SDK for pandas

22 - Writing Partitions Concurrently

  • concurrent_partitioning argument:

    If True, increases the level of parallelism while writing the partitions. This decreases the
    writing time at the cost of higher memory usage (a global-configuration sketch is shown after
    the note below).
    

P.S. Check the function API doc to see if it has arguments that can be configured through global configurations.
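
As referenced in the list above, the same behaviour can be enabled globally instead of per call. A minimal sketch:

[ ]:
import awswrangler as wr

# Equivalent to passing concurrent_partitioning=True on every to_parquet call
wr.config.concurrent_partitioning = True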

[1]:
%reload_ext memory_profiler

import awswrangler as wr

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"
 ············

Reading 4 GB of CSV from NOAA’s historical data and creating a year column

[3]:
noaa_path = "s3://noaa-ghcn-pds/csv/by_year/193"

cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
dates = ["dt", "obs_time"]
dtype = {x: "category" for x in ["element", "m_flag", "q_flag", "s_flag"]}

df = wr.s3.read_csv(noaa_path, names=cols, parse_dates=dates, dtype=dtype)

df["year"] = df["dt"].dt.year

print(f"Number of rows: {len(df.index)}")
print(f"Number of columns: {len(df.columns)}")
Number of rows: 125407761
Number of columns: 9

Default Writing

[4]:
%%time
%%memit

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    partition_cols=["year"],
)
peak memory: 22169.04 MiB, increment: 11119.68 MiB
CPU times: user 49 s, sys: 12.5 s, total: 1min 1s
Wall time: 1min 11s

Concurrent Partitioning (Decreasing writing time, but increasing memory usage)

[5]:
%%time
%%memit

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    partition_cols=["year"],
    concurrent_partitioning=True  # <-----
)
peak memory: 27819.48 MiB, increment: 15743.30 MiB
CPU times: user 52.3 s, sys: 13.6 s, total: 1min 5s
Wall time: 41.6 s

AWS SDK for pandas

23 - Flexible Partitions Filter (PUSH-DOWN)

  • partition_filter argument:

    - Callback Function filters to apply on PARTITION columns (PUSH-DOWN filter).
    - This function MUST receive a single argument (Dict[str, str]) where keys are partition names and values are partition values (a casting sketch follows the note below).
    - This function MUST return a bool, True to read the partition or False to ignore it.
    - Ignored if `dataset=False`.
    

P.S. Check the function API doc to see if it has arguments that can be configured through global configurations.
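
Because the callback receives partition values as strings, numeric partition columns must be cast inside the filter. A hypothetical sketch, assuming a dataset partitioned by a year column (not the dataset created in this tutorial):

[ ]:
import awswrangler as wr

def recent_years(partitions):
    # Partition values arrive as strings, so cast before comparing
    return int(partitions["year"]) >= 2021

# Hypothetical path; the dataset below is partitioned by "value", not "year"
df = wr.s3.read_parquet("s3://bucket/yearly_dataset/", dataset=True, partition_filter=recent_years)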

[1]:
import awswrangler as wr
import pandas as pd

Enter your bucket name:

[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
 ············

Creating the Dataset (PARQUET)

[3]:
df = pd.DataFrame({
    "id": [1, 2, 3],
    "value": ["foo", "boo", "bar"],
})

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    partition_cols=["value"]
)

wr.s3.read_parquet(path, dataset=True)
[3]:
id value
0 3 bar
1 2 boo
2 1 foo

Example 1

[4]:
my_filter = lambda x: x["value"].endswith("oo")

wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)
[4]:
id value
0 2 boo
1 1 foo

Example 2

[5]:
from Levenshtein import distance


def my_filter(partitions):
    return distance("boo", partitions["value"]) <= 1


wr.s3.read_parquet(path, dataset=True, partition_filter=my_filter)
[5]:
id value
0 2 boo
1 1 foo

Creating the Dataset (CSV)

[6]:
df = pd.DataFrame({
    "id": [1, 2, 3],
    "value": ["foo", "boo", "bar"],
})

wr.s3.to_csv(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    partition_cols=["value"],
    compression="gzip",
    index=False
)

wr.s3.read_csv(path, dataset=True)
[6]:
id value
0 3 bar
1 2 boo
2 1 foo

Example 1

[7]:
my_filter = lambda x: x["value"].endswith("oo")

wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)
[7]:
id value
0 2 boo
1 1 foo

Example 2

[8]:
from Levenshtein import distance


def my_filter(partitions):
    return distance("boo", partitions["value"]) <= 1


wr.s3.read_csv(path, dataset=True, partition_filter=my_filter)
[8]:
id value
0 2 boo
1 1 foo

AWS SDK for pandas

24 - Athena Query Metadata

For wr.athena.read_sql_query() and wr.athena.read_sql_table(), the resulting DataFrame (or every DataFrame in the returned Iterator for chunked queries) has a query_metadata attribute, which carries the query execution metadata returned by Boto3/Athena.

The expected query_metadata format is the same returned by:

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.get_query_execution

Environment Variables

[1]:
%env WR_DATABASE=default
env: WR_DATABASE=default
[2]:
import awswrangler as wr
[5]:
df = wr.athena.read_sql_query("SELECT 1 AS foo")

df
[5]:
foo
0 1

Getting statistics from query metadata

[6]:
print(f'DataScannedInBytes:            {df.query_metadata["Statistics"]["DataScannedInBytes"]}')
print(f'TotalExecutionTimeInMillis:    {df.query_metadata["Statistics"]["TotalExecutionTimeInMillis"]}')
print(f'QueryQueueTimeInMillis:        {df.query_metadata["Statistics"]["QueryQueueTimeInMillis"]}')
print(f'QueryPlanningTimeInMillis:     {df.query_metadata["Statistics"]["QueryPlanningTimeInMillis"]}')
print(f'ServiceProcessingTimeInMillis: {df.query_metadata["Statistics"]["ServiceProcessingTimeInMillis"]}')
DataScannedInBytes:            0
TotalExecutionTimeInMillis:    2311
QueryQueueTimeInMillis:        121
QueryPlanningTimeInMillis:     250
ServiceProcessingTimeInMillis: 37
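
For chunked queries, each DataFrame in the returned iterator carries its own query_metadata. A minimal sketch (relying on the WR_DATABASE environment variable set above):

[ ]:
for chunk in wr.athena.read_sql_query("SELECT 1 AS foo", chunksize=True):
    # Every chunk exposes the same metadata structure as above
    print(chunk.query_metadata["Statistics"]["DataScannedInBytes"])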

AWS SDK for pandas

25 - Redshift - Loading Parquet files with Spectrum

Enter your bucket name:

[1]:
import getpass
bucket = getpass.getpass()
PATH = f"s3://{bucket}/files/"
 ···········································

Mocking some Parquet Files on S3

[2]:
import awswrangler as wr
import pandas as pd

df = pd.DataFrame({
    "col0": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    "col1": ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"],
})

df
[2]:
col0 col1
0 0 a
1 1 b
2 2 c
3 3 d
4 4 e
5 5 f
6 6 g
7 7 h
8 8 i
9 9 j
[3]:
wr.s3.to_parquet(df, PATH, max_rows_by_file=2, dataset=True, mode="overwrite")

Crawling the metadata and adding into Glue Catalog

[4]:
wr.s3.store_parquet_metadata(
    path=PATH,
    database="aws_sdk_pandas",
    table="test",
    dataset=True,
    mode="overwrite"
)
[4]:
({'col0': 'bigint', 'col1': 'string'}, None, None)

Running the CTAS query to load the data into Redshift storage

[5]:
con = wr.redshift.connect(connection="aws-sdk-pandas-redshift")
[6]:
query = "CREATE TABLE public.test AS (SELECT * FROM aws_sdk_pandas_external.test)"
[7]:
with con.cursor() as cursor:
    cursor.execute(query)

Running an INSERT INTO query to load MORE data into Redshift storage

[8]:
df = pd.DataFrame({
    "col0": [10, 11],
    "col1": ["k", "l"],
})
wr.s3.to_parquet(df, PATH, dataset=True, mode="overwrite")
[9]:
query = "INSERT INTO public.test (SELECT * FROM aws_sdk_pandas_external.test)"
[10]:
with con.cursor() as cursor:
    cursor.execute(query)

Checking the result

[11]:
query = "SELECT * FROM public.test"
[13]:
wr.redshift.read_sql_table(con=con, schema="public", table="test")
[13]:
col0 col1
0 5 f
1 1 b
2 3 d
3 6 g
4 8 i
5 10 k
6 4 e
7 0 a
8 2 c
9 7 h
10 9 j
11 11 l
[14]:
con.close()

AWS SDK for pandas

26 - Amazon Timestream

Creating resources

[10]:
import awswrangler as wr
import pandas as pd
from datetime import datetime

wr.timestream.create_database("sampleDB")
wr.timestream.create_table("sampleDB", "sampleTable", memory_retention_hours=1, magnetic_retention_days=1)

Write

[11]:
df = pd.DataFrame(
    {
        "time": [datetime.now(), datetime.now(), datetime.now()],
        "dim0": ["foo", "boo", "bar"],
        "dim1": [1, 2, 3],
        "measure": [1.0, 1.1, 1.2],
    }
)

rejected_records = wr.timestream.write(
    df=df,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_col="measure",
    dimensions_cols=["dim0", "dim1"],
)

print(f"Number of rejected records: {len(rejected_records)}")
Number of rejected records: 0

Query

[12]:
wr.timestream.query(
    'SELECT time, measure_value::double, dim0, dim1 FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3'
)
[12]:
time measure_value::double dim0 dim1
0 2020-12-08 19:15:32.468 1.0 foo 1
1 2020-12-08 19:15:32.468 1.2 bar 3
2 2020-12-08 19:15:32.468 1.1 boo 2

Deleting resources

[13]:
wr.timestream.delete_table("sampleDB", "sampleTable")
wr.timestream.delete_database("sampleDB")

AWS SDK for pandas

27 - Amazon Timestream - Example 2

Reading test data

[1]:
import awswrangler as wr
import pandas as pd
from datetime import datetime

df = pd.read_csv(
    "https://raw.githubusercontent.com/aws/amazon-timestream-tools/master/sample_apps/data/sample.csv",
    names=[
        "ignore0",
        "region",
        "ignore1",
        "az",
        "ignore2",
        "hostname",
        "measure_kind",
        "measure",
        "ignore3",
        "ignore4",
        "ignore5",
    ],
    usecols=["region", "az", "hostname", "measure_kind", "measure"],
)
df["time"] = datetime.now()
df.reset_index(inplace=True, drop=False)

df
[1]:
index region az hostname measure_kind measure time
0 0 us-east-1 us-east-1a host-fj2hx cpu_utilization 21.394363 2020-12-08 16:18:47.599597
1 1 us-east-1 us-east-1a host-fj2hx memory_utilization 68.563420 2020-12-08 16:18:47.599597
2 2 us-east-1 us-east-1a host-6kMPE cpu_utilization 17.144579 2020-12-08 16:18:47.599597
3 3 us-east-1 us-east-1a host-6kMPE memory_utilization 73.507870 2020-12-08 16:18:47.599597
4 4 us-east-1 us-east-1a host-sxj7X cpu_utilization 26.584865 2020-12-08 16:18:47.599597
... ... ... ... ... ... ... ...
125995 125995 eu-north-1 eu-north-1c host-De8RB memory_utilization 68.063468 2020-12-08 16:18:47.599597
125996 125996 eu-north-1 eu-north-1c host-2z8tn memory_utilization 72.203680 2020-12-08 16:18:47.599597
125997 125997 eu-north-1 eu-north-1c host-2z8tn cpu_utilization 29.212219 2020-12-08 16:18:47.599597
125998 125998 eu-north-1 eu-north-1c host-9FczW memory_utilization 71.746134 2020-12-08 16:18:47.599597
125999 125999 eu-north-1 eu-north-1c host-9FczW cpu_utilization 1.677793 2020-12-08 16:18:47.599597

126000 rows × 7 columns

Creating resources

[2]:
wr.timestream.create_database("sampleDB")
wr.timestream.create_table("sampleDB", "sampleTable", memory_retention_hours=1, magnetic_retention_days=1)

Write CPU_UTILIZATION records

[3]:
df_cpu = df[df.measure_kind == "cpu_utilization"].copy()
df_cpu.rename(columns={"measure": "cpu_utilization"}, inplace=True)
df_cpu
[3]:
index region az hostname measure_kind cpu_utilization time
0 0 us-east-1 us-east-1a host-fj2hx cpu_utilization 21.394363 2020-12-08 16:18:47.599597
2 2 us-east-1 us-east-1a host-6kMPE cpu_utilization 17.144579 2020-12-08 16:18:47.599597
4 4 us-east-1 us-east-1a host-sxj7X cpu_utilization 26.584865 2020-12-08 16:18:47.599597
6 6 us-east-1 us-east-1a host-ExOui cpu_utilization 52.930970 2020-12-08 16:18:47.599597
8 8 us-east-1 us-east-1a host-Bwb3j cpu_utilization 99.134110 2020-12-08 16:18:47.599597
... ... ... ... ... ... ... ...
125990 125990 eu-north-1 eu-north-1c host-aPtc6 cpu_utilization 89.566125 2020-12-08 16:18:47.599597
125992 125992 eu-north-1 eu-north-1c host-7ZF9L cpu_utilization 75.510598 2020-12-08 16:18:47.599597
125994 125994 eu-north-1 eu-north-1c host-De8RB cpu_utilization 2.771261 2020-12-08 16:18:47.599597
125997 125997 eu-north-1 eu-north-1c host-2z8tn cpu_utilization 29.212219 2020-12-08 16:18:47.599597
125999 125999 eu-north-1 eu-north-1c host-9FczW cpu_utilization 1.677793 2020-12-08 16:18:47.599597

63000 rows × 7 columns

[4]:
rejected_records = wr.timestream.write(
    df=df_cpu,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_col="cpu_utilization",
    dimensions_cols=["index", "region", "az", "hostname"],
)

assert len(rejected_records) == 0

Write MEMORY_UTILIZATION records

[5]:
df_memory = df[df.measure_kind == "memory_utilization"].copy()
df_memory.rename(columns={"measure": "memory_utilization"}, inplace=True)

df_memory
[5]:
index region az hostname measure_kind memory_utilization time
1 1 us-east-1 us-east-1a host-fj2hx memory_utilization 68.563420 2020-12-08 16:18:47.599597
3 3 us-east-1 us-east-1a host-6kMPE memory_utilization 73.507870 2020-12-08 16:18:47.599597
5 5 us-east-1 us-east-1a host-sxj7X memory_utilization 22.401424 2020-12-08 16:18:47.599597
7 7 us-east-1 us-east-1a host-ExOui memory_utilization 45.440135 2020-12-08 16:18:47.599597
9 9 us-east-1 us-east-1a host-Bwb3j memory_utilization 15.042701 2020-12-08 16:18:47.599597
... ... ... ... ... ... ... ...
125991 125991 eu-north-1 eu-north-1c host-aPtc6 memory_utilization 75.686739 2020-12-08 16:18:47.599597
125993 125993 eu-north-1 eu-north-1c host-7ZF9L memory_utilization 18.386152 2020-12-08 16:18:47.599597
125995 125995 eu-north-1 eu-north-1c host-De8RB memory_utilization 68.063468 2020-12-08 16:18:47.599597
125996 125996 eu-north-1 eu-north-1c host-2z8tn memory_utilization 72.203680 2020-12-08 16:18:47.599597
125998 125998 eu-north-1 eu-north-1c host-9FczW memory_utilization 71.746134 2020-12-08 16:18:47.599597

63000 rows × 7 columns

[6]:
rejected_records = wr.timestream.write(
    df=df_memory,
    database="sampleDB",
    table="sampleTable",
    time_col="time",
    measure_col="memory_utilization",
    dimensions_cols=["index", "region", "az", "hostname"],
)

assert len(rejected_records) == 0

Querying CPU_UTILIZATION

[7]:
wr.timestream.query("""
    SELECT
        hostname, region, az, measure_name, measure_value::double, time
    FROM "sampleDB"."sampleTable"
    WHERE measure_name = 'cpu_utilization'
    ORDER BY time DESC
    LIMIT 10
""")
[7]:
hostname region az measure_name measure_value::double time
0 host-OgvFx us-west-1 us-west-1a cpu_utilization 39.617911 2020-12-08 19:18:47.600
1 host-rZUNx eu-north-1 eu-north-1a cpu_utilization 30.793332 2020-12-08 19:18:47.600
2 host-t1kAB us-east-2 us-east-2b cpu_utilization 74.453239 2020-12-08 19:18:47.600
3 host-RdQRf us-east-1 us-east-1c cpu_utilization 76.984448 2020-12-08 19:18:47.600
4 host-4Llhu us-east-1 us-east-1c cpu_utilization 41.862733 2020-12-08 19:18:47.600
5 host-2plqa us-west-1 us-west-1a cpu_utilization 34.864762 2020-12-08 19:18:47.600
6 host-J3Q4z us-east-1 us-east-1b cpu_utilization 71.574266 2020-12-08 19:18:47.600
7 host-VIR5T ap-east-1 ap-east-1a cpu_utilization 14.017491 2020-12-08 19:18:47.600
8 host-G042D us-east-1 us-east-1c cpu_utilization 60.199068 2020-12-08 19:18:47.600
9 host-8EBHm us-west-2 us-west-2c cpu_utilization 96.631624 2020-12-08 19:18:47.600

Querying MEMORY_UTILIZATION

[8]:
wr.timestream.query("""
    SELECT
        hostname, region, az, measure_name, measure_value::double, time
    FROM "sampleDB"."sampleTable"
    WHERE measure_name = 'memory_utilization'
    ORDER BY time DESC
    LIMIT 10
""")
[8]:
hostname region az measure_name measure_value::double time
0 host-7c897 us-west-2 us-west-2b memory_utilization 63.427726 2020-12-08 19:18:47.600
1 host-2z8tn eu-north-1 eu-north-1c memory_utilization 41.071368 2020-12-08 19:18:47.600
2 host-J3Q4z us-east-1 us-east-1b memory_utilization 23.944388 2020-12-08 19:18:47.600
3 host-mjrQb us-east-1 us-east-1b memory_utilization 69.173431 2020-12-08 19:18:47.600
4 host-AyWSI us-east-1 us-east-1c memory_utilization 75.591467 2020-12-08 19:18:47.600
5 host-Axf0g us-west-2 us-west-2a memory_utilization 29.720739 2020-12-08 19:18:47.600
6 host-ilMBa us-east-2 us-east-2b memory_utilization 71.544134 2020-12-08 19:18:47.600
7 host-CWdXX us-west-2 us-west-2c memory_utilization 79.792799 2020-12-08 19:18:47.600
8 host-8EBHm us-west-2 us-west-2c memory_utilization 66.082554 2020-12-08 19:18:47.600
9 host-dRIJj us-east-1 us-east-1c memory_utilization 86.748960 2020-12-08 19:18:47.600

Deleting resources

[9]:
wr.timestream.delete_table("sampleDB", "sampleTable")
wr.timestream.delete_database("sampleDB")

AWS SDK for pandas

28 - Amazon DynamoDB

Writing Data

[23]:
from datetime import datetime
from decimal import Decimal
from pathlib import Path

import awswrangler as wr
import pandas as pd
from boto3.dynamodb.conditions import Attr, Key
Writing DataFrame
[27]:
table_name = "movies"

df = pd.DataFrame({
    "title": ["Titanic", "Snatch", "The Godfather"],
    "year": [1997, 2000, 1972],
    "genre": ["drama", "caper story", "crime"],
})
wr.dynamodb.put_df(df=df, table_name=table_name)
Writing CSV file
[3]:
filepath = Path("items.csv")
df.to_csv(filepath, index=False)
wr.dynamodb.put_csv(path=filepath, table_name=table_name)
filepath.unlink()
Writing JSON files
[4]:
filepath = Path("items.json")
df.to_json(filepath, orient="records")
wr.dynamodb.put_json(path="items.json", table_name=table_name)
filepath.unlink()
Writing list of items
[5]:
items = df.to_dict(orient="records")
wr.dynamodb.put_items(items=items, table_name=table_name)

Reading Data

Read PartiQL
[ ]:
# PartiQL Query
wr.dynamodb.read_partiql_query(
    query=f"SELECT * FROM {table_name} WHERE title=? AND year=?",
    parameters=["Snatch", 2000],
)
Read Items
[ ]:
# Limit Read to 5 items
wr.dynamodb.read_items(table_name=table_name, max_items_evaluated=5)

# Limit Read to Key expression
wr.dynamodb.read_items(
    table_name=table_name,
    key_condition_expression=(Key("title").eq("Snatch") & Key("year").eq(2000))
)
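
The Attr import above can be used for non-key filters as well. A minimal sketch, reusing table_name from the cells above and assuming your awswrangler version supports filter_expression and full scans on this small table:

[ ]:
# Filter on a non-key attribute; this scans the table, hence allow_full_scan
wr.dynamodb.read_items(
    table_name=table_name,
    filter_expression=Attr("genre").eq("drama"),
    allow_full_scan=True,
)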

Executing statements

[29]:
title = "The Lord of the Rings: The Fellowship of the Ring"
year = datetime.now().year
genre = "epic"
rating = Decimal('9.9')
plot = "The fate of Middle-earth hangs in the balance as Frodo and eight companions begin their journey to Mount Doom in the land of Mordor."

# Insert items
wr.dynamodb.execute_statement(
    statement=f"INSERT INTO {table_name} VALUE {{'title': ?, 'year': ?, 'genre': ?, 'info': ?}}",
    parameters=[title, year, genre, {"plot": plot, "rating": rating}],
)

# Select items
wr.dynamodb.execute_statement(
    statement=f"SELECT * FROM \"{table_name}\" WHERE title=? AND year=?",
    parameters=[title, year],
)

# Update items
wr.dynamodb.execute_statement(
    statement=f"UPDATE \"{table_name}\" SET info.rating=? WHERE title=? AND year=?",
    parameters=[Decimal(10), title, year],
)

# Delete items
wr.dynamodb.execute_statement(
    statement=f"DELETE FROM \"{table_name}\" WHERE title=? AND year=?",
    parameters=[title, year],
)
[29]:
[]

Deleting items

[6]:
wr.dynamodb.delete_items(items=items, table_name=table_name)

AWS SDK for pandas

29 - S3 Select

AWS SDK for pandas supports Amazon S3 Select, enabling applications to use SQL statements to query and filter the contents of a single S3 object. It works on objects stored in CSV, JSON or Apache Parquet format, including compressed objects and large files of several TBs.

With S3 Select, the query workload is delegated to Amazon S3, leading to lower latency and cost, and to higher performance (up to 400% improvement) compared with other awswrangler operations such as read_parquet, where the S3 object is downloaded and filtered on the client side.

This feature has a number of limitations however, and should be used for specific scenarios only:

  • It operates on a single S3 object

  • The maximum length of a record in the input or result is 1 MB

  • The maximum uncompressed row group size is 256 MB (Parquet only)

  • It can only emit nested data in JSON format

  • Certain SQL operations are not supported (e.g. ORDER BY)

Read full CSV file

[2]:
import awswrangler as wr

df = wr.s3.select_query(
    sql="SELECT * FROM s3object",
    path="s3://nyc-tlc/csv_backup/fhv_tripdata_2019-09.csv",  # 58 MB
    input_serialization="CSV",
    input_serialization_params={
        "FileHeaderInfo": "Use",
        "RecordDelimiter": "\r\n",
    },
    use_threads=True,
)
df.head()
[2]:
dispatching_base_num pickup_datetime dropoff_datetime PULocationID DOLocationID SR_Flag
0 B00009 2019-09-01 00:35:00 2019-09-01 00:59:00 264 264
1 B00009 2019-09-01 00:48:00 2019-09-01 01:09:00 264 264
2 B00014 2019-09-01 00:16:18 2019-09-02 00:35:37 264 264
3 B00014 2019-09-01 00:55:03 2019-09-01 01:09:35 264 264
4 B00014 2019-09-01 00:13:08 2019-09-02 01:12:31 264 264

Filter JSON file

[2]:
wr.s3.select_query(
    sql="SELECT * FROM s3object[*] s where s.\"family_name\" = \'Biden\'",
    path="s3://awsglue-datasets/examples/us-legislators/all/persons.json",
    input_serialization="JSON",
    input_serialization_params={
        "Type": "Document",
    },
)
[2]:
family_name contact_details name links gender image identifiers other_names sort_name images given_name birth_date id
0 Biden [{'type': 'twitter', 'value': 'joebiden'}] Joseph Biden, Jr. [{'note': 'Wikipedia (ace)', 'url': 'https://a... male https://theunitedstates.io/images/congress/ori... [{'scheme': 'bioguide', 'identifier': 'B000444... [{'note': 'alternate', 'name': 'Joe Biden'}, {... Biden, Joseph [{'url': 'https://theunitedstates.io/images/co... Joseph 1942-11-20 64239edf-8e06-4d2d-acc0-33d96bc79774

Read Snappy compressed Parquet

[3]:
df = wr.s3.select_query(
        sql="SELECT * FROM s3object s where s.\"star_rating\" >= 5",
        path="s3://amazon-reviews-pds/parquet/product_category=Gift_Card/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
        input_serialization="Parquet",
        input_serialization_params={},
        use_threads=True,
)
df.loc[:, df.columns != "product_title"].head()
[3]:
marketplace customer_id review_id product_id product_parent star_rating helpful_votes total_votes vine verified_purchase review_headline review_body review_date year
0 US 52670295 RGPOFKORD8RTU B0002CZPPG 867256265 5 105 107 N N Excellent Gift Idea I wonder if the other reviewer actually read t... 2005-02-08 2005
1 US 29964102 R2U8X8V5KPB4J3 B00H5BMF00 373287760 5 0 0 N Y Five Stars convenience is the name of the game. 2015-05-03 2015
2 US 25173351 R15XV3LXUMLTXL B00PG40CO4 137115061 5 0 0 N Y Birthday Gift This gift card was handled with accuracy in de... 2015-05-03 2015
3 US 12516181 R3G6G7H8TX4H0T B0002CZPPG 867256265 5 6 6 N N Love 'em. Gotta love these iTunes Prepaid Card thingys. ... 2005-10-15 2005
4 US 38355314 R2NJ7WNBU16YTQ B00B2TFSO6 89375983 5 0 0 N Y Five Stars perfect 2015-05-03 2015

AWS SDK for pandas

30 - Data Api

The Data API simplifies access to Amazon Redshift and RDS by removing the need to manage database connections and credentials. Instead, you can run SQL commands against an Amazon Redshift cluster or Amazon Aurora cluster by simply invoking an HTTPS API endpoint provided by the Data API. It takes care of managing database connections and returning data. Since the Data API leverages IAM user credentials or database credentials stored in AWS Secrets Manager, you don't need to pass credentials in API calls.

Connect to the cluster

[ ]:
con_redshift = wr.data_api.redshift.connect(
    cluster_id="aws-sdk-pandas-1xn5lqxrdxrv3",
    database="test_redshift",
    secret_arn="arn:aws:secretsmanager:us-east-1:111111111111:secret:aws-sdk-pandas/redshift-ewn43d"
)

con_redshift_serverless = wr.data_api.redshift.connect(
    workgroup_name="aws-sdk-pandas",
    database="test_redshift",
    secret_arn="arn:aws:secretsmanager:us-east-1:111111111111:secret:aws-sdk-pandas/redshift-f3en4w"
)

con_mysql = wr.data_api.rds.connect(
    resource_arn="arn:aws:rds:us-east-1:111111111111:cluster:mysql-serverless-cluster-wrangler",
    database="test_rds",
    secret_arn="arn:aws:secretsmanager:us-east-1:111111111111:secret:aws-sdk-pandas/mysql-23df3"
)

Read from database

[ ]:
df = wr.data_api.redshift.read_sql_query(
    sql="SELECT * FROM public.test_table",
    con=con_redshift,
)

df = wr.data_api.rds.read_sql_query(
    sql="SELECT * FROM test.test_table",
    con=con_mysql,
)

AWS SDK for pandas

31 - OpenSearch


1. Initialize

[1]:
import awswrangler as wr
Connect to your Amazon OpenSearch domain
[2]:
client = wr.opensearch.connect(
    host='OPENSEARCH-ENDPOINT',
#     username='FGAC-USERNAME(OPTIONAL)',
#     password='FGAC-PASSWORD(OPTIONAL)'
)
client.info()
Enter your bucket name
[3]:
bucket = 'BUCKET'
Initialize sample data
[4]:
sf_restaurants_inspections = [
    {
        "inspection_id": "24936_20160609",
        "business_address": "315 California St",
        "business_city": "San Francisco",
        "business_id": "24936",
        "business_location": {"lon": -122.400152, "lat": 37.793199},
        "business_name": "San Francisco Soup Company",
        "business_postal_code": "94104",
        "business_state": "CA",
        "inspection_date": "2016-06-09T00:00:00.000",
        "inspection_score": 77,
        "inspection_type": "Routine - Unscheduled",
        "risk_category": "Low Risk",
        "violation_description": "Improper food labeling or menu misrepresentation",
        "violation_id": "24936_20160609_103141",
    },
    {
        "inspection_id": "60354_20161123",
        "business_address": "10 Mason St",
        "business_city": "San Francisco",
        "business_id": "60354",
        "business_location": {"lon": -122.409061, "lat": 37.783527},
        "business_name": "Soup Unlimited",
        "business_postal_code": "94102",
        "business_state": "CA",
        "inspection_date": "2016-11-23T00:00:00.000",
        "inspection_type": "Routine",
        "inspection_score": 95,
    },
    {
        "inspection_id": "1797_20160705",
        "business_address": "2872 24th St",
        "business_city": "San Francisco",
        "business_id": "1797",
        "business_location": {"lon": -122.409752, "lat": 37.752807},
        "business_name": "TIO CHILOS GRILL",
        "business_postal_code": "94110",
        "business_state": "CA",
        "inspection_date": "2016-07-05T00:00:00.000",
        "inspection_score": 90,
        "inspection_type": "Routine - Unscheduled",
        "risk_category": "Low Risk",
        "violation_description": "Unclean nonfood contact surfaces",
        "violation_id": "1797_20160705_103142",
    },
    {
        "inspection_id": "66198_20160527",
        "business_address": "1661 Tennessee St Suite 3B",
        "business_city": "San Francisco Whard Restaurant",
        "business_id": "66198",
        "business_location": {"lon": -122.388478, "lat": 37.75072},
        "business_name": "San Francisco Restaurant",
        "business_postal_code": "94107",
        "business_state": "CA",
        "inspection_date": "2016-05-27T00:00:00.000",
        "inspection_type": "Routine",
        "inspection_score": 56,
    },
    {
        "inspection_id": "5794_20160907",
        "business_address": "2162 24th Ave",
        "business_city": "San Francisco",
        "business_id": "5794",
        "business_location": {"lon": -122.481299, "lat": 37.747228},
        "business_name": "Soup House",
        "business_phone_number": "+14155752700",
        "business_postal_code": "94116",
        "business_state": "CA",
        "inspection_date": "2016-09-07T00:00:00.000",
        "inspection_score": 96,
        "inspection_type": "Routine - Unscheduled",
        "risk_category": "Low Risk",
        "violation_description": "Unapproved or unmaintained equipment or utensils",
        "violation_id": "5794_20160907_103144",
    },

    # duplicate record
    {
        "inspection_id": "5794_20160907",
        "business_address": "2162 24th Ave",
        "business_city": "San Francisco",
        "business_id": "5794",
        "business_location": {"lon": -122.481299, "lat": 37.747228},
        "business_name": "Soup-or-Salad",
        "business_phone_number": "+14155752700",
        "business_postal_code": "94116",
        "business_state": "CA",
        "inspection_date": "2016-09-07T00:00:00.000",
        "inspection_score": 96,
        "inspection_type": "Routine - Unscheduled",
        "risk_category": "Low Risk",
        "violation_description": "Unapproved or unmaintained equipment or utensils",
        "violation_id": "5794_20160907_103144",
    },
]

2. Indexing (load)

Index documents (no Pandas)
[5]:
# index documents w/o providing keys (_id is auto-generated)
wr.opensearch.index_documents(
        client,
        documents=sf_restaurants_inspections,
        index="sf_restaurants_inspections"
)
Indexing: 100% (6/6)|####################################|Elapsed Time: 0:00:01
[5]:
{'success': 6, 'errors': []}
[6]:
# read all documents. There are total 6 documents
wr.opensearch.search(
        client,
        index="sf_restaurants_inspections",
        _source=["inspection_id", "business_name", "business_location"]
)
[6]:
_id business_name inspection_id business_location.lon business_location.lat
0 663dd72d-0da4-495b-b0ae-ed000105ae73 TIO CHILOS GRILL 1797_20160705 -122.409752 37.752807
1 ff2f50f6-5415-4706-9bcb-af7c5eb0afa3 Soup House 5794_20160907 -122.481299 37.747228
2 b9e8f6a2-8fd1-4660-b041-2997a1a80984 San Francisco Soup Company 24936_20160609 -122.400152 37.793199
3 56b352e6-102b-4eff-8296-7e1fb2459bab Soup Unlimited 60354_20161123 -122.409061 37.783527
4 6fec5411-f79a-48e4-be7b-e0e44d5ebbab San Francisco Restaurant 66198_20160527 -122.388478 37.750720
5 7ba4fb17-f9a9-49da-b90e-8b3553d6d97c Soup-or-Salad 5794_20160907 -122.481299 37.747228
Index json file
[ ]:
import pandas as pd
df = pd.DataFrame(sf_restaurants_inspections)
path = f"s3://{bucket}/json/sf_restaurants_inspections.json"
wr.s3.to_json(df, path, orient='records', lines=True)
[8]:
# index json w/ providing keys
wr.opensearch.index_json(
        client,
        path=path, # path can be s3 or local
        index="sf_restaurants_inspections_dedup",
        id_keys=["inspection_id"] # can be multiple fields. arg applicable to all index_* functions
)
Indexing: 100% (6/6)|####################################|Elapsed Time: 0:00:00
[8]:
{'success': 6, 'errors': []}
[9]:
# now there are no duplicates. There are total 5 documents
wr.opensearch.search(
        client,
        index="sf_restaurants_inspections_dedup",
        _source=["inspection_id", "business_name", "business_location"]
    )
[9]:
_id business_name inspection_id business_location.lon business_location.lat
0 24936_20160609 San Francisco Soup Company 24936_20160609 -122.400152 37.793199
1 66198_20160527 San Francisco Restaurant 66198_20160527 -122.388478 37.750720
2 5794_20160907 Soup-or-Salad 5794_20160907 -122.481299 37.747228
3 60354_20161123 Soup Unlimited 60354_20161123 -122.409061 37.783527
4 1797_20160705 TIO CHILOS GRILL 1797_20160705 -122.409752 37.752807
Index CSV
[11]:
wr.opensearch.index_csv(
        client,
        index="nyc_restaurants_inspections_sample",
        path='https://data.cityofnewyork.us/api/views/43nn-pn8j/rows.csv?accessType=DOWNLOAD', # index_csv supports local, s3 and url path
        id_keys=["CAMIS"],
        pandas_kwargs={'na_filter': True, 'nrows': 1000},  # pandas.read_csv() args - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
        bulk_size=500 # modify based on your cluster size
)
Indexing: 100% (1000/1000)|##############################|Elapsed Time: 0:00:00
[11]:
{'success': 1000, 'errors': []}
[12]:
wr.opensearch.search(
        client,
        index="nyc_restaurants_inspections_sample",
        size=5
)
[12]:
_id CAMIS DBA BORO BUILDING STREET ZIPCODE PHONE CUISINE DESCRIPTION INSPECTION DATE ... RECORD DATE INSPECTION TYPE Latitude Longitude Community Board Council District Census Tract BIN BBL NTA
0 41610426 41610426 GLOW THAI RESTAURANT Brooklyn 7107 3 AVENUE 11209.0 7187481920 Thai 02/26/2020 ... 10/04/2021 Cycle Inspection / Re-inspection 40.633865 -74.026798 310.0 43.0 6800.0 3146519.0 3.058910e+09 BK31
1 40811162 40811162 CARMINE'S Manhattan 2450 BROADWAY 10024.0 2123622200 Italian 05/28/2019 ... 10/04/2021 Cycle Inspection / Initial Inspection 40.791168 -73.974308 107.0 6.0 17900.0 1033560.0 1.012380e+09 MN12
2 50012113 50012113 TANG Queens 196-50 NORTHERN BOULEVARD 11358.0 7182797080 Korean 08/16/2018 ... 10/04/2021 Cycle Inspection / Initial Inspection 40.757850 -73.784593 411.0 19.0 145101.0 4124565.0 4.055200e+09 QN48
3 50014618 50014618 TOTTO RAMEN Manhattan 248 EAST 52 STREET 10022.0 2124210052 Japanese 08/20/2018 ... 10/04/2021 Cycle Inspection / Re-inspection 40.756596 -73.968749 106.0 4.0 9800.0 1038490.0 1.013250e+09 MN19
4 50045782 50045782 OLLIE'S CHINESE RESTAURANT Manhattan 2705 BROADWAY 10025.0 2129323300 Chinese 10/21/2019 ... 10/04/2021 Cycle Inspection / Re-inspection 40.799318 -73.968440 107.0 6.0 19100.0 1056562.0 1.018750e+09 MN12

5 rows × 27 columns

4. Delete Indices

[15]:
wr.opensearch.delete_index(
     client=client,
     index="sf_restaurants_inspections"
)
[15]:
{'acknowledged': True}

5. Bonus - Prepare data and index from DataFrame

For this exercise we’ll use the DOHMH New York City Restaurant Inspection Results dataset.

[16]:
import pandas as pd
[17]:
df = pd.read_csv('https://data.cityofnewyork.us/api/views/43nn-pn8j/rows.csv?accessType=DOWNLOAD')
Prepare the data for indexing
[18]:
# fields names underscore casing
df.columns = [col.lower().replace(' ', '_') for col in df.columns]

# convert lon/lat to OpenSearch geo_point
df['business_location'] = "POINT (" + df.longitude.fillna('0').astype(str) + " " + df.latitude.fillna('0').astype(str) + ")"
Create index with mapping
[19]:
# delete the index if it already exists
wr.opensearch.delete_index(
    client=client,
    index="nyc_restaurants_inspections"
)

# use dynamic_template to map date fields
# define business_location as geo_point
wr.opensearch.create_index(
    client=client,
    index="nyc_restaurants_inspections",
    mappings={
        "dynamic_templates": [
            {
                "dates": {
                    "match": "*date",
                    "mapping": {
                        "type": "date",
                        "format": "MM/dd/yyyy"
                    }
                }
            }
        ],
        "properties": {
            "business_location": {
                "type": "geo_point"
            }
        }
    }
)
[19]:
{'acknowledged': True,
 'shards_acknowledged': True,
 'index': 'nyc_restaurants_inspections'}
Index dataframe
[20]:
wr.opensearch.index_df(
    client,
    df=df,
    index="nyc_restaurants_inspections",
    id_keys=["camis"],
    bulk_size=1000
)
Indexing: 100% (382655/382655)|##########################|Elapsed Time: 0:04:15
[20]:
{'success': 382655, 'errors': []}
Execute geo query
Sort restaurants by distance from Times-Square
[21]:
wr.opensearch.search(
    client,
    index="nyc_restaurants_inspections",
    filter_path=["hits.hits._source"],
    size=100,
    search_body={
        "query": {
            "match_all": {}
        },
          "sort": [
            {
              "_geo_distance": {
                "business_location": { # Times-Square - https://geojson.io/#map=16/40.7563/-73.9862
                  "lat":  40.75613228383523,
                  "lon": -73.9865791797638
                },
                "order": "asc"
              }
            }
        ]
    }
)
[21]:
camis dba boro building street zipcode phone cuisine_description inspection_date action ... inspection_type latitude longitude community_board council_district census_tract bin bbl nta business_location
0 41551304 THE COUNTER Manhattan 7 TIMES SQUARE 10036.0 2129976801 American 12/22/2016 Violations were cited in the following area(s). ... Cycle Inspection / Initial Inspection 40.755908 -73.986681 105.0 3.0 11300.0 1086069.0 1.009940e+09 MN17 POINT (-73.986680953809 40.755907817312)
1 50055665 ANN INC CAFE Manhattan 7 TIMES SQUARE 10036.0 2125413287 American 12/11/2019 Violations were cited in the following area(s). ... Cycle Inspection / Initial Inspection 40.755908 -73.986681 105.0 3.0 11300.0 1086069.0 1.009940e+09 MN17 POINT (-73.986680953809 40.755907817312)
2 50049552 ERNST AND YOUNG Manhattan 5 TIMES SQ 10036.0 2127739994 Coffee/Tea 11/30/2018 Violations were cited in the following area(s). ... Cycle Inspection / Initial Inspection 40.755702 -73.987208 105.0 3.0 11300.0 1024656.0 1.010130e+09 MN17 POINT (-73.987207980138 40.755702020307)
3 50014078 RED LOBSTER Manhattan 5 TIMES SQ 10036.0 2127306706 Seafood 10/03/2017 Violations were cited in the following area(s). ... Cycle Inspection / Initial Inspection 40.755702 -73.987208 105.0 3.0 11300.0 1024656.0 1.010130e+09 MN17 POINT (-73.987207980138 40.755702020307)
4 50015171 NEW AMSTERDAM THEATER Manhattan 214 WEST 42 STREET 10036.0 2125825472 American 06/26/2018 Violations were cited in the following area(s). ... Cycle Inspection / Re-inspection 40.756317 -73.987652 105.0 3.0 11300.0 1024660.0 1.010130e+09 MN17 POINT (-73.987651832547 40.756316895053)
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
95 41552060 PROSKAUER ROSE Manhattan 11 TIMES SQUARE 10036.0 2129695493 American 08/11/2017 Violations were cited in the following area(s). ... Administrative Miscellaneous / Initial Inspection 40.756891 -73.990023 105.0 3.0 11300.0 1087978.0 1.010138e+09 MN17 POINT (-73.990023200823 40.756890780426)
96 41242148 GABBY O'HARA'S Manhattan 123 WEST 39 STREET 10018.0 2122788984 Irish 07/30/2019 Violations were cited in the following area(s). ... Cycle Inspection / Re-inspection 40.753405 -73.986602 105.0 4.0 11300.0 1080611.0 1.008150e+09 MN17 POINT (-73.986602050292 40.753404587174)
97 50095860 THE TIMES EATERY Manhattan 680 8 AVENUE 10036.0 6463867787 American 02/28/2020 Violations were cited in the following area(s). ... Pre-permit (Operational) / Initial Inspection 40.757991 -73.989218 105.0 3.0 11900.0 1024703.0 1.010150e+09 MN17 POINT (-73.989218092096 40.757991356019)
98 50072861 ITSU Manhattan 530 7 AVENUE 10018.0 9176393645 Asian/Asian Fusion 09/10/2018 Violations were cited in the following area(s). ... Pre-permit (Operational) / Initial Inspection 40.753844 -73.988551 105.0 3.0 11300.0 1014485.0 1.007880e+09 MN17 POINT (-73.988551029682 40.753843959794)
99 50068109 LUKE'S LOBSTER Manhattan 1407 BROADWAY 10018.0 9174759192 Seafood 09/06/2017 Violations were cited in the following area(s). ... Pre-permit (Operational) / Initial Inspection 40.753432 -73.987151 105.0 3.0 11300.0 1015265.0 1.008140e+09 MN17 POINT (-73.98715066791 40.753432097521)

100 rows × 27 columns

AWS SDK for pandas

32 - AWS Lake Formation - Glue Governed tables

This tutorial assumes that your IAM user/role has the required Lake Formation permissions to create and read AWS Glue Governed tables

1. Read Governed table
1.1 Read PartiQL query
[ ]:
import awswrangler as wr

database = "gov_db"  # Assumes a Glue database registered with Lake Formation exists in the account
table = "gov_table"  # Assumes a Governed table exists in the account
catalog_id = "111111111111"  # AWS Account Id

# Note 1: If a transaction_id is not specified, a new transaction is started
df = wr.lakeformation.read_sql_query(
    sql=f"SELECT * FROM {table};",
    database=database,
    catalog_id=catalog_id
)

1.1.1 Read within transaction

[ ]:
transaction_id = wr.lakeformation.start_transaction(read_only=True)
df = wr.lakeformation.read_sql_query(
    sql=f"SELECT * FROM {table};",
    database=database,
    transaction_id=transaction_id
)

1.1.2 Read within query as of time

[ ]:
import calendar
import time

query_as_of_time = calendar.timegm(time.gmtime())
df = wr.lakeformation.read_sql_query(
    sql=f"SELECT * FROM {table} WHERE id=:id; AND name=:name;",
    database=database,
    query_as_of_time=query_as_of_time,
    params={"id": 1, "name": "Ayoub"}
)
1.2 Read full table
[ ]:
df = wr.lakeformation.read_sql_table(
    table=table,
    database=database
)
2. Write Governed table
2.1 Create a new Governed table

Enter your bucket name:

[ ]:
import getpass

bucket = getpass.getpass()

If a governed table does not exist, it can be created by passing an S3 path argument. Make sure your IAM user/role has enough permissions in the Lake Formation database

2.1.1 CSV table

[ ]:
import pandas as pd

table = "gov_table_csv"

df=pd.DataFrame({
    "col": [1, 2, 3],
    "col2": ["A", "A", "B"],
    "col3": [None, "test", None]
})
# Note 1: If a transaction_id is not specified, a new transaction is started
# Note 2: When creating a new Governed table, `table_type="GOVERNED"` must be specified. Otherwise the default is to create an EXTERNAL_TABLE
wr.s3.to_csv(
    df=df,
    path=f"s3://{bucket}/{database}/{table}/",  # S3 path
    dataset=True,
    database=database,
    table=table,
    table_type="GOVERNED"
)

2.1.2 Parquet table

[ ]:
table = "gov_table_parquet"

df = pd.DataFrame({"c0": [0, None]}, dtype="Int64")
wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/{database}/{table}/",
    dataset=True,
    database=database,
    table=table,
    table_type="GOVERNED",
    description="c0",
    parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))},
    columns_comments={"c0": "0"}
)
2.2 Overwrite operations

2.2.1 Overwrite

[ ]:
df = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16")
wr.s3.to_parquet(
    df=df,
    dataset=True,
    mode="overwrite",
    database=database,
    table=table,
    description="c1",
    parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))},
    columns_comments={"c1": "1"}
)

2.2.2 Append

[ ]:
df = pd.DataFrame({"c1": [None, 2, None]}, dtype="Int8")
wr.s3.to_parquet(
    df=df,
    dataset=True,
    mode="append",
    database=database,
    table=table,
    description="c1",
    parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index) * 2)},
    columns_comments={"c1": "1"}
)

2.2.3 Create partitioned Governed table

[ ]:
table = "gov_table_parquet_partitioned"

df = pd.DataFrame({"c0": ["foo", None], "c1": [0, 1]})
wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/{database}/{table}/",
    dataset=True,
    database=database,
    table=table,
    table_type="GOVERNED",
    partition_cols=["c1"],
    description="c0+c1",
    parameters={"num_cols": "2", "num_rows": "2"},
    columns_comments={"c0": "zero", "c1": "one"}
)

2.2.4 Overwrite partitions

[ ]:
df = pd.DataFrame({"c0": [None, None], "c1": [0, 2]})
wr.s3.to_parquet(
    df=df,
    dataset=True,
    mode="overwrite_partitions",
    database=database,
    table=table,
    partition_cols=["c1"],
    description="c0+c1",
    parameters={"num_cols": "2", "num_rows": "3"},
    columns_comments={"c0": "zero", "c1": "one"}
)
3. Multiple read/write operations within a transaction
[ ]:
read_table = "gov_table_parquet"
write_table = "gov_table_multi_parquet"

transaction_id = wr.lakeformation.start_transaction(read_only=False)

df = pd.DataFrame({"c0": [0, None]}, dtype="Int64")
wr.s3.to_parquet(
    df=df,
    path=f"s3://{bucket}/{database}/{write_table}_1",
    dataset=True,
    database=database,
    table=f"{write_table}_1",
    table_type="GOVERNED",
    transaction_id=transaction_id,
)

df2 = wr.lakeformation.read_sql_table(
    table=read_table,
    database=database,
    transaction_id=transaction_id,
    use_threads=True
)

df3 = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16")
wr.s3.to_parquet(
    df=df3,
    path=f"s3://{bucket}/{database}/{write_table}_2",
    dataset=True,
    mode="append",
    database=database,
    table=f"{write_table}_2",
    table_type="GOVERNED",
    transaction_id=transaction_id,
)

wr.lakeformation.commit_transaction(transaction_id=transaction_id)

AWS SDK for pandas

33 - Amazon Neptune

Note: to be able to use SPARQL you must either install SPARQLWrapper or install AWS SDK for pandas with the sparql extra:

[ ]:
!pip install awswrangler[sparql]

Initialize

The first step to using AWS SDK for pandas with Amazon Neptune is to import the library and create a client connection.

Note: Connecting to Amazon Neptune requires that the application you are running has access to the Private VPC where Neptune is located. Without this access you will not be able to connect using AWS SDK for pandas.

[ ]:
import awswrangler as wr
import pandas as pd

url='<INSERT CLUSTER ENDPOINT>' # The Neptune Cluster endpoint
iam_enabled = False # Set to True/False based on the configuration of your cluster
neptune_port = 8182 # Set to the Neptune Cluster Port, Default is 8182
client = wr.neptune.connect(url, neptune_port, iam_enabled=iam_enabled)

Return the status of the cluster

[ ]:
print(client.status())

Retrieve Data from Neptune using AWS SDK for pandas

AWS SDK for pandas supports querying Amazon Neptune using TinkerPop Gremlin and openCypher for property graph data or SPARQL for RDF data.

Gremlin
[ ]:
query = "g.E().project('source', 'target').by(outV().id()).by(inV().id()).limit(5)"
df = wr.neptune.execute_gremlin(client, query)
display(df.head(5))
SPARQL
[ ]:
query = """
        PREFIX foaf: <https://xmlns.com/foaf/0.1/>
        PREFIX ex: <https://www.example.com/>
        SELECT ?firstName WHERE { ex:JaneDoe foaf:knows ?person . ?person foaf:firstName ?firstName }"""
df = wr.neptune.execute_sparql(client, query)
display(df.head(5))
openCypher
[ ]:
query = "MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 5"
df = wr.neptune.execute_opencypher(client, query)
display(df.head(5))

Saving Data using AWS SDK for pandas

AWS SDK for pandas supports saving Pandas DataFrames into Amazon Neptune using either a property graph or RDF data model.

Property Graph

If writing to a property graph then DataFrames for vertices and edges must be written separately. DataFrames for vertices must have a ~label column with the label and a ~id column for the vertex id.

If the ~id column does not exist, or the specified id does not exist or is empty, then a new vertex will be added.

If no ~label column exists then writing to the graph will be treated as an update of the element with the specified ~id value.

DataFrames for edges must have ~id, ~label, ~to, and ~from columns. If the ~id column does not exist, or the specified id does not exist or is empty, then a new edge will be added. If no ~label, ~to, or ~from column exists, an exception will be thrown.

Add Vertices/Nodes
[ ]:
import uuid
import random
import string
def _create_dummy_vertex():
    data = dict()
    data["~id"] = uuid.uuid4()
    data["~label"] = "foo"
    data["int"] = random.randint(0, 1000)
    data["str"] = "".join(random.choice(string.ascii_lowercase) for i in range(10))
    data["list"] = [random.randint(0, 1000), random.randint(0, 1000)]
    return data

data = [_create_dummy_vertex(), _create_dummy_vertex(), _create_dummy_vertex()]
df = pd.DataFrame(data)
res = wr.neptune.to_property_graph(client, df)
query = f"MATCH (s) WHERE id(s)='{data[0]['~id']}' RETURN s"
df = wr.neptune.execute_opencypher(client, query)
display(df)
Add Edges
[ ]:
import uuid
import random
import string
def _create_dummy_edge():
    data = dict()
    data["~id"] = uuid.uuid4()
    data["~label"] = "bar"
    data["~to"] = uuid.uuid4()
    data["~from"] = uuid.uuid4()
    data["int"] = random.randint(0, 1000)
    data["str"] = "".join(random.choice(string.ascii_lowercase) for i in range(10))
    return data

data = [_create_dummy_edge(), _create_dummy_edge(), _create_dummy_edge()]
df = pd.DataFrame(data)
res = wr.neptune.to_property_graph(client, df)
query = f"MATCH (s)-[r]->(d) WHERE id(r)='{data[0]['~id']}' RETURN r"
df = wr.neptune.execute_opencypher(client, query)
display(df)
Update Existing Nodes
[ ]:
idval=uuid.uuid4()
wr.neptune.execute_gremlin(client, f"g.addV().property(T.id, '{str(idval)}')")
query = f"MATCH (s) WHERE id(s)='{idval}' RETURN s"
df = wr.neptune.execute_opencypher(client, query)
print("Before")
display(df)
data = [{"~id": idval, "age": 50}]
df = pd.DataFrame(data)
res = wr.neptune.to_property_graph(client, df)
df = wr.neptune.execute_opencypher(client, query)
print("After")
display(df)
Setting cardinality based on the header

If you would like to save data using single cardinality, suffix the column header with (single) and set use_header_cardinality=True (the default). For example, a column named name(single) will save the name property with single cardinality. You can disable this behaviour by setting use_header_cardinality=False.

[ ]:
data = [_create_dummy_vertex()]
df = pd.DataFrame(data)
# Adding (single) to the column name in the DataFrame will cause it to write that property as `single` cardinality
df.rename(columns={"int": "int(single)"}, inplace=True)
res = wr.neptune.to_property_graph(client, df, use_header_cardinality=True)


# This can be disabled by setting `use_header_cardinality=False`
res = wr.neptune.to_property_graph(client, df, use_header_cardinality=False)
RDF

The DataFrame must consist of triples, with column names for the subject, predicate, and object specified. If none are provided then s, p, and o are used as the defaults.

If you want to add data into a named graph then you will also need a graph column; the default column name is g.

Write Triples
[ ]:
def _create_dummy_triple():
    data = dict()
    data["s"] = "http://example.com/resources/foo"
    data["p"] = uuid.uuid4()
    data["o"] = random.randint(0, 1000)
    return data

data = [_create_dummy_triple(), _create_dummy_triple(), _create_dummy_triple()]
df = pd.DataFrame(data)
res = wr.neptune.to_rdf_graph(client, df)
query = (
    "PREFIX foo: <http://example.com/resources/> "
    f"SELECT ?o WHERE {{ <foo:foo> <{data[0]['p']}> ?o . }}"
)
df = wr.neptune.execute_sparql(client, query)
display(df)
Write Quads
[ ]:
def _create_dummy_quad():
    data = _create_dummy_triple()
    data["g"] = "bar"
    return data

data = [_create_dummy_quad(), _create_dummy_quad(), _create_dummy_quad()]
df = pd.DataFrame(data)
res = wr.neptune.to_rdf_graph(client, df)
query = (
    "PREFIX foo: <http://example.com/resources/> "
    f"SELECT ?o WHERE {{ <foo:foo> <{data[0]['p']}> ?o . }}"
)
df = wr.neptune.execute_sparql(client, query)
display(df)

Flatten DataFrames

One of the complexities of working with graph result sets in a row/column paradigm such as Pandas is that graph queries commonly return complex, nested objects. To make it easier to consume these results in a tabular format, a method is provided to flatten the returned Pandas DataFrame.

Flattening the DataFrame
[ ]:
client = wr.neptune.connect(url, 8182, iam_enabled=False)
query = "MATCH (n) RETURN n LIMIT 1"
df = wr.neptune.execute_opencypher(client, query)
print("Original")
display(df)
df_new=wr.neptune.flatten_nested_df(df)
print("Flattened")
display(df_new)
Removing the prefixing of the parent column name
[ ]:
df_new=wr.neptune.flatten_nested_df(df, include_prefix=False)
display(df_new)
Specifying the column header separator
[ ]:
df_new=wr.neptune.flatten_nested_df(df, seperator='|')
display(df_new)

Putting it into a workflow

[ ]:
!pip install igraph networkx
Running PageRank using NetworkX
[ ]:
import networkx as nx

# Retrieve Data from neptune
client = wr.neptune.connect(url, 8182, iam_enabled=False)
query = "MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 100"
df = wr.neptune.execute_opencypher(client, query)

# Run PageRank
G=nx.from_pandas_edgelist(df, edge_attr=True)
pg = nx.pagerank(G)

# Save values back into Neptune
rows=[]
for k in pg.keys():
    rows.append({'~id': k, 'pageRank_nx(single)': pg[k]})
pg_df=pd.DataFrame(rows, columns=['~id','pageRank_nx(single)'])
res = wr.neptune.to_property_graph(client, pg_df, use_header_cardinality=True)

# Retrieve newly saved data
query = "MATCH (n:airport) WHERE n.pageRank_nx IS NOT NULL RETURN n.code, n.pageRank_nx ORDER BY n.pageRank_nx DESC LIMIT 5"
df = wr.neptune.execute_opencypher(client, query)
display(df)
Running PageRank using iGraph
[ ]:
import igraph as ig

# Retrieve Data from neptune
client = wr.neptune.connect(url, 8182, iam_enabled=False)
query = "MATCH (n)-[r]->(d) RETURN id(n) as source, id(d) as target LIMIT 100"
df = wr.neptune.execute_opencypher(client, query)

# Run PageRank
g = ig.Graph.TupleList(df.itertuples(index=False), directed=True, weights=False)
pg = g.pagerank()

# Save values back into Neptune
rows=[]
for idx, v in enumerate(g.vs):
    rows.append({'~id': v['name'], 'pageRank_ig(single)': pg[idx]})
pg_df=pd.DataFrame(rows, columns=['~id','pageRank_ig(single)'])
res = wr.neptune.to_property_graph(client, pg_df, use_header_cardinality=True)

# Retrieve newly saved data
query = "MATCH (n:airport) WHERE n.pageRank_ig IS NOT NULL RETURN n.code, n.pageRank_ig ORDER BY n.pageRank_ig DESC LIMIT 5"
df = wr.neptune.execute_opencypher(client, query)
display(df)

AWS SDK for pandas

34 - Glue Data Quality

AWS Glue Data Quality helps you evaluate and monitor the quality of your data.

Create test data

First, let’s start by creating test data, writing it to S3, and registering it in the Glue Data Catalog.

[ ]:
import awswrangler as wr
import pandas as pd

glue_database = "aws_sdk_pandas"
glue_table = "my_glue_table"
path = "s3://BUCKET_NAME/my_glue_table/"

df = pd.DataFrame({"c0": [0, 1, 2], "c1": [0, 1, 2], "c2": [0, 0, 0]})
wr.s3.to_parquet(df, path, dataset=True, database=glue_database, table=glue_table, partition_cols=["c2"])
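
Note that the evaluation below references first_ruleset and iam_role_arn, which are defined in earlier cells not shown here. A minimal sketch of what those definitions could look like, assuming a placeholder IAM role ARN and reusing the DQDL rules that appear in the evaluation report below, is:

[ ]:
# Placeholder IAM role ARN (assumption); the role needs Glue Data Quality permissions
iam_role_arn = "arn:aws:iam::111111111111:role/GlueDataQualityRole"

# Ruleset name inferred from the get_ruleset output shown later in this tutorial
first_ruleset = "ruleset_1"

# One possible way to create the ruleset, using the rules reported in the evaluation results below
wr.data_quality.create_ruleset(
    name=first_ruleset,
    database=glue_database,
    table=glue_table,
    dqdl_rules=(
        "Rules = ["
        "RowCount between 1 and 6,"
        'IsComplete "c0", Uniqueness "c0" > 0.95, ColumnValues "c0" <= 2,'
        'IsComplete "c1", Uniqueness "c1" > 0.95, ColumnValues "c1" <= 2,'
        'IsComplete "c2", ColumnValues "c2" in [0, 1, 2], Uniqueness "c2" > 0.95'
        "]"
    ),
)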

Run a data quality task

The ruleset can now be evaluated against the data. A cluster with 2 workers is used for the run. It returns a report with PASS/FAIL results for each rule.

[20]:
wr.data_quality.evaluate_ruleset(
    name=first_ruleset,
    iam_role_arn=iam_role_arn,
    number_of_workers=2,
)
[20]:
Name Description Result ResultId EvaluationMessage
0 Rule_1 RowCount between 1 and 6 PASS dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 NaN
1 Rule_2 IsComplete "c0" PASS dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 NaN
2 Rule_3 Uniqueness "c0" > 0.95 PASS dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 NaN
3 Rule_4 ColumnValues "c0" <= 2 PASS dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 NaN
4 Rule_5 IsComplete "c1" PASS dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 NaN
5 Rule_6 Uniqueness "c1" > 0.95 PASS dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 NaN
6 Rule_7 ColumnValues "c1" <= 2 PASS dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 NaN
7 Rule_8 IsComplete "c2" PASS dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 NaN
8 Rule_9 ColumnValues "c2" in [0,1,2] PASS dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 NaN
9 Rule_10 Uniqueness "c2" > 0.95 FAIL dqresult-be413b527c0e5520ad843323fecd9cf2e2edbdd5 Value: 0.0 does not meet the constraint requir...

Create ruleset from Data Quality Definition Language definition

The Data Quality Definition Language (DQDL) is a domain specific language that you can use to define Data Quality rules. For the full syntax reference, see DQDL.

[21]:
second_ruleset = "ruleset_2"

dqdl_rules = (
    "Rules = ["
    "RowCount between 1 and 6,"
    'IsComplete "c0",'
    'Uniqueness "c0" > 0.95,'
    'ColumnValues "c0" <= 2,'
    'IsComplete "c1",'
    'Uniqueness "c1" > 0.95,'
    'ColumnValues "c1" <= 2,'
    'IsComplete "c2",'
    'ColumnValues "c2" <= 1'
    "]"
)

wr.data_quality.create_ruleset(
    name=second_ruleset,
    database=glue_database,
    table=glue_table,
    dqdl_rules=dqdl_rules,
)

Create or update a ruleset from a data frame

AWS SDK for pandas also enables you to create or update a ruleset from a pandas data frame.

[24]:
third_ruleset = "ruleset_3"

df_rules = pd.DataFrame({
    "rule_type": ["RowCount", "ColumnCorrelation", "Uniqueness"],
    "parameter": [None, '"c0" "c1"', '"c0"'],
    "expression": ["between 2 and 8", "> 0.8", "> 0.95"],
})

wr.data_quality.create_ruleset(
    name=third_ruleset,
    df_rules=df_rules,
    database=glue_database,
    table=glue_table,
)

wr.data_quality.get_ruleset(name=third_ruleset)
[24]:
rule_type parameter expression
0 RowCount None between 2 and 8
1 ColumnCorrelation "c0" "c1" > 0.8
2 Uniqueness "c0" > 0.95

Get multiple rulesets

[25]:
wr.data_quality.get_ruleset(name=[first_ruleset, second_ruleset, third_ruleset])
[25]:
rule_type parameter expression ruleset
0 RowCount None between 1 and 6 ruleset_1
1 IsComplete "c0" None ruleset_1
2 Uniqueness "c0" > 0.95 ruleset_1
3 ColumnValues "c0" <= 2 ruleset_1
4 IsComplete "c1" None ruleset_1
5 Uniqueness "c1" > 0.95 ruleset_1
6 ColumnValues "c1" <= 2 ruleset_1
7 IsComplete "c2" None ruleset_1
8 ColumnValues "c2" in [0, 1, 2] ruleset_1
9 Uniqueness "c2" > 0.95 ruleset_1
0 RowCount None between 1 and 6 ruleset_2
1 IsComplete "c0" None ruleset_2
2 Uniqueness "c0" > 0.95 ruleset_2
3 ColumnValues "c0" <= 2 ruleset_2
4 IsComplete "c1" None ruleset_2
5 Uniqueness "c1" > 0.95 ruleset_2
6 ColumnValues "c1" <= 2 ruleset_2
7 IsComplete "c2" None ruleset_2
8 ColumnValues "c2" <= 1 ruleset_2
0 RowCount None between 2 and 8 ruleset_3
1 ColumnCorrelation "c0" "c1" > 0.8 ruleset_3
2 Uniqueness "c0" > 0.95 ruleset_3

Evaluate Data Quality for a given partition

A data quality evaluation run can be limited to specific partitions by leveraging the pushDownPredicate expression in the additional_options argument.

[26]:
df = pd.DataFrame({"c0": [2, 0, 1], "c1": [1, 0, 2], "c2": [1, 1, 1]})
wr.s3.to_parquet(df, path, dataset=True, database=glue_database, table=glue_table, partition_cols=["c2"])

wr.data_quality.evaluate_ruleset(
    name=third_ruleset,
    iam_role_arn=iam_role_arn,
    number_of_workers=2,
    additional_options={
        "pushDownPredicate": "(c2 == '1')",
    },
)
[26]:
Name Description Result ResultId EvaluationMessage
0 Rule_1 RowCount between 2 and 8 PASS dqresult-f676cfe0345aa93f492e3e3c3d6cf1ad99b84dc6 NaN
1 Rule_2 ColumnCorrelation "c0" "c1" > 0.8 FAIL dqresult-f676cfe0345aa93f492e3e3c3d6cf1ad99b84dc6 Value: 0.5 does not meet the constraint requir...
2 Rule_3 Uniqueness "c0" > 0.95 PASS dqresult-f676cfe0345aa93f492e3e3c3d6cf1ad99b84dc6 NaN

AWS SDK for pandas

35 - OpenSearch Serverless

Amazon OpenSearch Serverless is an on-demand serverless configuration for Amazon OpenSearch Service.

Create collection

A collection in Amazon OpenSearch Serverless is a logical grouping of one or more indexes that represent an analytics workload.

Collections must have an assigned encryption policy, network policy, and a matching data access policy that grants permission to its resources.

[1]:
import awswrangler as wr
[8]:
data_access_policy = [
    {
        "Rules": [
            {
                "ResourceType": "index",
                "Resource": [
                    "index/my-collection/*",
                ],
                "Permission": [
                    "aoss:*",
                ],
            },
            {
                "ResourceType": "collection",
                "Resource": [
                    "collection/my-collection",
                ],
                "Permission": [
                    "aoss:*",
                ],
            },
        ],
        "Principal": [
            wr.sts.get_current_identity_arn(),
        ],
    }
]

AWS SDK for pandas can create default network and encryption policies based on the user input.

By default, the network policy allows public access to the collection, and the encryption policy encrypts the collection using an AWS-managed KMS key.

Create a collection, and a corresponding data, network, and access policies:

[10]:
collection = wr.opensearch.create_collection(
    name="my-collection",
    data_policy=data_access_policy,
)

collection_endpoint = collection["collectionEndpoint"]

The call will wait and exit when the collection and corresponding policies are created and active.

To create a collection encrypted with a customer-managed KMS key and attached to a VPC, provide the KMS key ARN and/or VPC endpoints:

[ ]:
kms_key_arn = "arn:aws:kms:..."
vpc_endpoint = "vpce-..."

collection = wr.opensearch.create_collection(
    name="my-secure-collection",
    data_policy=data_access_policy,
    kms_key_arn=kms_key_arn,
    vpc_endpoints=[vpc_endpoint],
)
Connect

Connect to the collection endpoint:

[12]:
client = wr.opensearch.connect(host=collection_endpoint)
Create index

To create an index, run:

[13]:
index="my-index-1"

wr.opensearch.create_index(
    client=client,
    index=index,
)
[13]:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'my-index-1'}
Index documents

To index documents:

[25]:
wr.opensearch.index_documents(
    client,
    documents=[{"_id": "1", "name": "John"}, {"_id": "2", "name": "George"}, {"_id": "3", "name": "Julia"}],
    index=index,
)
Indexing: 100% (3/3)|####################################|Elapsed Time: 0:00:12
[25]:
{'success': 3, 'errors': []}

It is also possible to index Pandas data frames:

[26]:
import pandas as pd

df = pd.DataFrame(
    [{"_id": "1", "name": "John", "tags": ["foo", "bar"]}, {"_id": "2", "name": "George", "tags": ["foo"]}]
)

wr.opensearch.index_df(
    client,
    df=df,
    index="index-df",
)
Indexing: 100% (2/2)|####################################|Elapsed Time: 0:00:12
[26]:
{'success': 2, 'errors': []}

AWS SDK for pandas also supports indexing JSON and CSV documents.
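
For instance (the S3 paths and index names below are placeholders), JSON Lines and CSV files can be indexed with index_json and index_csv:

[ ]:
# Index a JSON Lines file; local paths are also accepted (placeholder path and index name)
wr.opensearch.index_json(
    client,
    path="s3://my-bucket/documents.json",
    index="index-json",
)

# Index a CSV file (placeholder path and index name)
wr.opensearch.index_csv(
    client,
    path="s3://my-bucket/documents.csv",
    index="index-csv",
)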

For more examples, refer to the 031 - OpenSearch tutorial

Delete index

To delete an index, run:

[ ]:
wr.opensearch.delete_index(
     client=client,
     index=index
)

AWS SDK for pandas

Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue catalog for their metastore. For more information, see the User Guide.

Create Iceberg table

[50]:
import getpass
bucket_name = getpass.getpass()
[52]:
import awswrangler as wr

glue_database = "aws_sdk_pandas"
glue_table = "iceberg_test"
path = f"s3://{bucket_name}/iceberg_test/"

# Cleanup table before create
wr.catalog.delete_table_if_exists(database=glue_database, table=glue_table)

create_sql = (
    f"CREATE TABLE {glue_table} (id int, name string) "
    f"LOCATION '{path}' "
    f"TBLPROPERTIES ( 'table_type' ='ICEBERG', 'format'='parquet' )"
)

wr.athena.start_query_execution(
    sql=create_sql,
    database=glue_database,
    wait=True,
)
[52]:
{'QueryExecutionId': '679240c6-020b-4226-a72f-11db54b8598b',
 'Query': "CREATE TABLE iceberg_test (id int, name string) LOCATION 's3://.../iceberg_test/' TBLPROPERTIES ( 'table_type' ='ICEBERG', 'format'='parquet' )",
 'StatementType': 'DDL',
 'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/679240c6-020b-4226-a72f-11db54b8598b.txt'},
 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
 'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
 'Status': {'State': 'SUCCEEDED',
  'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 39, 46, 276000, tzinfo=tzlocal()),
  'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 39, 46, 913000, tzinfo=tzlocal())},
 'Statistics': {'EngineExecutionTimeInMillis': 490,
  'DataScannedInBytes': 0,
  'TotalExecutionTimeInMillis': 637,
  'QueryQueueTimeInMillis': 122,
  'ServiceProcessingTimeInMillis': 25,
  'ResultReuseInformation': {'ReusedPreviousResult': False}},
 'WorkGroup': 'primary',
 'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
  'EffectiveEngineVersion': 'Athena engine version 3'}}

Insert data

[53]:
wr.athena.start_query_execution(
    sql=f"INSERT INTO {glue_table} VALUES (1,'John'), (2, 'Lily'), (3, 'Richard')",
    database=glue_database,
    wait=True,
)
[53]:
{'QueryExecutionId': 'e339fcd2-9db1-43ac-bb9e-9730e6395b51',
 'Query': "INSERT INTO iceberg_test VALUES (1,'John'), (2, 'Lily'), (3, 'Richard')",
 'StatementType': 'DML',
 'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/e339fcd2-9db1-43ac-bb9e-9730e6395b51'},
 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
 'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
 'Status': {'State': 'SUCCEEDED',
  'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 8, 612000, tzinfo=tzlocal()),
  'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 11, 143000, tzinfo=tzlocal())},
 'Statistics': {'EngineExecutionTimeInMillis': 2242,
  'DataScannedInBytes': 0,
  'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/e339fcd2-9db1-43ac-bb9e-9730e6395b51-manifest.csv',
  'TotalExecutionTimeInMillis': 2531,
  'QueryQueueTimeInMillis': 241,
  'QueryPlanningTimeInMillis': 179,
  'ServiceProcessingTimeInMillis': 48,
  'ResultReuseInformation': {'ReusedPreviousResult': False}},
 'WorkGroup': 'primary',
 'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
  'EffectiveEngineVersion': 'Athena engine version 3'}}
[54]:
wr.athena.start_query_execution(
    sql=f"INSERT INTO {glue_table} VALUES (4,'Anne'), (5, 'Jacob'), (6, 'Leon')",
    database=glue_database,
    wait=True,
)
[54]:
{'QueryExecutionId': '922c8f02-4c00-4050-b4a7-7016809efa2b',
 'Query': "INSERT INTO iceberg_test VALUES (4,'Anne'), (5, 'Jacob'), (6, 'Leon')",
 'StatementType': 'DML',
 'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/922c8f02-4c00-4050-b4a7-7016809efa2b'},
 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
 'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
 'Status': {'State': 'SUCCEEDED',
  'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 24, 582000, tzinfo=tzlocal()),
  'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 40, 27, 352000, tzinfo=tzlocal())},
 'Statistics': {'EngineExecutionTimeInMillis': 2414,
  'DataScannedInBytes': 0,
  'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/922c8f02-4c00-4050-b4a7-7016809efa2b-manifest.csv',
  'TotalExecutionTimeInMillis': 2770,
  'QueryQueueTimeInMillis': 329,
  'QueryPlanningTimeInMillis': 189,
  'ServiceProcessingTimeInMillis': 27,
  'ResultReuseInformation': {'ReusedPreviousResult': False}},
 'WorkGroup': 'primary',
 'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
  'EffectiveEngineVersion': 'Athena engine version 3'}}

Query

[65]:
wr.athena.read_sql_query(
    sql=f'SELECT * FROM "{glue_table}"',
    database=glue_database,
    ctas_approach=False,
    unload_approach=False,
)
[65]:
id name
0 1 John
1 4 Anne
2 2 Lily
3 3 Richard
4 5 Jacob
5 6 Leon

Read query metadata

In a SELECT query, you can use the following properties after table_name to query Iceberg table metadata:

  • $files Shows a table’s current data files

  • $manifests Shows a table’s current file manifests

  • $history Shows a table’s history

  • $partitions Shows a table’s current partitions

[55]:
wr.athena.read_sql_query(
    sql=f'SELECT * FROM "{glue_table}$files"',
    database=glue_database,
    ctas_approach=False,
    unload_approach=False,
)
[55]:
content file_path file_format record_count file_size_in_bytes column_sizes value_counts null_value_counts nan_value_counts lower_bounds upper_bounds key_metadata split_offsets equality_ids
0 0 s3://.../iceberg_test/data/089a... PARQUET 3 360 {1=48, 2=63} {1=3, 2=3} {1=0, 2=0} {} {1=1, 2=John} {1=3, 2=Richard} <NA> NaN NaN
1 0 s3://.../iceberg_test/data/5736... PARQUET 3 355 {1=48, 2=61} {1=3, 2=3} {1=0, 2=0} {} {1=4, 2=Anne} {1=6, 2=Leon} <NA> NaN NaN
[56]:
wr.athena.read_sql_query(
    sql=f'SELECT * FROM "{glue_table}$manifests"',
    database=glue_database,
    ctas_approach=False,
    unload_approach=False,
)
[56]:
path length partition_spec_id added_snapshot_id added_data_files_count added_rows_count existing_data_files_count existing_rows_count deleted_data_files_count deleted_rows_count partitions
0 s3://.../iceberg_test/metadata/... 6538 0 4379263637983206651 1 3 0 0 0 0 []
1 s3://.../iceberg_test/metadata/... 6548 0 2934717851675145063 1 3 0 0 0 0 []
[58]:
df = wr.athena.read_sql_query(
    sql=f'SELECT * FROM "{glue_table}$history"',
    database=glue_database,
    ctas_approach=False,
    unload_approach=False,
)

# Save snapshot id
snapshot_id = df.snapshot_id[0]

df
[58]:
made_current_at snapshot_id parent_id is_current_ancestor
0 2023-03-16 09:40:10.438000+00:00 2934717851675145063 <NA> True
1 2023-03-16 09:40:26.754000+00:00 4379263637983206651 2934717851675144704 True
[59]:
wr.athena.read_sql_query(
    sql=f'SELECT * FROM "{glue_table}$partitions"',
    database=glue_database,
    ctas_approach=False,
    unload_approach=False,
)
[59]:
record_count file_count total_size data
0 6 2 715 {id={min=1, max=6, null_count=0, nan_count=nul...

Time travel

[60]:
wr.athena.read_sql_query(
    sql=f"SELECT * FROM {glue_table} FOR TIMESTAMP AS OF (current_timestamp - interval '5' second)",
    database=glue_database,
)
[60]:
id name
0 1 John
1 4 Anne
2 2 Lily
3 3 Richard
4 5 Jacob
5 6 Leon

Version travel

[61]:
wr.athena.read_sql_query(
    sql=f"SELECT * FROM {glue_table} FOR VERSION AS OF {snapshot_id}",
    database=glue_database,
)
[61]:
id name
0 1 John
1 2 Lily
2 3 Richard

Optimize

The OPTIMIZE table REWRITE DATA compaction action rewrites data files into a more optimized layout based on their size and number of associated delete files. For syntax and table property details, see OPTIMIZE.

[62]:
wr.athena.start_query_execution(
    sql=f"OPTIMIZE {glue_table} REWRITE DATA USING BIN_PACK",
    database=glue_database,
    wait=True,
)
[62]:
{'QueryExecutionId': '94666790-03ae-42d7-850a-fae99fa79a68',
 'Query': 'OPTIMIZE iceberg_test REWRITE DATA USING BIN_PACK',
 'StatementType': 'DDL',
 'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/tables/94666790-03ae-42d7-850a-fae99fa79a68'},
 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
 'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
 'Status': {'State': 'SUCCEEDED',
  'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 49, 42, 857000, tzinfo=tzlocal()),
  'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 49, 45, 655000, tzinfo=tzlocal())},
 'Statistics': {'EngineExecutionTimeInMillis': 2622,
  'DataScannedInBytes': 220,
  'DataManifestLocation': 's3://aws-athena-query-results-...-us-east-1/tables/94666790-03ae-42d7-850a-fae99fa79a68-manifest.csv',
  'TotalExecutionTimeInMillis': 2798,
  'QueryQueueTimeInMillis': 124,
  'QueryPlanningTimeInMillis': 252,
  'ServiceProcessingTimeInMillis': 52,
  'ResultReuseInformation': {'ReusedPreviousResult': False}},
 'WorkGroup': 'primary',
 'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
  'EffectiveEngineVersion': 'Athena engine version 3'}}

Vacuum

VACUUM performs snapshot expiration and orphan file removal. These actions reduce metadata size and remove files not in the current table state that are also older than the retention period specified for the table. For syntax details, see VACUUM.

[64]:
wr.athena.start_query_execution(
    sql=f"VACUUM {glue_table}",
    database=glue_database,
    wait=True,
)
[64]:
{'QueryExecutionId': '717a7de6-b873-49c7-b744-1b0b402f24c9',
 'Query': 'VACUUM iceberg_test',
 'StatementType': 'DML',
 'ResultConfiguration': {'OutputLocation': 's3://aws-athena-query-results-...-us-east-1/717a7de6-b873-49c7-b744-1b0b402f24c9.csv'},
 'ResultReuseConfiguration': {'ResultReuseByAgeConfiguration': {'Enabled': False}},
 'QueryExecutionContext': {'Database': 'aws_sdk_pandas'},
 'Status': {'State': 'SUCCEEDED',
  'SubmissionDateTime': datetime.datetime(2023, 3, 16, 10, 50, 41, 14000, tzinfo=tzlocal()),
  'CompletionDateTime': datetime.datetime(2023, 3, 16, 10, 50, 43, 441000, tzinfo=tzlocal())},
 'Statistics': {'EngineExecutionTimeInMillis': 2229,
  'DataScannedInBytes': 0,
  'TotalExecutionTimeInMillis': 2427,
  'QueryQueueTimeInMillis': 153,
  'QueryPlanningTimeInMillis': 30,
  'ServiceProcessingTimeInMillis': 45,
  'ResultReuseInformation': {'ReusedPreviousResult': False}},
 'WorkGroup': 'primary',
 'EngineVersion': {'SelectedEngineVersion': 'Athena engine version 3',
  'EffectiveEngineVersion': 'Athena engine version 3'}}

API Reference

Amazon S3

copy_objects(paths, source_path, target_path)

Copy a list of S3 objects to another S3 directory.

delete_objects(path[, use_threads, ...])

Delete Amazon S3 objects from a received S3 prefix or list of S3 objects paths.

describe_objects(path[, version_id, ...])

Describe Amazon S3 objects from a received S3 prefix or list of S3 objects paths.

does_object_exist(path[, ...])

Check if object exists on S3.

download(path, local_file[, version_id, ...])

Download file from a received S3 path to local file.

get_bucket_region(bucket[, boto3_session])

Get bucket region name.

list_buckets([boto3_session])

List Amazon S3 buckets.

list_directories(path[, chunked, ...])

List Amazon S3 objects from a prefix.

list_objects(path[, suffix, ignore_suffix, ...])

List Amazon S3 objects from a prefix.

merge_datasets(source_path, target_path[, ...])

Merge a source dataset into a target dataset.

merge_upsert_table(delta_df, database, ...)

Perform Upsert (Update else Insert) onto an existing Glue table.

read_csv(path[, path_suffix, ...])

Read CSV file(s) from a received S3 prefix or list of S3 objects paths.

read_deltalake([path, version, partitions, ...])

Load a Deltalake table data from an S3 path.

read_excel(path[, version_id, use_threads, ...])

Read EXCEL file(s) from a received S3 path.

read_fwf(path[, path_suffix, ...])

Read fixed-width formatted file(s) from a received S3 prefix or list of S3 objects paths.

read_json(path[, path_suffix, ...])

Read JSON file(s) from a received S3 prefix or list of S3 objects paths.

read_parquet(path[, path_root, path_suffix, ...])

Read Apache Parquet file(s) from a received S3 prefix or list of S3 objects paths.

read_parquet_metadata(path[, version_id, ...])

Read Apache Parquet file(s) metadata from a received S3 prefix or list of S3 objects paths.

read_parquet_table(table, database[, ...])

Read Apache Parquet table registered on AWS Glue Catalog.

select_query(sql, path, input_serialization, ...)

Filter contents of an Amazon S3 object based on SQL statement.

size_objects(path[, version_id, ...])

Get the size (ContentLength) in bytes of Amazon S3 objects from a received S3 prefix or list of S3 objects paths.

store_parquet_metadata(path, database, table)

Infer and store parquet metadata on AWS Glue Catalog.

to_csv(df[, path, sep, index, columns, ...])

Write CSV file or dataset on Amazon S3.

to_excel(df, path[, boto3_session, ...])

Write EXCEL file on Amazon S3.

to_json(df[, path, index, columns, ...])

Write JSON file on Amazon S3.

to_parquet(df[, path, index, compression, ...])

Write Parquet file or dataset on Amazon S3.

upload(local_file, path[, use_threads, ...])

Upload file from a local file to received S3 path.

wait_objects_exist(paths[, delay, ...])

Wait for Amazon S3 objects to exist.

wait_objects_not_exist(paths[, delay, ...])

Wait for Amazon S3 objects to no longer exist.

AWS Glue Catalog

add_column(database, table, column_name[, ...])

Add a column in an AWS Glue Catalog table.

add_csv_partitions(database, table, ...[, ...])

Add partitions (metadata) to a CSV Table in the AWS Glue Catalog.

add_parquet_partitions(database, table, ...)

Add partitions (metadata) to a Parquet Table in the AWS Glue Catalog.

create_csv_table(database, table, path, ...)

Create a CSV Table (Metadata Only) in the AWS Glue Catalog.

create_database(name[, description, ...])

Create a database in AWS Glue Catalog.

create_json_table(database, table, path, ...)

Create a JSON Table (Metadata Only) in the AWS Glue Catalog.

create_parquet_table(database, table, path, ...)

Create a Parquet Table (Metadata Only) in the AWS Glue Catalog.

databases([limit, catalog_id, boto3_session])

Get a Pandas DataFrame with all listed databases.

delete_column(database, table, column_name)

Delete a column in an AWS Glue Catalog table.

delete_database(name[, catalog_id, ...])

Delete a database in AWS Glue Catalog.

delete_partitions(table, database, ...[, ...])

Delete specified partitions in an AWS Glue Catalog table.

delete_all_partitions(table, database[, ...])

Delete all partitions in an AWS Glue Catalog table.

delete_table_if_exists(database, table[, ...])

Delete Glue table if exists.

does_table_exist(database, table[, ...])

Check if the table exists.

drop_duplicated_columns(df)

Drop all repeated columns (duplicated names).

extract_athena_types(df[, index, ...])

Extract columns and partitions types (Amazon Athena) from Pandas DataFrame.

get_columns_comments(database, table[, ...])

Get all columns comments.

get_csv_partitions(database, table[, ...])

Get all partitions from a Table in the AWS Glue Catalog.

get_databases([catalog_id, boto3_session])

Get an iterator of databases.

get_parquet_partitions(database, table[, ...])

Get all partitions from a Table in the AWS Glue Catalog.

get_partitions(database, table[, ...])

Get all partitions from a Table in the AWS Glue Catalog.

get_table_description(database, table[, ...])

Get table description.

get_table_location(database, table[, ...])

Get table's location on Glue catalog.

get_table_number_of_versions(database, table)

Get total number of versions.

get_table_parameters(database, table[, ...])

Get all parameters.

get_table_types(database, table[, ...])

Get all columns and types from a table.

get_table_versions(database, table[, ...])

Get all versions.

get_tables([catalog_id, database, ...])

Get an iterator of tables.

overwrite_table_parameters(parameters, ...)

Overwrite all existing parameters.

sanitize_column_name(column)

Convert the column name to be compatible with Amazon Athena and the AWS Glue Catalog.

sanitize_dataframe_columns_names(df[, ...])

Normalize all columns names to be compatible with Amazon Athena.

sanitize_table_name(table)

Convert the table name to be compatible with Amazon Athena and the AWS Glue Catalog.

search_tables(text[, catalog_id, boto3_session])

Get Pandas DataFrame of tables filtered by a search string.

table(database, table[, transaction_id, ...])

Get table details as Pandas DataFrame.

tables([limit, catalog_id, database, ...])

Get a DataFrame with tables filtered by a search term, prefix, suffix.

upsert_table_parameters(parameters, ...[, ...])

Insert or Update the received parameters.

Amazon Athena

create_athena_bucket([boto3_session])

Create the default Athena bucket if it doesn't exist.

create_ctas_table(sql[, database, ...])

Create a new table populated with the results of a SELECT query.

generate_create_query(table[, database, ...])

Generate the query that created a table (EXTERNAL_TABLE) or a view (VIRTUAL_TABLE).

get_query_columns_types(query_execution_id)

Get the data type of all columns queried.

get_query_execution(query_execution_id[, ...])

Fetch query execution details.

get_query_executions(query_execution_ids[, ...])

From specified query execution IDs, return a DataFrame of query execution details.

get_query_results(query_execution_id[, ...])

Get AWS Athena SQL query results as a Pandas DataFrame.

get_named_query_statement(named_query_id[, ...])

Get the named query statement string from a query ID.

get_work_group(workgroup[, boto3_session])

Return information about the workgroup with the specified name.

list_query_executions([workgroup, boto3_session])

Fetch the list of query execution IDs run in the specified workgroup, or in the primary workgroup if none is specified.

read_sql_query(sql, database[, ...])

Execute any SQL query on AWS Athena and return the results as a Pandas DataFrame.

read_sql_table(table, database[, ...])

Extract the full table from AWS Athena and return the results as a Pandas DataFrame.

repair_table(table[, database, data_source, ...])

Run the Hive's metastore consistency check: 'MSCK REPAIR TABLE table;'.

start_query_execution(sql[, database, ...])

Start a SQL Query against AWS Athena.

stop_query_execution(query_execution_id[, ...])

Stop a query execution.

unload(sql, path, database[, file_format, ...])

Write query results from a SELECT statement to the specified data format using UNLOAD.

wait_query(query_execution_id[, ...])

Wait for the query to end.

AWS Lake Formation

read_sql_query(sql, database[, ...])

Execute PartiQL query on AWS Glue Table (Transaction ID or time travel timestamp).

read_sql_table(table, database[, ...])

Extract all rows from AWS Glue Table (Transaction ID or time travel timestamp).

cancel_transaction(transaction_id[, ...])

Cancel the specified transaction.

commit_transaction(transaction_id[, ...])

Commit the specified transaction.

describe_transaction(transaction_id[, ...])

Return the status of a single transaction.

extend_transaction(transaction_id[, ...])

Indicate to the service that the specified transaction is still active and should not be canceled.

start_transaction([read_only, time_out, ...])

Start a new transaction and return its transaction ID.

wait_query(query_id[, boto3_session, ...])

Wait for the query to end.

Amazon Redshift

connect([connection, secret_id, catalog_id, ...])

Return a redshift_connector connection from a Glue Catalog or Secret Manager.

connect_temp(cluster_identifier, user[, ...])

Return a redshift_connector temporary connection (No password required).

copy(df, path, con, table, schema[, ...])

Load Pandas DataFrame as a Table on Amazon Redshift using Parquet files on S3 as a stage.

copy_from_files(path, con, table, schema[, ...])

Load Parquet files from S3 to a Table on Amazon Redshift (Through COPY command).

read_sql_query(sql, con[, index_col, ...])

Return a DataFrame corresponding to the result set of the query string.

read_sql_table(table, con[, schema, ...])

Return a DataFrame corresponding to the table.

to_sql(df, con, table, schema[, mode, ...])

Write records stored in a DataFrame into Redshift.

unload(sql, path, con[, iam_role, ...])

Load Pandas DataFrame from an Amazon Redshift query result using Parquet files on S3 as a stage.

unload_to_files(sql, path, con[, iam_role, ...])

Unload Parquet files on s3 from a Redshift query result (Through the UNLOAD command).

PostgreSQL

connect([connection, secret_id, catalog_id, ...])

Return a pg8000 connection from a Glue Catalog Connection.

read_sql_query(sql, con[, index_col, ...])

Return a DataFrame corresponding to the result set of the query string.

read_sql_table(table, con[, schema, ...])

Return a DataFrame corresponding to the table.

to_sql(df, con, table, schema[, mode, ...])

Write records stored in a DataFrame into PostgreSQL.

MySQL

connect([connection, secret_id, catalog_id, ...])

Return a pymysql connection from a Glue Catalog Connection or Secrets Manager.

read_sql_query(sql, con[, index_col, ...])

Return a DataFrame corresponding to the result set of the query string.

read_sql_table(table, con[, schema, ...])

Return a DataFrame corresponding to the table.

to_sql(df, con, table, schema[, mode, ...])

Write records stored in a DataFrame into MySQL.

Microsoft SQL Server

connect([connection, secret_id, catalog_id, ...])

Return a pyodbc connection from a Glue Catalog Connection.

read_sql_query(sql, con[, index_col, ...])

Return a DataFrame corresponding to the result set of the query string.

read_sql_table(table, con[, schema, ...])

Return a DataFrame corresponding to the table.

to_sql(df, con, table, schema[, mode, ...])

Write records stored in a DataFrame into Microsoft SQL Server.

Oracle

connect([connection, secret_id, catalog_id, ...])

Return an oracledb connection from a Glue Catalog Connection.

read_sql_query(sql, con[, index_col, ...])

Return a DataFrame corresponding to the result set of the query string.

read_sql_table(table, con[, schema, ...])

Return a DataFrame corresponding to the table.

to_sql(df, con, table, schema[, mode, ...])

Write records stored in a DataFrame into Oracle Database.

Data API Redshift

RedshiftDataApi([cluster_id, database, ...])

Provides access to a Redshift cluster via the Data API.

connect([cluster_id, database, ...])

Create a Redshift Data API connection.

read_sql_query(sql, con[, database])

Run an SQL query on a RedshiftDataApi connection and return the result as a dataframe.

Data API RDS

RdsDataApi(resource_arn, database[, ...])

Provides access to the RDS Data API.

connect(resource_arn, database[, ...])

Create an RDS Data API connection.

read_sql_query(sql, con[, database])

Run an SQL query on an RdsDataApi connection and return the result as a dataframe.

AWS Glue Data Quality

create_recommendation_ruleset(database, ...)

Create recommendation Data Quality ruleset.

create_ruleset(name, database, table[, ...])

Create Data Quality ruleset.

evaluate_ruleset(name, iam_role_arn[, ...])

Evaluate Data Quality ruleset.

get_ruleset(name[, boto3_session])

Get a Data Quality ruleset.

update_ruleset(name[, mode, df_rules, ...])

Update Data Quality ruleset.

OpenSearch

connect(host[, port, boto3_session, region, ...])

Create a secure connection to the specified Amazon OpenSearch domain.

create_collection(name[, collection_type, ...])

Create Amazon OpenSearch Serverless collection.

create_index(client, index[, doc_type, ...])

Create an index.

delete_index(client, index)

Delete an index.

index_csv(client, path, index[, doc_type, ...])

Index all documents from a CSV file to OpenSearch index.

index_documents(client, documents, index[, ...])

Index all documents to OpenSearch index.

index_df(client, df, index[, doc_type])

Index all documents from a DataFrame to OpenSearch index.

index_json(client, path, index[, doc_type, ...])

Index all documents from JSON file to OpenSearch index.

search(client[, index, search_body, ...])

Return results matching query DSL as pandas dataframe.

search_by_sql(client, sql_query, **kwargs)

Return results matching SQL query as pandas dataframe.

Amazon Neptune

connect(host, port[, iam_enabled])

Create a connection to a Neptune cluster.

execute_gremlin(client, query)

Return results of a Gremlin traversal as pandas dataframe.

execute_opencypher(client, query)

Return results of an openCypher traversal as pandas dataframe.

execute_sparql(client, query)

Return results of a SPARQL query as pandas dataframe.

flatten_nested_df(df[, include_prefix, ...])

Flatten the lists and dictionaries of the input data frame.

to_property_graph(client, df[, batch_size, ...])

Write records stored in a DataFrame into Amazon Neptune.

to_rdf_graph(client, df[, batch_size, ...])

Write records stored in a DataFrame into Amazon Neptune.

DynamoDB

delete_items(items, table_name[, boto3_session])

Delete all items in the specified DynamoDB table.

execute_statement(statement[, parameters, ...])

Run a PartiQL statement against a DynamoDB table.

get_table(table_name[, boto3_session])

Get DynamoDB table object for specified table name.

put_csv(path, table_name[, boto3_session])

Write all items from a CSV file to a DynamoDB table.

put_df(df, table_name[, boto3_session])

Write all items from a DataFrame to a DynamoDB table.

put_items(items, table_name[, boto3_session])

Insert all items to the specified DynamoDB table.

put_json(path, table_name[, boto3_session])

Write all items from a JSON file to a DynamoDB table.

read_items(table_name[, index_name, ...])

Read items from given DynamoDB table.

read_partiql_query(query[, parameters, ...])

Read data from a DynamoDB table via a PartiQL query.

Amazon Timestream

create_database(database[, kms_key_id, ...])

Create a new Timestream database.

create_table(database, table, ...[, tags, ...])

Create a new Timestream table.

delete_database(database[, boto3_session])

Delete a given Timestream database.

delete_table(database, table[, boto3_session])

Delete a given Timestream table.

query(sql[, chunked, pagination_config, ...])

Run a query and retrieve the result as a Pandas DataFrame.

write(df, database, table[, time_col, ...])

Store a Pandas DataFrame into an Amazon Timestream table.

Amazon EMR

build_spark_step(path[, args, deploy_mode, ...])

Build the Step structure (dictionary).

build_step(command[, name, ...])

Build the Step structure (dictionary).

create_cluster(subnet_id[, cluster_name, ...])

Create an EMR cluster with instance fleets configuration.

get_cluster_state(cluster_id[, boto3_session])

Get the EMR cluster state.

get_step_state(cluster_id, step_id[, ...])

Get EMR step state.

submit_ecr_credentials_refresh(cluster_id, path)

Update internal ECR credentials.

submit_spark_step(cluster_id, path[, args, ...])

Submit Spark Step.

submit_step(cluster_id, command[, name, ...])

Submit new job in the EMR Cluster.

submit_steps(cluster_id, steps[, boto3_session])

Submit a list of steps.

terminate_cluster(cluster_id[, boto3_session])

Terminate EMR cluster.

Amazon CloudWatch Logs

read_logs(query, log_group_names[, ...])

Run a query against AWS CloudWatchLogs Insights and convert the results to Pandas DataFrame.

run_query(query, log_group_names[, ...])

Run a query against AWS CloudWatchLogs Insights and wait for the results.

start_query(query, log_group_names[, ...])

Run a query against AWS CloudWatchLogs Insights.

wait_query(query_id[, boto3_session, ...])

Wait until the query ends.

describe_log_streams(log_group_name[, ...])

List the log streams for the specified log group and return the results as a Pandas DataFrame.

filter_log_events(log_group_name[, ...])

List log events from the specified log group.
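
A sketch of running a Logs Insights query; the log group name and time window are placeholders, and the start_time/end_time keyword names are assumptions.

import awswrangler as wr
from datetime import datetime

# Run a CloudWatchLogs Insights query and get the results as a Pandas DataFrame
df = wr.cloudwatch.read_logs(
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    log_group_names=["/aws/lambda/my-function"],
    start_time=datetime(2023, 1, 1),
    end_time=datetime(2023, 1, 2),
)

# List the log streams of a log group as a Pandas DataFrame
streams = wr.cloudwatch.describe_log_streams(log_group_name="/aws/lambda/my-function")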

Amazon QuickSight

cancel_ingestion(ingestion_id[, ...])

Cancel an ongoing ingestion of data into SPICE.

create_athena_data_source(name[, workgroup, ...])

Create a QuickSight data source pointing to an Athena workgroup.

create_athena_dataset(name[, database, ...])

Create a QuickSight dataset.

create_ingestion([dataset_name, dataset_id, ...])

Create and start a new SPICE ingestion on a dataset.

delete_all_dashboards([account_id, ...])

Delete all dashboards.

delete_all_data_sources([account_id, ...])

Delete all data sources.

delete_all_datasets([account_id, ...])

Delete all datasets.

delete_all_templates([account_id, ...])

Delete all templates.

delete_dashboard([name, dashboard_id, ...])

Delete a dashboard.

delete_data_source([name, data_source_id, ...])

Delete a data source.

delete_dataset([name, dataset_id, ...])

Delete a dataset.

delete_template([name, template_id, ...])

Delete a template.

describe_dashboard([name, dashboard_id, ...])

Describe a QuickSight dashboard by name or ID.

describe_data_source([name, data_source_id, ...])

Describe a QuickSight data source by name or ID.

describe_data_source_permissions([name, ...])

Describe QuickSight data source permissions by name or ID.

describe_dataset([name, dataset_id, ...])

Describe a QuickSight dataset by name or ID.

describe_ingestion(ingestion_id[, ...])

Describe a QuickSight ingestion by ID.

get_dashboard_id(name[, account_id, ...])

Get the QuickSight dashboard ID for a given name, failing if more than one ID is associated with that name.

get_dashboard_ids(name[, account_id, ...])

Get QuickSight dashboard IDs given a name.

get_data_source_arn(name[, account_id, ...])

Get the QuickSight data source ARN for a given name, failing if more than one ARN is associated with that name.

get_data_source_arns(name[, account_id, ...])

Get QuickSight data source ARNs given a name.

get_data_source_id(name[, account_id, ...])

Get the QuickSight data source ID for a given name, failing if more than one ID is associated with that name.

get_data_source_ids(name[, account_id, ...])

Get QuickSight data source IDs given a name.

get_dataset_id(name[, account_id, boto3_session])

Get the QuickSight dataset ID for a given name, failing if more than one ID is associated with that name.

get_dataset_ids(name[, account_id, ...])

Get QuickSight dataset IDs given a name.

get_template_id(name[, account_id, ...])

Get the QuickSight template ID for a given name, failing if more than one ID is associated with that name.

get_template_ids(name[, account_id, ...])

Get QuickSight template IDs given a name.

list_dashboards([account_id, boto3_session])

List dashboards in an AWS account.

list_data_sources([account_id, boto3_session])

List summaries of all QuickSight data sources.

list_datasets([account_id, boto3_session])

List summaries of all QuickSight datasets.

list_groups([namespace, account_id, ...])

List all QuickSight groups.

list_group_memberships(group_name[, ...])

List all memberships of the given QuickSight group.

list_iam_policy_assignments([status, ...])

List IAM policy assignments in the current Amazon QuickSight account.

list_iam_policy_assignments_for_user(user_name)

List all IAM policy assignments for the given user.

list_ingestions([dataset_name, dataset_id, ...])

List the history of SPICE ingestions for a dataset.

list_templates([account_id, boto3_session])

List all QuickSight templates.

list_users([namespace, account_id, ...])

Return a list of all Amazon QuickSight users belonging to this account.

list_user_groups(user_name[, namespace, ...])

List the Amazon QuickSight groups that an Amazon QuickSight user is a member of.
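
A sketch of wiring Athena data into SPICE; the data source/dataset names, database, table, and QuickSight user are placeholders, and the allowed_to_manage keyword is an assumption.

import awswrangler as wr

# Create an Athena-backed data source and a dataset, then start a SPICE ingestion
wr.quicksight.create_athena_data_source(
    name="my-data-source",
    allowed_to_manage=["my-quicksight-user"],
)
wr.quicksight.create_athena_dataset(
    name="my-dataset",
    database="my_db",
    table="my_table",
    data_source_name="my-data-source",
    allowed_to_manage=["my-quicksight-user"],
)
wr.quicksight.create_ingestion(dataset_name="my-dataset")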

AWS STS

get_account_id([boto3_session])

Get the AWS account ID.

get_current_identity_arn([boto3_session])

Get current user/role ARN.

get_current_identity_name([boto3_session])

Get current user/role name.
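
These helpers take no required arguments beyond an optional boto3 session, for example:

import awswrangler as wr

account_id = wr.sts.get_account_id()
identity_arn = wr.sts.get_current_identity_arn()
identity_name = wr.sts.get_current_identity_name()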

AWS Secrets Manager

get_secret(name[, boto3_session])

Get secret value.

get_secret_json(name[, boto3_session])

Get JSON secret value.
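
A minimal sketch; the secret names are placeholders.

import awswrangler as wr

# Fetch a raw secret value
value = wr.secretsmanager.get_secret(name="my-secret")

# Fetch a secret stored as JSON, parsed into a dict
config = wr.secretsmanager.get_secret_json(name="my-json-secret")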

Amazon Chime

post_message(webhook, message)

Send a message to an existing Chime chat room.
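
A minimal sketch; the webhook URL is a placeholder for an existing Chime room's incoming webhook.

import awswrangler as wr

# Post a message to a Chime chat room through its incoming webhook
wr.chime.post_message(
    webhook="https://hooks.chime.aws/incomingwebhooks/...",
    message="Hello from awswrangler!",
)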

Global Configurations

reset([item])

Reset one configuration value, or all of them if None is passed.

to_pandas()

Load all configuration values into a Pandas DataFrame.
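
A sketch of using the global configuration object; the database attribute and its WR_DATABASE environment-variable counterpart are shown as illustrative examples and should be treated as assumptions.

import awswrangler as wr

# Set a global default (illustrative attribute; also settable via the WR_DATABASE env var)
wr.config.database = "my_db"

# Inspect all configuration values as a Pandas DataFrame
print(wr.config.to_pandas())

# Reset a single item, or everything
wr.config.reset("database")
wr.config.reset()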
