awswrangler.timestream.write
- awswrangler.timestream.write(df: DataFrame, database: str, table: str, time_col: str | None = None, measure_col: str | List[str | None] | None = None, dimensions_cols: List[str | None] | None = None, version: int = 1, time_unit: Literal['MILLISECONDS', 'SECONDS', 'MICROSECONDS', 'NANOSECONDS'] = 'MILLISECONDS', use_threads: bool | int = True, measure_name: str | None = None, common_attributes: Dict[str, Any] | None = None, boto3_session: Session | None = None) → List[Dict[str, str]]
Store a Pandas DataFrame into an Amazon Timestream table.
Note
If use_threads=True, the number of threads is taken from os.cpu_count().
If the Timestream service rejects one or more records, this function does not raise a Python exception. Instead, it returns the rejection information.
Note
If the time_col column is supplied, it must be of type timestamp. time_unit is set to MILLISECONDS by default. NANOSECONDS is not supported, as Python datetime objects are limited to microsecond precision.
Note
The following arguments are not supported in distributed mode with engine EngineEnum.RAY:
boto3_session
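The precision limit mentioned in the notes above can be checked with the standard library alone. The following is a minimal sketch, not the conversion awswrangler performs internally: `UNITS_PER_SECOND`, `EPOCH`, and `to_epoch` are hypothetical names introduced here for illustration. It shows that a datetime value can be expressed in SECONDS, MILLISECONDS, or MICROSECONDS, but carries nothing finer.

```python
from datetime import datetime, timedelta, timezone

# Python datetime objects cannot resolve anything finer than one microsecond.
assert datetime.resolution == timedelta(microseconds=1)

# Hypothetical lookup table and helper, for illustration only
# (not part of awswrangler).
UNITS_PER_SECOND = {"SECONDS": 1, "MILLISECONDS": 1_000, "MICROSECONDS": 1_000_000}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch(ts: datetime, time_unit: str = "MILLISECONDS") -> int:
    """Return a timezone-aware ts as whole time_unit steps since the Unix epoch."""
    if time_unit not in UNITS_PER_SECOND:
        raise ValueError(f"Unsupported time unit for datetime values: {time_unit}")
    delta = ts - EPOCH
    # Integer arithmetic avoids the rounding error of float timestamps.
    micros = (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds
    return micros * UNITS_PER_SECOND[time_unit] // 1_000_000


# 1.5 seconds after the epoch, expressed in each representable unit.
sample = EPOCH + timedelta(seconds=1, microseconds=500_000)
for unit in UNITS_PER_SECOND:
    print(unit, to_epoch(sample, unit))
```

Coarser units truncate toward the lower step, so SECONDS drops the half second in this sample.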
- Parameters:
df (pandas.DataFrame) – Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
database (str) – Amazon Timestream database name.
table (str) – Amazon Timestream table name.
time_col (Optional[str]) – DataFrame column name to be used as time. MUST be a timestamp column.
measure_col (Union[str, List[str], None]) – DataFrame column name(s) to be used as measure.
dimensions_cols (Optional[List[str]]) – List of DataFrame column names to be used as dimensions.
version (int) – Version number used for upserts. Documentation https://docs.aws.amazon.com/timestream/latest/developerguide/API_WriteRecords.html.
time_unit (str, optional) – Time unit for the time column. MILLISECONDS by default.
use_threads (bool, int) – True to enable concurrent writing, False to disable multiple threads. If enabled, os.cpu_count() is used as the number of threads. If an integer is provided, that number of threads is used.
measure_name (Optional[str]) – Name that represents the data attribute of the time series. Overrides measure_col if specified.
common_attributes (Optional[Dict[str, Any]]) – Dictionary of attributes shared across all records in the request. Using common attributes can optimize the cost of writes by reducing the size of request payloads. Values in common_attributes take precedence over all other arguments and data frame values. Dimension attributes are merged with attributes in record objects. Example: {"Dimensions": [{"Name": "device_id", "Value": "12345"}], "MeasureValueType": "DOUBLE"}.
boto3_session (boto3.Session(), optional) – Boto3 Session. If None, the default boto3 Session is used.
- Returns:
Rejected records. Possible reasons for rejection are described here: https://docs.aws.amazon.com/timestream/latest/developerguide/API_RejectedRecord.html
- Return type:
List[Dict[str, str]]
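As a sketch of the common_attributes parameter described above, the dictionary can be assembled from plain Python values before the call. `build_common_attributes` is a hypothetical helper, not part of awswrangler; the output shape simply mirrors the example given in the parameter description. The commented-out call assumes a DataFrame `df` and an existing Timestream database and table.

```python
from typing import Any, Dict


def build_common_attributes(
    dimensions: Dict[str, Any], measure_value_type: str = "DOUBLE"
) -> Dict[str, Any]:
    """Hypothetical helper: build a common_attributes dictionary.

    Dimension values are converted to strings, matching the shape of the
    example in the parameter description.
    """
    return {
        "Dimensions": [
            {"Name": name, "Value": str(value)} for name, value in dimensions.items()
        ],
        "MeasureValueType": measure_value_type,
    }


common_attributes = build_common_attributes({"device_id": 12345})
print(common_attributes)

# Passing it to the writer (requires AWS credentials and an existing table):
# rejected_records = wr.timestream.write(
#     df=df,
#     database="sampleDB",
#     table="sampleTable",
#     time_col="time",
#     measure_col="measure",
#     common_attributes=common_attributes,
# )
```

Because values in common_attributes take precedence over other arguments and DataFrame values, it is probably safest to keep in it only attributes that really are identical for every record.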
Examples
Store a Pandas DataFrame into an Amazon Timestream table.
>>> from datetime import datetime
>>> import awswrangler as wr
>>> import pandas as pd
>>> df = pd.DataFrame(
>>>     {
>>>         "time": [datetime.now(), datetime.now(), datetime.now()],
>>>         "dim0": ["foo", "boo", "bar"],
>>>         "dim1": [1, 2, 3],
>>>         "measure": [1.0, 1.1, 1.2],
>>>     }
>>> )
>>> rejected_records = wr.timestream.write(
>>>     df=df,
>>>     database="sampleDB",
>>>     table="sampleTable",
>>>     time_col="time",
>>>     measure_col="measure",
>>>     dimensions_cols=["dim0", "dim1"],
>>> )
>>> assert len(rejected_records) == 0
Return value if some records are rejected.
>>> [
>>>     {
>>>         'ExistingVersion': 2,
>>>         'Reason': 'The record version 1 is lower than the existing version 2. A '
>>>                   'higher version is required to update the measure value.',
>>>         'RecordIndex': 0
>>>     }
>>> ]
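Since rejections are returned rather than raised, callers have to inspect the result themselves. The following is a minimal sketch of one way to do that. `summarize_rejections` is a hypothetical helper, not part of awswrangler, and the sample list is copied from the return value shown above rather than produced by a live call.

```python
from typing import Any, Dict, List


def summarize_rejections(rejected_records: List[Dict[str, Any]]) -> Dict[Any, str]:
    """Hypothetical helper: map each reported RecordIndex to its Reason.

    .get() is used so that an entry with a missing field does not raise.
    """
    return {
        record.get("RecordIndex"): record.get("Reason", "unknown reason")
        for record in rejected_records
    }


# Sample shaped like the rejected-record list shown above.
rejected_records = [
    {
        "ExistingVersion": 2,
        "Reason": "The record version 1 is lower than the existing version 2. A "
        "higher version is required to update the measure value.",
        "RecordIndex": 0,
    }
]

summary = summarize_rejections(rejected_records)
for index, reason in summary.items():
    print(f"record {index} rejected: {reason}")
```

A caller that prefers a hard failure could raise its own exception when the summary is non-empty; whether that suits a given pipeline is a design choice outside this function's documented behavior.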