awswrangler.timestream.write

awswrangler.timestream.write(df: DataFrame, database: str, table: str, time_col: str | None = None, measure_col: str | list[str | None] | None = None, dimensions_cols: list[str | None] | None = None, version: int = 1, time_unit: Literal['MILLISECONDS', 'SECONDS', 'MICROSECONDS', 'NANOSECONDS'] = 'MILLISECONDS', use_threads: bool | int = True, measure_name: str | None = None, common_attributes: dict[str, Any] | None = None, boto3_session: Session | None = None) → list[dict[str, str]]

Store a Pandas DataFrame into an Amazon Timestream table.

Note

If use_threads=True, the number of threads is taken from os.cpu_count().

If the Timestream service rejects one or more records, this function does not raise a Python exception. Instead, it returns the rejection information.
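Because rejections are returned rather than raised, callers who prefer fail-fast behavior need to check the result themselves. The following is a minimal sketch of one way to do that. RejectedRecordsError and raise_if_rejected are hypothetical names introduced here for illustration and are not part of awswrangler.

```python
from typing import Any, Dict, List


class RejectedRecordsError(RuntimeError):
    """Hypothetical exception type; not part of awswrangler."""


def raise_if_rejected(rejected_records: List[Dict[str, Any]]) -> None:
    """Raise when the list returned by the write call is non-empty."""
    if rejected_records:
        # Collect the 'Reason' field of each rejection into one message.
        reasons = "; ".join(str(r.get("Reason", "unknown")) for r in rejected_records)
        raise RejectedRecordsError(
            f"{len(rejected_records)} record(s) rejected: {reasons}"
        )


# An empty list means every record was accepted, so nothing is raised.
raise_if_rejected([])
```

In practice the argument would be the list returned by wr.timestream.write.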

Note

If time_col is supplied, the column must be of a timestamp type. time_unit defaults to MILLISECONDS. NANOSECONDS is not supported because Python datetime objects are limited to microsecond precision.
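When the source data holds timestamps as strings, the column has to be converted before the write. The sketch below assumes ISO 8601 strings as input (a hypothetical case, not taken from this page) and uses pandas only. The flooring step is optional and simply makes the millisecond precision explicit.

```python
import pandas as pd

# Hypothetical input: timestamps arrive as ISO 8601 strings.
df = pd.DataFrame(
    {
        "time": ["2024-01-01T00:00:00.123456", "2024-01-01T00:00:01.654321"],
        "measure": [1.0, 1.1],
    }
)

# Convert to a timestamp dtype, as required for the column passed as time_col.
df["time"] = pd.to_datetime(df["time"])

# Optional: drop sub-millisecond digits to match the default time_unit.
df["time"] = df["time"].dt.floor("ms")

# Sanity check before handing the frame to the write call.
assert pd.api.types.is_datetime64_any_dtype(df["time"])
```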

Parameters:
  • df (DataFrame) – Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html

  • database (str) – Amazon Timestream database name.

  • table (str) – Amazon Timestream table name.

  • time_col (str | None) – DataFrame column name to be used as time. MUST be a timestamp column.

  • measure_col (str | list[str | None] | None) – DataFrame column name(s) to be used as measure.

  • dimensions_cols (list[str | None] | None) – List of DataFrame column names to be used as dimensions.

  • version (int) – Version number used for upserts. Documentation https://docs.aws.amazon.com/timestream/latest/developerguide/API_WriteRecords.html.

  • time_unit (Literal['MILLISECONDS', 'SECONDS', 'MICROSECONDS', 'NANOSECONDS']) – Time unit for the time column. MILLISECONDS by default.

  • use_threads (bool | int) – True to enable concurrent writing, False to disable multiple threads. If enabled, os.cpu_count() is used as the number of threads. If an integer is provided, that number of threads is used.

  • measure_name (str | None) – Name that represents the data attribute of the time series. Overrides measure_col if specified.

  • common_attributes (dict[str, Any] | None) – Dictionary of attributes shared across all records in the request. Using common attributes can optimize the cost of writes by reducing the size of request payloads. Values in common_attributes take precedence over all other arguments and data frame values. Dimension attributes are merged with attributes in record objects. Example: {"Dimensions": [{"Name": "device_id", "Value": "12345"}], "MeasureValueType": "DOUBLE"}.

  • boto3_session (Session | None) – The default boto3 session will be used if boto3_session is None.
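The common_attributes dictionary described above can be assembled programmatically. The following sketch builds a dictionary in the same shape as the documented example. build_common_attributes is a hypothetical helper, not an awswrangler function, and converting dimension values to strings is an assumption based on the string value in the documented example.

```python
from typing import Any, Dict, Optional


def build_common_attributes(
    dimensions: Dict[str, Any], measure_value_type: Optional[str] = None
) -> Dict[str, Any]:
    """Hypothetical helper: build a dict in the documented common_attributes shape."""
    attributes: Dict[str, Any] = {
        # Assumption: dimension values are sent as strings, as in the example above.
        "Dimensions": [
            {"Name": name, "Value": str(value)} for name, value in dimensions.items()
        ]
    }
    # Only include MeasureValueType when the caller supplies one.
    if measure_value_type is not None:
        attributes["MeasureValueType"] = measure_value_type
    return attributes


common_attributes = build_common_attributes({"device_id": 12345}, "DOUBLE")
print(common_attributes)
```

The resulting dictionary can then be passed as the common_attributes argument.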

Return type:

list[dict[str, str]]

Returns:

Rejected records. Possible reasons for rejection are described here: https://docs.aws.amazon.com/timestream/latest/developerguide/API_RejectedRecord.html

Examples

Store a Pandas DataFrame into an Amazon Timestream table.

>>> from datetime import datetime
>>> import awswrangler as wr
>>> import pandas as pd
>>> df = pd.DataFrame(
...     {
...         "time": [datetime.now(), datetime.now(), datetime.now()],
...         "dim0": ["foo", "boo", "bar"],
...         "dim1": [1, 2, 3],
...         "measure": [1.0, 1.1, 1.2],
...     }
... )
>>> rejected_records = wr.timestream.write(
...     df=df,
...     database="sampleDB",
...     table="sampleTable",
...     time_col="time",
...     measure_col="measure",
...     dimensions_cols=["dim0", "dim1"],
... )
>>> assert len(rejected_records) == 0

Return value if some records are rejected.

[
    {
        'ExistingVersion': 2,
        'Reason': 'The record version 1 is lower than the existing version 2. A '
                  'higher version is required to update the measure value.',
        'RecordIndex': 0
    }
]
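When many records are rejected, grouping them by reason can make the result easier to read. The sketch below counts rejections per 'Reason' string, using the sample return value shown above as input. summarize_rejections is a hypothetical helper and not part of awswrangler.

```python
from collections import Counter
from typing import Any, Dict, List

# Sample taken from the return value shown above.
rejected_records = [
    {
        "ExistingVersion": 2,
        "Reason": "The record version 1 is lower than the existing version 2. A "
        "higher version is required to update the measure value.",
        "RecordIndex": 0,
    }
]


def summarize_rejections(rejected: List[Dict[str, Any]]) -> Dict[str, int]:
    """Hypothetical helper: count rejected records per 'Reason' string."""
    return dict(Counter(str(r.get("Reason", "unknown")) for r in rejected))


summary = summarize_rejections(rejected_records)
for reason, count in summary.items():
    print(f"{count} x {reason}")
```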