11 - CSV Datasets¶
awswrangler has three different write modes for storing CSV datasets on Amazon S3.

append (Default)
Only adds new files, without deleting any existing ones.
overwrite
Deletes everything in the target directory and then adds the new files.
overwrite_partitions (Partition Upsert)
Only deletes the paths of the partitions that should be updated, and then writes the new partition files. It's like a "partition upsert".
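All three behaviours are selected through the same mode argument of wr.s3.to_csv. As a rough sketch (the bucket name below is a placeholder, not part of this tutorial), writing with a given mode and then listing the target prefix with wr.s3.list_objects makes it easy to see which files each mode leaves behind:

import pandas as pd
import awswrangler as wr

df = pd.DataFrame({"id": [1], "value": ["foo"]})

# "append" only adds files; "overwrite" clears the prefix first;
# "overwrite_partitions" replaces only the partitions present in df.
wr.s3.to_csv(df=df, path="s3://my-bucket/dataset/", dataset=True, mode="append")

# Check which objects the write left under the prefix.
wr.s3.list_objects("s3://my-bucket/dataset/")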
[1]:
from datetime import date
import pandas as pd
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/dataset/"
Checking/Creating Glue Catalog Databases¶
[3]:
if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")
Creating the Dataset¶
[4]:
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"], "date": [date(2020, 1, 1), date(2020, 1, 2)]})
wr.s3.to_csv(
df=df, path=path, index=False, dataset=True, mode="overwrite", database="awswrangler_test", table="csv_dataset"
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[4]:
|   | id | value | date       |
|---|----|-------|------------|
| 0 | 1  | foo   | 2020-01-01 |
| 1 | 2  | boo   | 2020-01-02 |
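If you want to verify what the write actually produced on S3, listing the prefix shows the CSV file created by the overwrite. Re-running this optional check after each write below makes the behaviour of the different modes visible:

# Optional sanity check: list the objects currently under the dataset prefix.
wr.s3.list_objects(path)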
Appending¶
[5]:
df = pd.DataFrame({"id": [3], "value": ["bar"], "date": [date(2020, 1, 3)]})
wr.s3.to_csv(
df=df, path=path, index=False, dataset=True, mode="append", database="awswrangler_test", table="csv_dataset"
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[5]:
|   | id | value | date       |
|---|----|-------|------------|
| 0 | 3  | bar   | 2020-01-03 |
| 1 | 1  | foo   | 2020-01-01 |
| 2 | 2  | boo   | 2020-01-02 |
Overwriting¶
[6]:
wr.s3.to_csv(
df=df, path=path, index=False, dataset=True, mode="overwrite", database="awswrangler_test", table="csv_dataset"
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[6]:
|   | id | value | date       |
|---|----|-------|------------|
| 0 | 3  | bar   | 2020-01-03 |
Creating a Partitioned Dataset¶
[7]:
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"], "date": [date(2020, 1, 1), date(2020, 1, 2)]})
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="overwrite",
database="awswrangler_test",
table="csv_dataset",
partition_cols=["date"],
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[7]:
|   | id | value | date       |
|---|----|-------|------------|
| 0 | 2  | boo   | 2020-01-02 |
| 1 | 1  | foo   | 2020-01-01 |
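Under the hood, partition_cols produces Hive-style key=value prefixes on S3 (e.g. .../date=2020-01-01/), and the Glue Catalog tracks one partition per distinct date value. Both can be inspected with an optional check like this:

# S3 layout: one prefix per partition value.
wr.s3.list_objects(path)

# Partitions registered in the Glue Catalog for this table.
wr.catalog.get_partitions(database="awswrangler_test", table="csv_dataset")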
Upserting partitions (overwrite_partitions)¶
[8]:
df = pd.DataFrame({"id": [2, 3], "value": ["xoo", "bar"], "date": [date(2020, 1, 2), date(2020, 1, 3)]})
wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
mode="overwrite_partitions",
database="awswrangler_test",
table="csv_dataset",
partition_cols=["date"],
)
wr.athena.read_sql_table(database="awswrangler_test", table="csv_dataset")
[8]:
|   | id | value | date       |
|---|----|-------|------------|
| 0 | 1  | foo   | 2020-01-01 |
| 1 | 2  | xoo   | 2020-01-02 |
| 0 | 3  | bar   | 2020-01-03 |
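Note what happened: the date=2020-01-01 partition was left untouched, date=2020-01-02 was replaced (its value changed from boo to xoo), and date=2020-01-03 was added. An optional check confirms that only the affected partition prefixes were rewritten:

# Only this partition's prefix was rewritten by the upsert;
# date=2020-01-01/ still holds its original file.
wr.s3.list_objects(f"{path}date=2020-01-02/")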
BONUS - Glue/Athena integration¶
[9]:
df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"], "date": [date(2020, 1, 1), date(2020, 1, 2)]})
wr.s3.to_csv(
df=df,
path=path,
dataset=True,
index=False,
mode="overwrite",
database="aws_sdk_pandas",
table="my_table",
compression="gzip",
)
wr.athena.read_sql_query("SELECT * FROM my_table", database="aws_sdk_pandas")
[9]:
|   | id | value | date       |
|---|----|-------|------------|
| 0 | 1  | foo   | 2020-01-01 |
| 1 | 2  | boo   | 2020-01-02 |
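To clean up everything this tutorial created (an optional step, not part of the original flow), delete the S3 objects and drop the catalog entries; note that the bonus cell registered its table in a second database:

# Remove the data files and drop the Glue Catalog entries created above.
wr.s3.delete_objects(path)
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="csv_dataset")
wr.catalog.delete_table_if_exists(database="aws_sdk_pandas", table="my_table")
wr.catalog.delete_database("awswrangler_test")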