AWS SDK for pandas

26 - Amazon Timestream

Creating resources

[10]:
from datetime import datetime, timezone

import pandas as pd

import awswrangler as wr

# Demo database plus two tables: table_1 for the single-measure example,
# table_2 for the multi-measure example.
database = "sampleDB"
table_1 = "sampleTable1"
table_2 = "sampleTable2"

# Both tables share the same short retention: 1 hour in the memory store,
# 1 day in the magnetic store.
table_retention = {"memory_retention_hours": 1, "magnetic_retention_days": 1}

wr.timestream.create_database(database)
wr.timestream.create_table(database, table_1, **table_retention)
wr.timestream.create_table(database, table_2, **table_retention)

Write

Single-measure WriteRecords

[11]:
# Record times are sent to Timestream as epoch instants, and a naive timestamp is
# converted to epoch as if it were UTC (pandas `Timestamp.timestamp()` semantics).
# `datetime.now()` would give local wall-clock time, shifting records by the host's
# UTC offset and possibly pushing them outside the 1-hour memory-store retention.
# Use the current UTC wall-clock time instead (kept naive so the column dtype is unchanged).
now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

df = pd.DataFrame(
    {
        "time": [now_utc] * 3,
        "dim0": ["foo", "boo", "bar"],
        "dim1": [1, 2, 3],
        "measure": [1.0, 1.1, 1.2],
    }
)

# One single-measure record per row: "measure" holds the value,
# dim0/dim1 are the record's dimensions.
rejected_records = wr.timestream.write(
    df=df,
    database=database,
    table=table_1,
    time_col="time",
    measure_col="measure",
    dimensions_cols=["dim0", "dim1"],
)

print(f"Number of rejected records: {len(rejected_records)}")
Number of rejected records: 0

Multi-measure WriteRecords

[ ]:
# Naive timestamps are converted to epoch as if they were UTC, so use the current
# UTC wall-clock time rather than local time; otherwise records are shifted by the
# host's UTC offset and may fall outside the table's memory-store retention.
now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

# NOTE(review): the measure values are strings, so they are presumably stored as
# VARCHAR measures; cast them (e.g. `.astype(int)`) if numeric measures are intended.
df = pd.DataFrame(
    {
        "time": [now_utc] * 3,
        "measure_1": ["10", "20", "30"],
        "measure_2": ["100", "200", "300"],
        "measure_3": ["1000", "2000", "3000"],
        "tag": ["tag123", "tag456", "tag789"],
    }
)

# Passing a list of measure columns writes one multi-measure record per row,
# with "tag" as the only dimension.
rejected_records = wr.timestream.write(
    df=df,
    database=database,
    table=table_2,
    time_col="time",
    measure_col=["measure_1", "measure_2", "measure_3"],
    dimensions_cols=["tag"],
)

print(f"Number of rejected records: {len(rejected_records)}")

Query

[12]:
latest_rows_sql = f'SELECT time, measure_value::double, dim0, dim1 FROM "{database}"."{table_1}" ORDER BY time DESC LIMIT 3'

# Fetch the three most recent single-measure records; the cell displays the
# resulting table.
wr.timestream.query(latest_rows_sql)
[12]:
time measure_value::double dim0 dim1
0 2020-12-08 19:15:32.468 1.0 foo 1
1 2020-12-08 19:15:32.468 1.2 bar 3
2 2020-12-08 19:15:32.468 1.1 boo 2

Unload query results to Amazon S3

[ ]:
# Single-measure values live in typed columns such as `measure_value::double`
# (the same column the query above selects); there is no untyped `measure_value`
# column. Alias it so the exported files get a plain column name.
# dim1 stays last in the SELECT list because it is the partition column.
# Replace the placeholder bucket with an S3 location you own.
unload_path = "s3://bucket/extracted_parquet_files/"

df = wr.timestream.unload(
    sql=f'SELECT time, measure_value::double AS measure_value, dim0, dim1 FROM "{database}"."{table_1}"',
    path=unload_path,
    partition_cols=["dim1"],
)

Deleting resources

[13]:
# Tear down in the reverse of creation: both tables first, then the database.
for sample_table in (table_1, table_2):
    wr.timestream.delete_table(database, sample_table)
wr.timestream.delete_database(database)