26 - Amazon Timestream
Creating resources
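from datetime import datetime
import pandas as pd
import awswrangler as wr
# Names of the Timestream database and the two tables used in this tutorial
database = "sampleDB"
table_1 = "sampleTable1"
table_2 = "sampleTable2"
# Create the database, then two tables that keep data for 1 hour in the
# memory store and 1 day in the magnetic store
wr.timestream.create_database(database)
wr.timestream.create_table(database, table_1, memory_retention_hours=1, magnetic_retention_days=1)
wr.timestream.create_table(database, table_2, memory_retention_hours=1, magnetic_retention_days=1)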
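Both tables keep data in the memory store for only one hour. As we understand the service, Timestream by default rejects records whose timestamp is older than the memory-store retention period unless magnetic-store writes are enabled on the table. A minimal sketch that drops such rows before calling `wr.timestream.write`; `within_memory_retention` is a hypothetical helper (not part of awswrangler), and it assumes naive timestamps use the same clock as `now`.

```python
from datetime import datetime, timedelta
from typing import Optional

import pandas as pd


def within_memory_retention(
    df: pd.DataFrame,
    time_col: str,
    memory_retention_hours: int,
    now: Optional[datetime] = None,
) -> pd.DataFrame:
    """Return the rows of df whose timestamp is inside the memory-store window.

    Hypothetical helper: naive timestamps are assumed to use the same clock
    as `now` (local time, like datetime.now() in this notebook).
    """
    now = now if now is not None else datetime.now()
    cutoff = now - timedelta(hours=memory_retention_hours)
    # Keep only rows that are no older than the retention window
    return df[df[time_col] >= cutoff]


# Usage: with a fixed "now", the row that is two hours old is dropped
now = datetime(2020, 12, 8, 19, 0, 0)
frame = pd.DataFrame(
    {
        "time": [now, now - timedelta(minutes=30), now - timedelta(hours=2)],
        "measure": [1.0, 1.1, 1.2],
    }
)
print(within_memory_retention(frame, "time", memory_retention_hours=1, now=now))
```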
Write
Single-measure WriteRecord
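# Three rows sharing one timestamp: dim0 and dim1 are written as dimensions,
# "measure" as a single DOUBLE measure
df = pd.DataFrame(
    {
        "time": [datetime.now()] * 3,
        "dim0": ["foo", "boo", "bar"],
        "dim1": [1, 2, 3],
        "measure": [1.0, 1.1, 1.2],
    }
)
# write() returns the records that Timestream rejected (empty on success)
rejected_records = wr.timestream.write(
    df=df,
    database=database,
    table=table_1,
    time_col="time",
    measure_col="measure",
    dimensions_cols=["dim0", "dim1"],
)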
print(f"Number of rejected records: {len(rejected_records)}")
Number of rejected records: 0
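To make the mapping from DataFrame rows to Timestream records concrete, here is a sketch of the record dictionaries accepted by the Timestream `WriteRecords` API (the shape used by boto3's `timestream-write` client) for the single-measure case above. This is not awswrangler's internal code: `to_single_measure_records` is a hypothetical helper, and converting naive timestamps to epoch milliseconds as if they were UTC is an assumption.

```python
from datetime import datetime
from typing import Any, Dict, List

import pandas as pd


def to_single_measure_records(
    df: pd.DataFrame,
    time_col: str,
    measure_col: str,
    dimensions_cols: List[str],
) -> List[Dict[str, Any]]:
    """Build one WriteRecords-style dict per row (hypothetical helper).

    Assumes the measure column holds floats, so MeasureValueType is DOUBLE.
    """
    records = []
    for row in df.to_dict("records"):
        ts = pd.Timestamp(row[time_col])
        records.append(
            {
                # The API takes every dimension and measure value as a string
                "Dimensions": [
                    {"Name": col, "Value": str(row[col]), "DimensionValueType": "VARCHAR"}
                    for col in dimensions_cols
                ],
                "MeasureName": measure_col,
                "MeasureValue": str(row[measure_col]),
                "MeasureValueType": "DOUBLE",
                # Timestamp.value is nanoseconds since the epoch (naive treated as UTC)
                "Time": str(ts.value // 1_000_000),
                "TimeUnit": "MILLISECONDS",
            }
        )
    return records


sample = pd.DataFrame(
    {
        "time": [datetime(2020, 12, 8, 19, 15, 32)],
        "dim0": ["foo"],
        "dim1": [1],
        "measure": [1.0],
    }
)
for record in to_single_measure_records(sample, "time", "measure", ["dim0", "dim1"]):
    print(record)
```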
Multi-measure WriteRecord
# Passing a list to measure_col writes each row as one multi-measure record
# holding measure_1..measure_3; "tag" is the only dimension
df = pd.DataFrame(
    {
        "time": [datetime.now()] * 3,
        "measure_1": ["10", "20", "30"],
        "measure_2": ["100", "200", "300"],
        "measure_3": ["1000", "2000", "3000"],
        "tag": ["tag123", "tag456", "tag789"],
    }
)
rejected_records = wr.timestream.write(
    df=df,
    database=database,
    table=table_2,
    time_col="time",
    measure_col=["measure_1", "measure_2", "measure_3"],
    dimensions_cols=["tag"],
)
print(f"Number of rejected records: {len(rejected_records)}")
Query
wr.timestream.query(
f'SELECT time, measure_value::double, dim0, dim1 FROM "{database}"."{table_1}" ORDER BY time DESC LIMIT 3'
)
|   | time | measure_value::double | dim0 | dim1 |
|---|---|---|---|---|
| 0 | 2020-12-08 19:15:32.468 | 1.0 | foo | 1 |
| 1 | 2020-12-08 19:15:32.468 | 1.2 | bar | 3 |
| 2 | 2020-12-08 19:15:32.468 | 1.1 | boo | 2 |
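In a single-measure table, Timestream names the value column after the measure's type (for example `measure_value::double` or `measure_value::bigint`), which is why the query above selects `measure_value::double`. A small sketch that derives the column name from a sample Python value and rebuilds the query string. `measure_value_column` and `latest_rows_query` are hypothetical helpers, and the type mapping is an assumption that leaves out `timestamp` measures.

```python
import numbers
from typing import Any, List


def measure_value_column(sample_value: Any) -> str:
    """Return the single-measure value column for a Python value (assumed mapping)."""
    if isinstance(sample_value, bool):  # bool must be checked before int
        suffix = "boolean"
    elif isinstance(sample_value, numbers.Integral):
        suffix = "bigint"
    elif isinstance(sample_value, numbers.Real):
        suffix = "double"
    else:
        suffix = "varchar"
    return f"measure_value::{suffix}"


def latest_rows_query(
    database: str, table: str, sample_value: Any, dimensions: List[str], limit: int = 3
) -> str:
    """Build a 'latest N rows' query like the one in this notebook."""
    columns = ", ".join(["time", measure_value_column(sample_value), *dimensions])
    # Database and table names are double-quoted as identifiers
    return f'SELECT {columns} FROM "{database}"."{table}" ORDER BY time DESC LIMIT {limit}'


print(latest_rows_query("sampleDB", "sampleTable1", 1.0, ["dim0", "dim1"]))
# SELECT time, measure_value::double, dim0, dim1 FROM "sampleDB"."sampleTable1" ORDER BY time DESC LIMIT 3
```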
Unload
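# Export the query result as Parquet files under the S3 prefix, partitioned by dim1
# ("s3://bucket/" is a placeholder; use a bucket you can write to)
df = wr.timestream.unload(
    sql=f'SELECT time, measure_value::double, dim0, dim1 FROM "{database}"."{table_1}"',
    path="s3://bucket/extracted_parquet_files/",
    partition_cols=["dim1"],
)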
Deleting resources
# Tables must be deleted before the database that contains them
wr.timestream.delete_table(database, table_1)
wr.timestream.delete_table(database, table_2)
wr.timestream.delete_database(database)