12 - CSV Crawler¶
awswrangler can extract only the metadata from a Pandas DataFrame and then add it to the Glue Catalog as a table.
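The cells below walk through that flow step by step: infer Athena column and partition types with wr.catalog.extract_athena_types, register them as a Glue table with wr.catalog.create_csv_table, and write CSV data that conforms to those types with wr.s3.to_csv. As a condensed sketch of the same flow (assuming the df, path, and awswrangler_test database defined in the following cells already exist; this is an outline of the tutorial, not a separate recipe):

# Outline of this tutorial, assuming df, path and the awswrangler_test
# database are already defined as in the cells below.
columns_types, partitions_types = wr.catalog.extract_athena_types(
    df=df, file_format="csv", index=False, partition_cols=["par0", "par1"]
)
wr.catalog.create_csv_table(
    table="csv_crawler",
    database="awswrangler_test",
    path=path,
    columns_types=columns_types,
    partitions_types=partitions_types,
)
wr.s3.to_csv(
    df=df,
    path=path,
    index=False,
    dataset=True,
    database="awswrangler_test",
    table="csv_crawler",
    partition_cols=["par0", "par1"],
    dtype=columns_types,
)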
[1]:
from datetime import datetime
import pandas as pd
import awswrangler as wr
Enter your bucket name:¶
[2]:
import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/csv_crawler/"
············
Creating a Pandas DataFrame¶
[3]:
ts = lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f") # noqa
dt = lambda x: datetime.strptime(x, "%Y-%m-%d").date() # noqa
df = pd.DataFrame(
{
"id": [1, 2, 3],
"string": ["foo", None, "boo"],
"float": [1.0, None, 2.0],
"date": [dt("2020-01-01"), None, dt("2020-01-02")],
"timestamp": [ts("2020-01-01 00:00:00.0"), None, ts("2020-01-02 00:00:01.0")],
"bool": [True, None, False],
"par0": [1, 1, 2],
"par1": ["a", "b", "b"],
}
)
df
[3]:
| | id | string | float | date | timestamp | bool | par0 | par1 |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | foo | 1.0 | 2020-01-01 | 2020-01-01 00:00:00 | True | 1 | a |
| 1 | 2 | None | NaN | None | NaT | None | 1 | b |
| 2 | 3 | boo | 2.0 | 2020-01-02 | 2020-01-02 00:00:01 | False | 2 | b |
Extracting the metadata¶
[4]:
columns_types, partitions_types = wr.catalog.extract_athena_types(
df=df, file_format="csv", index=False, partition_cols=["par0", "par1"]
)
[5]:
columns_types
[5]:
{'id': 'bigint',
'string': 'string',
'float': 'double',
'date': 'date',
'timestamp': 'timestamp',
'bool': 'boolean'}
[6]:
partitions_types
[6]:
{'par0': 'bigint', 'par1': 'string'}
Creating the table¶
[7]:
wr.catalog.create_csv_table(
table="csv_crawler",
database="awswrangler_test",
path=path,
partitions_types=partitions_types,
columns_types=columns_types,
)
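As an optional sanity check (not part of the original notebook), you could confirm the Glue table was registered; this assumes your awswrangler version provides wr.catalog.does_table_exist:

# Optional: verify the table registration succeeded
# (assumes wr.catalog.does_table_exist is available in your awswrangler version).
assert wr.catalog.does_table_exist(database="awswrangler_test", table="csv_crawler")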
Checking¶
[8]:
wr.catalog.table(database="awswrangler_test", table="csv_crawler")
[8]:
| | Column Name | Type | Partition | Comment |
|---|---|---|---|---|
| 0 | id | bigint | False | |
| 1 | string | string | False | |
| 2 | float | double | False | |
| 3 | date | date | False | |
| 4 | timestamp | timestamp | False | |
| 5 | bool | boolean | False | |
| 6 | par0 | bigint | True | |
| 7 | par1 | string | True | |
We can still use the extracted metadata to ensure all data types are consistent with new data¶
[9]:
df = pd.DataFrame(
{
"id": [1],
"string": ["1"],
"float": [1],
"date": [ts("2020-01-01 00:00:00.0")],
"timestamp": [dt("2020-01-02")],
"bool": [1],
"par0": [1],
"par1": ["a"],
}
)
df
[9]:
| | id | string | float | date | timestamp | bool | par0 | par1 |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 1 | 2020-01-01 | 2020-01-02 | 1 | 1 | a |
[10]:
res = wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
database="awswrangler_test",
table="csv_crawler",
partition_cols=["par0", "par1"],
dtype=columns_types,
)
You can also extract the metadata directly from the Catalog if you want¶
[11]:
dtype = wr.catalog.get_table_types(database="awswrangler_test", table="csv_crawler")
[12]:
res = wr.s3.to_csv(
df=df,
path=path,
index=False,
dataset=True,
database="awswrangler_test",
table="csv_crawler",
partition_cols=["par0", "par1"],
dtype=dtype,
)
Checking out¶
[13]:
df = wr.athena.read_sql_table(database="awswrangler_test", table="csv_crawler")
df
[13]:
| | id | string | float | date | timestamp | bool | par0 | par1 |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 1.0 | None | 2020-01-02 | True | 1 | a |
| 1 | 1 | 1 | 1.0 | None | 2020-01-02 | True | 1 | a |
[14]:
df.dtypes
[14]:
id Int64
string string
float float64
date object
timestamp datetime64[ns]
bool boolean
par0 Int64
par1 string
dtype: object
Cleaning Up S3¶
[15]:
wr.s3.delete_objects(path)
Cleaning Up the Database¶
[16]:
wr.catalog.delete_table_if_exists(database="awswrangler_test", table="csv_crawler")
[16]:
True