awswrangler.timestream.write
- awswrangler.timestream.write(df: DataFrame, database: str, table: str, time_col: str | None = None, measure_col: str | list[str | None] | None = None, dimensions_cols: list[str | None] | None = None, version: int = 1, time_unit: Literal['MILLISECONDS', 'SECONDS', 'MICROSECONDS', 'NANOSECONDS'] = 'MILLISECONDS', use_threads: bool | int = True, measure_name: str | None = None, common_attributes: dict[str, Any] | None = None, boto3_session: Session | None = None) -> list[dict[str, str]]
Store a Pandas DataFrame into an Amazon Timestream table.
Note
If use_threads=True, the number of threads is taken from os.cpu_count().
If the Timestream service rejects one or more records, this function does not raise a Python exception. Instead, it returns the rejection information.
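Because rejections are returned rather than raised, callers that want a hard failure have to check the return value themselves. The following is a minimal sketch of one way to do that. The helpers summarize_rejections and raise_if_rejected are hypothetical names and are not part of awswrangler; the sample entry is copied from the Examples section of this page.

```python
from collections import Counter
from typing import Any


def summarize_rejections(rejected: list[dict[str, Any]]) -> Counter:
    """Count rejected records per reason (hypothetical helper, not part of awswrangler)."""
    return Counter(record.get("Reason", "unknown") for record in rejected)


def raise_if_rejected(rejected: list[dict[str, Any]]) -> None:
    """Turn a non-empty rejection list into an exception (hypothetical helper)."""
    if rejected:
        reasons = summarize_rejections(rejected)
        details = "; ".join(f"{count}x {reason}" for reason, count in reasons.items())
        raise RuntimeError(f"{len(rejected)} record(s) rejected by Timestream: {details}")


# Sample shaped like the rejection entry shown in the Examples section.
sample = [
    {
        "ExistingVersion": 2,
        "Reason": "The record version 1 is lower than the existing version 2. "
        "A higher version is required to update the measure value.",
        "RecordIndex": 0,
    }
]

try:
    raise_if_rejected(sample)
except RuntimeError as err:
    message = str(err)
    print(message)
```

In real use, the list passed to raise_if_rejected would be the return value of wr.timestream.write. Whether a rejection should abort the pipeline or only be logged is an application decision.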
Note
If the time_col column is supplied, it must be of type timestamp. time_unit is set to MILLISECONDS by default. NANOSECONDS is not supported because Python datetime objects are limited to microsecond precision.
Note
The following arguments are not supported in distributed mode with engine EngineEnum.RAY:
boto3_session
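To illustrate the time_unit note above, the sketch below converts a Python datetime to an integer count of units since the Unix epoch and shows why NANOSECONDS cannot be derived from a datetime. It assumes the time value is expressed as elapsed time since the epoch in the chosen unit. The helper to_epoch is a hypothetical name and is not claimed to be how awswrangler performs the conversion internally.

```python
from datetime import datetime, timedelta, timezone

# Multipliers from seconds to each unit that a datetime can represent exactly.
_UNITS_PER_SECOND = {
    "SECONDS": 1,
    "MILLISECONDS": 1_000,
    "MICROSECONDS": 1_000_000,
}

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch(ts: datetime, time_unit: str = "MILLISECONDS") -> int:
    """Convert a timezone-aware datetime to an integer epoch value (hypothetical helper)."""
    if time_unit not in _UNITS_PER_SECOND:
        # datetime stores at most microseconds, so finer units are rejected here.
        raise ValueError(f"Unsupported time unit for datetime values: {time_unit}")
    # Integer arithmetic on the timedelta avoids float rounding.
    micros = (ts - _EPOCH) // timedelta(microseconds=1)
    return micros * _UNITS_PER_SECOND[time_unit] // 1_000_000


ts = datetime(2024, 1, 1, 0, 0, 0, 123456, tzinfo=timezone.utc)
print(to_epoch(ts))                  # milliseconds since the epoch
print(to_epoch(ts, "MICROSECONDS"))  # microseconds since the epoch
print(datetime.resolution)           # smallest difference a datetime can hold
```

Choosing a coarser unit truncates the value: with SECONDS, the 123456 microseconds in the sample timestamp are dropped.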
- Parameters:
df (DataFrame) – Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
database (str) – Amazon Timestream database name.
table (str) – Amazon Timestream table name.
time_col (str | None) – DataFrame column name to be used as time. MUST be a timestamp column.
measure_col (str | list[str | None] | None) – DataFrame column name(s) to be used as measure.
dimensions_cols (list[str | None] | None) – List of DataFrame column names to be used as dimensions.
version (int) – Version number used for upserts. Documentation https://docs.aws.amazon.com/timestream/latest/developerguide/API_WriteRecords.html.
time_unit (Literal['MILLISECONDS', 'SECONDS', 'MICROSECONDS', 'NANOSECONDS']) – Time unit for the time column. MILLISECONDS by default.
use_threads (bool | int) – True to enable concurrent writing, False to disable multiple threads. If enabled, os.cpu_count() is used as the number of threads. If integer is provided, specified number is used.
measure_name (str | None) – Name that represents the data attribute of the time series. Overrides measure_col if specified.
common_attributes (dict[str, Any] | None) – Dictionary of attributes shared across all records in the request. Using common attributes can optimize the cost of writes by reducing the size of request payloads. Values in common_attributes take precedence over all other arguments and data frame values. Dimension attributes are merged with attributes in record objects. Example: {"Dimensions": [{"Name": "device_id", "Value": "12345"}], "MeasureValueType": "DOUBLE"}.
boto3_session (Session | None) – The default boto3 session will be used if boto3_session is None.
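As an illustration of the common_attributes parameter, the sketch below builds a dictionary in the shape of the example given in the parameter description. The helper build_common_attributes is a hypothetical name and is not part of awswrangler; only the "Dimensions" and "MeasureValueType" keys shown in that example are used.

```python
from typing import Any, Optional


def build_common_attributes(
    dimensions: dict[str, Any],
    measure_value_type: Optional[str] = None,
) -> dict[str, Any]:
    """Build a common_attributes dictionary (hypothetical helper, not part of awswrangler)."""
    attributes: dict[str, Any] = {
        # Dimension values are written as strings, as in the documented example.
        "Dimensions": [
            {"Name": name, "Value": str(value)} for name, value in dimensions.items()
        ]
    }
    if measure_value_type is not None:
        attributes["MeasureValueType"] = measure_value_type
    return attributes


common = build_common_attributes({"device_id": "12345"}, measure_value_type="DOUBLE")
print(common)

# The result would then be passed along with the other arguments, for example:
# wr.timestream.write(df=df, database=..., table=..., common_attributes=common)
```

Dimensions shared by every row, such as a single device identifier, are natural candidates for common_attributes, since they no longer need to be repeated in each record.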
- Return type:
list[dict[str, str]]
- Returns:
Rejected records. Possible reasons for rejection are described here: https://docs.aws.amazon.com/timestream/latest/developerguide/API_RejectedRecord.html
Examples
Store a Pandas DataFrame into an Amazon Timestream table.
>>> from datetime import datetime
>>> import awswrangler as wr
>>> import pandas as pd
>>> df = pd.DataFrame(
...     {
...         "time": [datetime.now(), datetime.now(), datetime.now()],
...         "dim0": ["foo", "boo", "bar"],
...         "dim1": [1, 2, 3],
...         "measure": [1.0, 1.1, 1.2],
...     }
... )
>>> rejected_records = wr.timestream.write(
...     df=df,
...     database="sampleDB",
...     table="sampleTable",
...     time_col="time",
...     measure_col="measure",
...     dimensions_cols=["dim0", "dim1"],
... )
>>> assert len(rejected_records) == 0
Return value if some records are rejected.
[
    {
        'ExistingVersion': 2,
        'Reason': 'The record version 1 is lower than the existing version 2. A '
                  'higher version is required to update the measure value.',
        'RecordIndex': 0
    }
]
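The rejection above states that a higher version is required to update the measure value. A minimal sketch of deriving a version for a retry from the returned list follows. The helper next_version is a hypothetical name and is not part of awswrangler, and it assumes that overwriting the existing values is actually the desired outcome.

```python
from typing import Any


def next_version(rejected: list[dict[str, Any]], current_version: int = 1) -> int:
    """Return a version higher than every ExistingVersion reported (hypothetical helper)."""
    # Only version conflicts carry an ExistingVersion field; other rejections are skipped.
    existing = [r["ExistingVersion"] for r in rejected if "ExistingVersion" in r]
    return max([current_version] + [version + 1 for version in existing])


rejected_records = [
    {
        "ExistingVersion": 2,
        "Reason": "The record version 1 is lower than the existing version 2. "
        "A higher version is required to update the measure value.",
        "RecordIndex": 0,
    }
]

retry_version = next_version(rejected_records, current_version=1)
print(retry_version)

# A retry would then pass the new value through the version parameter, for example:
# wr.timestream.write(df=df, database=..., table=..., version=retry_version, ...)
```

Rejections for other reasons are not resolved by raising the version, so the Reason field should still be inspected before retrying.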