awswrangler.opensearch.index_documents
- awswrangler.opensearch.index_documents(client: opensearchpy.OpenSearch, documents: Iterable[Mapping[str, Any]], index: str, doc_type: str | None = None, keys_to_write: list[str] | None = None, id_keys: list[str] | None = None, ignore_status: list[Any] | tuple[Any] | None = None, bulk_size: int = 1000, chunk_size: int | None = 500, max_chunk_bytes: int | None = 104857600, max_retries: int | None = None, initial_backoff: int | None = None, max_backoff: int | None = None, use_threads: bool | int = False, enable_refresh_interval: bool = True, **kwargs: Any) → dict[str, Any]
Index all documents into an OpenSearch index.
Note
max_retries, initial_backoff, and max_backoff are not supported with parallel bulk (when use_threads is set to True).
Note
Several arguments are passed through to the opensearch-py client library (bulk helpers): https://opensearch-py.readthedocs.io/en/latest/helpers.html#opensearchpy.helpers.bulk https://opensearch-py.readthedocs.io/en/latest/helpers.html#opensearchpy.helpers.streaming_bulk
If you receive Error 429 (Too Many Requests) from /_bulk, try decreasing the bulk_size value. Please also consider modifying the cluster size and instance type - Read more here: https://aws.amazon.com/premiumsupport/knowledge-center/resolve-429-error-es/
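To picture why a smaller bulk_size helps, the sketch below splits a document stream into batches of at most bulk_size items before each _bulk request. The chunking helper is an illustrative assumption, not the library's internal implementation.

```python
from itertools import islice
from typing import Any, Iterable, Iterator, Mapping


def _bulk_batches(
    documents: Iterable[Mapping[str, Any]], bulk_size: int
) -> Iterator[list[Mapping[str, Any]]]:
    """Yield documents in lists of at most bulk_size items (illustrative sketch)."""
    it = iter(documents)
    while batch := list(islice(it, bulk_size)):
        yield batch


docs = [{"_id": str(i), "value": i} for i in range(5)]
batches = list(_bulk_batches(docs, bulk_size=2))
# Five docs with bulk_size=2 produce batches of sizes 2, 2, 1;
# a smaller bulk_size means smaller _bulk payloads and less cluster pressure.
```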
- Parameters:
  - client (OpenSearch) – Instance of opensearchpy.OpenSearch to use.
  - documents (Iterable[Mapping[str, Any]]) – Iterable containing the documents to insert.
  - index (str) – Name of the index.
  - doc_type (str | None) – Name of the document type (for Elasticsearch versions 5.x and earlier).
  - keys_to_write (list[str] | None) – List of keys to index. If not provided, all keys will be indexed.
  - id_keys (list[str] | None) – List of keys that compound the document's unique id. If not provided, the _id key is used if it exists; otherwise a unique identifier is generated for each document.
  - ignore_status (list[Any] | tuple[Any] | None) – List of HTTP status codes to ignore (no exception is raised for them).
  - bulk_size (int) – Number of docs in each _bulk request (default: 1000).
  - chunk_size (int | None) – Number of docs in one chunk sent to the cluster (default: 500).
  - max_chunk_bytes (int | None) – Maximum size of the request in bytes (default: 100MB).
  - max_retries (int | None) – Maximum number of times a document will be retried when 429 is received; set to 0 for no retries on 429 (default: 2).
  - initial_backoff (int | None) – Number of seconds to wait before the first retry. Any subsequent retry waits initial_backoff * 2**retry_number seconds (default: 2).
  - max_backoff (int | None) – Maximum number of seconds a retry will wait (default: 600).
  - use_threads (bool | int) – True to enable concurrent requests, False to disable multiple threads. If enabled, os.cpu_count() is used as the max number of threads. If an integer is provided, that number is used.
  - enable_refresh_interval (bool) – True (default) to set refresh_interval to -1 (disabled) while indexing documents.
  - **kwargs (Any) – Keyword arguments forwarded to the bulk operation. elasticsearch >= 7.10.2 / opensearch: https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#url-parameters elasticsearch < 7.10.2: https://opendistro.github.io/for-elasticsearch-docs/docs/elasticsearch/rest-api-reference/#url-parameters
- Return type:
  dict[str, Any]
- Returns:
  Response payload https://opensearch.org/docs/opensearch/rest-api/document-apis/bulk/#response.
Examples
Writing documents
>>> import awswrangler as wr
>>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
>>> wr.opensearch.index_documents(
...     client=client,
...     documents=[{'_id': '1', 'value': 'foo'}, {'_id': '2', 'value': 'bar'}],
...     index='sample-index1'
... )
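The interplay of keys_to_write and id_keys can be pictured with a small local sketch: keep only the listed keys, then derive an _id from the id_keys values. The dash-joined id format below is an assumption for illustration, not the library's exact scheme.

```python
from typing import Any, Mapping


def _project_doc(
    doc: Mapping[str, Any], keys_to_write: list[str], id_keys: list[str]
) -> dict[str, Any]:
    """Keep only keys_to_write and build an _id from id_keys (hypothetical format)."""
    projected = {k: doc[k] for k in keys_to_write if k in doc}
    # Assumed separator; the library may compound the id differently.
    projected["_id"] = "-".join(str(doc[k]) for k in id_keys)
    return projected


doc = {"region": "us-east-1", "day": "2024-01-01", "value": 42, "debug": "skip"}
out = _project_doc(doc, keys_to_write=["region", "value"], id_keys=["region", "day"])
# Only 'region' and 'value' are written; '_id' is compounded from region and day.
```

If id_keys point at non-unique fields, later documents overwrite earlier ones with the same compound id, so pick keys that uniquely identify each document.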