AWS SDK for pandas

11 - CSV Datasets

awswrangler has three different write modes for storing CSV datasets on Amazon S3, selected through the mode argument of wr.s3.to_csv().

  • append (Default)

    Only adds new files; nothing is deleted.

  • overwrite

    Deletes everything in the target directory and then adds new files.

  • overwrite_partitions (Partition Upsert)

    Only deletes the paths of the partitions that should be updated and then writes the new partition files. It’s like a “partition upsert”.
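
A quick way to see what each mode actually does is to list the objects under the dataset path after each write. This is not part of the original notebook, just a sanity-check sketch; it assumes the path variable defined in the cell below and credentials that can list the bucket:

# List every object currently stored under the dataset prefix.
# After "append" the old files remain alongside the new ones;
# after "overwrite" only the newly written files are left.
wr.s3.list_objects(path)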

[1]:
from datetime import date

import pandas as pd

import awswrangler as wr

Enter your bucket name:

[2]:
import getpass

bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
 ············

Checking/Creating Glue Catalog Databases

[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")
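
If you want to confirm the database is registered, both of the following are plain catalog lookups (a sketch, assuming default boto3 credentials with Glue permissions):

# The new database should now appear in the Glue Catalog listing.
wr.catalog.databases()

# Tables registered inside it (still empty at this point).
wr.catalog.tables(database="awswrangler_test")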

Creating the Dataset

[4]:
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"], "date": [date(2020, 1, 1), date(2020, 1, 2)]})

wr.s3.to_csv(
    df=df, path=path, index=False, dataset=True, mode="overwrite", database="awswrangler_test", table="csv_dataset"
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[4]:
   id value        date
0   1   foo  2020-01-01
1   2   boo  2020-01-02

Appending

[5]:
df = pd.DataFrame({"id": [3], "value": ["bar"], "date": [date(2020, 1, 3)]})

wr.s3.to_csv(
    df=df, path=path, index=False, dataset=True, mode="append", database="awswrangler_test", table="csv_dataset"
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[5]:
   id value        date
0   3   bar  2020-01-03
1   1   foo  2020-01-01
2   2   boo  2020-01-02

Overwriting

[6]:
wr.s3.to_csv(
    df=df, path=path, index=False, dataset=True, mode="overwrite", database="awswrangler_test", table="csv_dataset"
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[6]:
   id value        date
0   3   bar  2020-01-03

Creating a Partitioned Dataset

[7]:
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"], "date": [date(2020, 1, 1), date(2020, 1, 2)]})

wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="csv_dataset",
    partition_cols=["date"],
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[7]:
   id value        date
0   2   boo  2020-01-02
1   1   foo  2020-01-01
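
Behind the scenes, partition_cols=["date"] creates one Hive-style prefix per distinct date value and registers each one as a partition in the Glue Catalog. To verify that yourself, a possible check (not part of the original tutorial):

# Partition values registered in Glue, keyed by their S3 location, e.g.
# {"s3://.../date=2020-01-01/": ["2020-01-01"], ...}
wr.catalog.get_partitions(database="awswrangler_test", table="csv_dataset")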

Upserting partitions (overwrite_partitions)

[8]:
df = pd.DataFrame({"id": [2, 3], "value": ["xoo", "bar"], "date": [date(2020, 1, 2), date(2020, 1, 3)]})

wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    mode="overwrite_partitions",
    database="awswrangler_test",
    table="csv_dataset",
    partition_cols=["date"],
)

wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[8]:
   id value        date
0   1   foo  2020-01-01
1   2   xoo  2020-01-02
0   3   bar  2020-01-03
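
Note what happened: the date=2020-01-02 partition was deleted and rewritten (boo became xoo), the date=2020-01-03 partition was created, and the date=2020-01-01 partition was left untouched. To see the partition prefixes that now exist on S3, a small sketch:

# Hive-style partition prefixes under the dataset path, e.g.
# [".../date=2020-01-01/", ".../date=2020-01-02/", ".../date=2020-01-03/"]
wr.s3.list_directories(path)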

BONUS - Glue/Athena integration

This last example writes a gzip-compressed CSV dataset and reads it back with a plain SQL query instead of read_sql_table. Note it targets a different database, aws_sdk_pandas, which must already exist in your Glue Catalog.

[9]:
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"], "date": [date(2020, 1, 1), date(2020, 1, 2)]})

wr.s3.to_csv(
    df=df,
    path=path,
    dataset=True,
    index=False,
    mode="overwrite",
    database="aws_sdk_pandas",
    table="my_table",
    compression="gzip",
)

wr.athena.read_sql_query("SELECT * FROM my_table", database="aws_sdk_pandas")
[9]:
   id value        date
0   1   foo  2020-01-01
1   2   boo  2020-01-02
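
The notebook does not clean up after itself. If you want to remove everything it created, something along these lines should work (a sketch, assuming the awswrangler_test database and csv_dataset table used above):

# Delete the CSV files written under the dataset path.
wr.s3.delete_objects(path)

# Drop the Glue Catalog table; no error is raised if it is already gone.
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="csv_dataset")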