Description
Background
OpenSearch performs well for querying data for most use cases, but, there are scenarios where organizing specific data into dedicated segments can improve query performance by reducing latency and/or provide more accurate results:
- For instance, when analysing application logs, users are often more interested in identifying the anomalies (logs with errors or status code non 2XX) which can be stored separately on its own segments resulting in lower latency.
- Similarly, for vector search, OpenSearch distributes documents randomly across multiple shards. While this simplifies the document insertion, it makes it difficult to predict which shards contains the nearest neighbours for a given query. To perform a search, the query must be distributed to all shards/segments, and the responses from all shards must be collated before the nearest neighbours can be determined and returned to the user. This approach requires loading the entire HNSW graph for all the segments into memory leading to high memory usage (bigger instances). Also, this is a compute intensive process. While some optimizations exists, such as early termination of query for approximate results, they often sacrifice accuracy. Storing related vectors (belonging to the same tenant/category) together will ensure higher accuracy and better performance, since only relevant segments are queried.
- Storing related data together enables the pre-computation of aggregations for frequently executed queries (e.g., count, minimum, maximum, average, percentiles) and store them as separate metadata. Corresponding queries can be served from the metadata (pre-computed values) itself, thus optimizing both on the latency and compute.
- With the introduction of segment groups and the identification of less relevant data during ingestion, less relevant data can be directly stored in cheaper remote storage (e.g.: AWS S3, Google Cloud Storage, MinIO, etc.), while more relevant data can be kept in hot storage (on the node's disk). This strategy conserves memory and disk space on local storage without affecting latency.
- Compression works more effectively when related data are stored together, resulting in an improvement of approximately 19% in the size of indices
Current Issue with OpenSearch
Currently, in OpenSearch, assignment of documents within segment is purely concurrency based and it does not consider factors like anticipated query pattern while distributing data within segments. This leads to segments containing a mix of relevant and less relevant documents. Furthermore, if relevant documents are sparse, they will be thinly spread out even within the segments, necessitating the iteration over many less relevant documents for search queries. For example, OpenSearch does about 15x lesser iteration while running a sort query for 4xx status code logs within a day for http log workload if we group logs by status code within segments.
Query type (on http logs workload) | Documents scanned (current state) in comparison with proposed change |
---|---|
range (400 status code logs within a given time range) | 3.2x more |
asc_sort_timestamp (400 status code logs within a given time range sorted by timestamp) | 15.3x more |
asc_sort_with_after_timestamp (500 status code logs after a timestamp and sorted by timestamp) | 15.3x more |
desc_sort_timestamp (400 status code logs within a given time range sorted in desc order by timestamp) | 15.3x more |
desc_sort_with_after_timestamp (500 status code logs after a timestamp and sorted in desc order by timestamp) | 15.3x more |
hourly aggregations (Day histogram of logs with status codes 400, 403 and 404 with min and max sizes of request) | 40x more |
Segment Grouping
In order to ensure, only more relevant data is iterated during query execution, we suggest collocating related data into same segment or group of segments. Group of a segment can be determined by a grouping criteria function. The goal is to align segment boundaries with the anticipated query patterns, ensuring that documents frequently queried together resides in the same segments. For eg: For log analytics scenarios, users often queries for anomalies (4xx and 5xx status code logs) over success logs (2xx). By applying a grouping criteria function based on status code anomalies and success logs are segregated into distinct segments (or groups of segments). This will ensure that search queries like “number of faults in the last hour” or “number of errors in the last three hours” will be more efficient, as they will need to only process segments with 4xx or 5xx status codes, which will be a much smaller dataset, improving query performance.
Proposed Solution
The above can be achieved by keeping a pool of disposable, group-specific IndexWriter instances to ensure documents from the same logical group are co-located in the same segments. Mirroring Lucene’s DocumentsWriterPerThread (DWPT) model for document distribution within segments, this approach dynamically assigns each document to a transient IndexWriter from the pool based on its evaluation of grouping criteria function. Like DWPTs, these disposable IndexWriter instances are short lived: they buffer documents in memory, sync their data to a shared accumulating IndexWriter during refresh and are then closed.
Implementation Details
InternalEngine maintains a mapping of active IndexWriters, each associated with a specific group. During indexing, the specific IndexWriter selected for indexing a document will depend on the outcome of the document for the grouping criteria function. Should the relevant IndexWriter entry inside map is null, a new IndexWriter will be instantiated for this criteria and added to this map.
During reader refresh, Lucene will first flush the in memory documents of group level IndexWriters into segments, OpenSearch closes group level IndexWriters, removes these IndexWriters from active IndexWriters map and sync the data of group level IndexWriter with a common accumulating IndexWriter using Lucene’s addIndexes API call. This synchronisation ensures Reader consistency is maintained post refresh guaranteeing all the latest indexed documents are searchable post refresh. Also since new IndexWriters replaces older ones as soon as they are closed, there is no impact on overall Indexing throughput.
Indexing
Refresh
Search
Considerations
We will continue to maintain entity like Translog, SequenceNumber, LocalCheckpoint, GlobalCheckpoint, combined deletion policy, ReaderManager and IndexStats at shard level. MergeScheduler is kept separate for group level IndexWriter and parent IndexWriter as segment merges can happen independently for group level IndexWriter and parent IndexWriter.
Limitations
- Selecting the appropriate grouping criteria is crucial. Too small groups can increase the number of segments which can breach file handle limits and/or virtual memory address for OpenSearch process. Additionally, this can also lead to multiple small segments. Conversely, selecting too large group can regress query performance. Implementing a guardrail around grouping criteria can prevent excessive small or large grouping.
- For tenants with extremely large datasets, a hybrid strategy is recommended by splitting data across multiple shards while still organising related content into context aware segments within those shards.
- In the initial implementation, grouping criteria will be a user defined predicate. Future exploration could involve automatic criteria selection based on workload.
- User cannot update the field involved in evaluating grouping criteria.
How different flows will get impacted
1. Refresh
InternalEngine ensures document synchronization between group-level IndexWriters and the parent IndexWriter prior to each refresh through the following sequence of operations:
- Invalidate all the active IndexWriters so that none of these IndexWriters can be used for indexing any new documents.
- Mark the above IndexWriters for refresh.
- Flush the documents of all group level IndexWriters marked for refresh. While flushing, OpenSearch will attach a group specific segment attribute to identify segments by origin group.
- Close all group level IndexWriters marked for refresh.
- Sync the documents of all IndexWriters marked for refresh with the parent IndexWriter via IndexWriter addIndexes api call.
- Remove all mark for refresh IndexWriters from the refresh list.
A potential scenario which we need to handle with the above approach is incase IndexWriter gets closed during active document indexing which can cause indexing to fail. To handle this, we will maintain a map of ReentrantReadWriteLocks for each group level IndexWriters. During active indexing, we will obtain read lock on ReentrantReadWriteLock of the associated IndexWriter. While closing this group level IndexWriter, we obtain a write lock ensuring that no active indexing is going on this IndexWriter. There may be other edge cases which we will handle as we get to implementation details.
2. Flush
InternalEngine will sync parent IndexWriter with group level IndexWriters before performing a commit. LocalCheckpoint and max sequence number of the commit data associated with the parent IndexWriter will be set to the maximum value of checkpoint and sequence number associated with commit data of group level IndexWriters. This is required to ensure that max seq number and checkpoint is in sync with documents associated with parent IndexWriter at the time of commit.
3. Updates and Delete
Since write operations (insert/updates/deletes) are now not limited to a single IndexWriter, we need to handle scenarios when there is a cross writer updates or deletes (when update or delete operation is not on the same IndexWriter where it was inserted). We will follow up with more details around how to handle updates/deletes with this approach later.
4. Search
In the search workflow, ContextIndexSearcher will evaluate the relevant group for an OpenSearch search query to decide which segments it needs to query data from. For example, for log analytics workload, if the search query is a match query for 5xx status code logs, ContextIndexSearcher will only scan the segments which contains 5xx logs. ContextIndexSearcher will use segment attributes to determine the group associated with a segment.
5. Merge
We will introduce a new Context aware/based merge policy to ensure that group invariant is maintained during segment merges. For force merges, maximum number of segments will now be enforced at per group level instead of per shard level.
6. Recovery
DocRep
For DocRep, there will be no major change in the flow with context aware segments, as last safe IndexCommit will be captured at parent IndexWriter level similar to how it was taken earlier with a single IndexWriter. So during recovery of a replica shard from primary, OpenSearch will:
- Take a snapshot of last safe IndexCommit at the parent IndexWriter level for the primary shard.
- Send the metadata associated with the files (for missing segment files on target node) of this IndexCommit to the replica.
- Sends the actual associated files to the replica.
- Post this we associate replica node with a new translog UUID by opening a new parent IndexWriter on replica and commiting this translog UUID in its user data.
- Once last commit is transferred to replica, we replay all the operations from local checkpoint to max sequence number of primary on the replica shard.
SegRep
Similarly for SegRep, during replica recovery from primary:
- Replica node asks for a checkpoint metadata information from the primary shard.
- Primary shard returns following information of parent IndexWriter as checkpoint metadata:
- Latest segmentInfo snapshot which is obtained from ReaderManager created over parent IndexWriter.
- A ReplicationCheckpoint which contains shardId, primary term, segmentsGen, segmentInfosVersion and metadata for all Segment files (name, length, checksum, hash) associated with the segmentInfos of parent IndexWriter.
- Using the above checkpoint, Replica shard computes which files it needs to fetch from primary.
- Once the files are copied, Replica shard reads latest segment_N file to create SegmentInfos, post which we update ReaderManger’s SegmentInfos.
- If incoming generation of SegmentInfos does not match last generation of SegmentInfos which was passed to replica, we perform a commit.
7. Snapshot
Snapshot flow is almost similar to recovery. During snapshot, OpenSearch first obtains the last commit for each shard for the parent IndexWriter. Post this, it obtains all the file names associated with this commit and uploads the files to remote repository one by one. Similarly for restore, OpenSearch fetches the snapshot from the remote repository, get the associated file name with this snapshot and fetch the missing files from the remote repository.
Alternatives Explored
- The initial approach for segregating documents belonging to different groups in different segments within a shard involved DocumentWriter assigning different DWPT to documents with different predicate evaluation (with different groups). This was achieved by adding support for DWPT selection mechanism based on a specific criteria (passed as an Index configuration). During indexing, this grouping criteria function would be evaluated for each document, ensuring that documents with differing criteria are indexed using separate DWPTs. A new segment merge policy will ensure segments corresponding to same criteria are merged together, maintaining this invariant during segment merges. However this approach required few complex changes modifications to Lucene’s core.
- An alternative suggestion was to have different shard for different groups and query data over it as if were a single shard using a MultiReader. However this approach has following concern:
- This could lead to explosion of shards. Total number of shards required will be (number of groups * number of shards). This will in turn cause huge load on the cluster manager node to manage these shards on the data nodes of the cluster.
- Another issue with this approach is skewness. For example, with status code based grouping, 5XX and 4XX shard groups will receive more (search) traffic than shards containing 2XX documents. This will cause skewness on the node resulting in hot shards and nodes.
- Third approach was to maintain separate Lucene indices for each group within a single OpenSearch shard. Documents are routed to respective Lucene indices based on their evaluated grouping criteria. In order to ensure that an OpenSearch shard still interacts with a single Lucene entity, we proposed extending MultiReader to act a combined view over multiple group level Lucene indexes. So within a shard, even though writes continue to be routed to group level Lucene Indexes, combined view ensures that they still behave as a single logical entity (or OpenSearch shard continues to interact with a single logical Lucene Index) by flattening and combining different read attributes of group level Lucene indices (like segmentInfos, IndexCommit, etc). However this approach has following concern:
- Maintaining a combined view of group level SegmentInfos, IndexCommit maintaining an additional mapping of their attributes as well as version and generation. So a single generation of parent SegmentInfos will be mapped to a set of respective group level SegmentInfos.
- This mapping of combined view with group level view needs to be persisted in disk so that bouncing node does not clear this mapping.
- All Segments referenced by a combined view should not be deleted. This require us to create a separate CombinedDeletion policy for this combined view to delete stale segments.
- Another issue with this approach is group level view (segmentInfos and IndexCommit of group level Lucene Indices) is still required for few flows like recovery. For example, in case a replica shard is recovering using a primary, we still require to send group level safe IndexCommits (in case of DocRep) or group level SegmentInfos (in case of SegRep) from primary to replica to allow initialisation of group level Lucene indices.
Related Discussion Issues
OpenSearch: #13183
Lucene: apache/lucene#13387
Metadata
Metadata
Assignees
Labels
Type
Projects
Status
Status