diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9fb2f1d6a5234..96ad3feddaccd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add TermsQuery support to Search GRPC endpoint ([#17888](https://github.com/opensearch-project/OpenSearch/pull/17888))
 - Support sub agg in filter rewrite optimization ([#17447](https://github.com/opensearch-project/OpenSearch/pull/17447))
 - Disable scoring of keyword term search by default, fallback logic with new use_similarity:true parameter ([#17889](https://github.com/opensearch-project/OpenSearch/pull/17889))
+- Add versioning support in pull-based ingestion ([#17918](https://github.com/opensearch-project/OpenSearch/pull/17918))

 ### Changed
 - Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java
index eb118c7bdbfce..e037b08cd4ef4 100644
--- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java
+++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java
@@ -108,6 +108,19 @@ protected void produceData(String id, String name, String age, long timestamp, S
         producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
     }

+    protected void produceDataWithExternalVersion(String id, long version, String name, String age, long timestamp, String opType) {
+        String payload = String.format(
+            Locale.ROOT,
+            "{\"_id\":\"%s\", \"_version\":\"%d\", \"_op_type\":\"%s\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
+            id,
+            version,
+            opType,
+            name,
+            age
+        );
+        producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
+    }
+
     protected void produceData(String payload) {
         producer.send(new ProducerRecord<>(topicName, null, defaultMessageTimestamp, "null", payload));
    }
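For reference, the helper above serializes each operation into the pull-based ingestion message format, carrying the external version in a top-level `_version` field. A sketch of one rendered payload (values illustrative):

```java
// Calling the new helper as:
produceDataWithExternalVersion("1", 2, "name1", "30", defaultMessageTimestamp, "index");
// publishes the following JSON payload to the Kafka topic (rendered from the format string above):
// {"_id":"1", "_version":"2", "_op_type":"index","_source":{"name":"name1", "age": 30}}
```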
"name2", "25", defaultMessageTimestamp, "index"); + internalCluster().startClusterManagerOnlyNode(); + final String nodeA = internalCluster().startDataOnlyNode(); + final String nodeB = internalCluster().startDataOnlyNode(); + + createIndexWithDefaultSettings(1, 1); + ensureGreen(indexName); + waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB)); + + // validate next version docs get indexed + produceDataWithExternalVersion("1", 2, "name1", "30", defaultMessageTimestamp, "index"); + produceDataWithExternalVersion("2", 2, "name2", "30", defaultMessageTimestamp, "index"); + waitForState(() -> { + BoolQueryBuilder query1 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1)); + SearchResponse response1 = client().prepareSearch(indexName).setQuery(query1).get(); + assertThat(response1.getHits().getTotalHits().value(), is(1L)); + BoolQueryBuilder query2 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2)); + SearchResponse response2 = client().prepareSearch(indexName).setQuery(query2).get(); + assertThat(response2.getHits().getTotalHits().value(), is(1L)); + return 30 == (Integer) response1.getHits().getHits()[0].getSourceAsMap().get("age") + && 30 == (Integer) response2.getHits().getHits()[0].getSourceAsMap().get("age"); + }); + + // test out-of-order updates + produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index"); + produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index"); + produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index"); + waitForSearchableDocs(3, Arrays.asList(nodeA, nodeB)); + + BoolQueryBuilder query1 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1)); + SearchResponse response1 = client().prepareSearch(indexName).setQuery(query1).get(); + assertThat(response1.getHits().getTotalHits().value(), is(1L)); + assertEquals(30, response1.getHits().getHits()[0].getSourceAsMap().get("age")); + + BoolQueryBuilder query2 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2)); + SearchResponse response2 = client().prepareSearch(indexName).setQuery(query2).get(); + assertThat(response2.getHits().getTotalHits().value(), is(1L)); + assertEquals(30, response2.getHits().getHits()[0].getSourceAsMap().get("age")); + + // test deletes with smaller version + produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "delete"); + produceDataWithExternalVersion("4", 1, "name4", "25", defaultMessageTimestamp, "index"); + waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB)); + RangeQueryBuilder query = new RangeQueryBuilder("age").gte(23); + SearchResponse response = client().prepareSearch(indexName).setQuery(query).get(); + assertThat(response.getHits().getTotalHits().value(), is(4L)); + + // test deletes with correct version + produceDataWithExternalVersion("1", 3, "name1", "30", defaultMessageTimestamp, "delete"); + produceDataWithExternalVersion("2", 3, "name2", "30", defaultMessageTimestamp, "delete"); + waitForState(() -> { + RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(23); + SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get(); + assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L)); + return true; + }); + } + + public void testExternalVersioningWithDisabledGCDeletes() throws Exception { + // setup nodes and index + internalCluster().startClusterManagerOnlyNode(); + final String nodeA = internalCluster().startDataOnlyNode(); + final String nodeB = 
+    public void testExternalVersioningWithDisabledGCDeletes() throws Exception {
+        // setup nodes and index
+        internalCluster().startClusterManagerOnlyNode();
+        final String nodeA = internalCluster().startDataOnlyNode();
+        final String nodeB = internalCluster().startDataOnlyNode();
+
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put("ingestion_source.type", "kafka")
+                .put("ingestion_source.pointer.init.reset", "earliest")
+                .put("ingestion_source.param.topic", topicName)
+                .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
+                .put("index.replication.type", "SEGMENT")
+                .put("index.gc_deletes", "0")
+                .build(),
+            "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
+        );
+
+        // insert documents
+        produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
+        produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
+        waitForState(() -> {
+            RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(23);
+            SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
+            assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
+            return true;
+        });
+
+        // delete documents 1 and 2
+        produceDataWithExternalVersion("1", 2, "name1", "25", defaultMessageTimestamp, "delete");
+        produceDataWithExternalVersion("2", 2, "name2", "25", defaultMessageTimestamp, "delete");
+        produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index");
+        waitForState(() -> {
+            BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 3));
+            SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
+            assertThat(response.getHits().getTotalHits().value(), is(1L));
+            return 25 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
+        });
+        waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));
+
+        // validate index operation with lower version creates new document
+        produceDataWithExternalVersion("1", 1, "name1", "35", defaultMessageTimestamp, "index");
+        produceDataWithExternalVersion("4", 1, "name4", "35", defaultMessageTimestamp, "index");
+        waitForState(() -> {
+            RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(34);
+            SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
+            assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
+            return true;
+        });
+    }
+
     private void verifyRemoteStoreEnabled(String node) {
         GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
         String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");
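Both tests rely on the external-versioning conflict rule: an operation is applied only if its `_version` is strictly greater than the version currently tracked for that document (including the version recorded on a delete tombstone), while a document with no live entry or tombstone accepts any version. A minimal sketch of that check, inferred from the assertions above (see `VersionType.EXTERNAL#isVersionConflictForWrites` for the authoritative logic):

```java
// Sketch only: stale or duplicate external versions are rejected.
static boolean isVersionConflict(long currentVersion, long operationVersion) {
    if (currentVersion == Versions.NOT_FOUND) {
        return false; // no live doc and no tombstone: any external version is accepted
    }
    return currentVersion >= operationVersion; // only strictly increasing versions win
}
```

This is also why `testExternalVersioningWithDisabledGCDeletes` sets `index.gc_deletes` to `0`: once a delete tombstone is pruned, the previous version is forgotten, so a later index operation with a lower version creates a fresh document instead of conflicting.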
diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
index bd17ee2170121..f3e613621a053 100644
--- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
@@ -16,10 +16,14 @@
 import org.opensearch.action.admin.indices.streamingingestion.state.ShardIngestionState;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.IngestionSource;
+import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.lucene.Lucene;
+import org.opensearch.common.lucene.uid.Versions;
+import org.opensearch.common.util.concurrent.ReleasableLock;
 import org.opensearch.index.IngestionConsumerFactory;
 import org.opensearch.index.IngestionShardConsumer;
 import org.opensearch.index.IngestionShardPointer;
+import org.opensearch.index.VersionType;
 import org.opensearch.index.mapper.DocumentMapperForType;
 import org.opensearch.index.mapper.IdFieldMapper;
 import org.opensearch.index.mapper.ParseContext;
@@ -47,6 +51,7 @@
 import java.util.function.BiFunction;

 import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
+import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_LOCATION;
 import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_SNAPSHOT;

 /**
@@ -157,15 +162,45 @@ public IndexResult index(Index index) throws IOException {
     /**
      * Indexes the document into the engine. This is used internally by the stream poller only.
      * @param index the index request
-     * @return the index result
      * @throws IOException if an error occurs
      */
-    public IndexResult indexInternal(Index index) throws IOException {
+    public void indexInternal(Index index) throws IOException {
+        // todo: add number of inserts/updates metric
         assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
-        ensureOpen();
-        final IndexResult indexResult;
-        indexResult = indexIntoLucene(index);
-        return indexResult;
+
+        try (
+            ReleasableLock releasableLock1 = readLock.acquire();
+            Releasable releasableLock2 = versionMap.acquireLock(index.uid().bytes())
+        ) {
+            ensureOpen();
+            lastWriteNanos = index.startTime();
+            boolean isExternalVersioning = index.versionType() == VersionType.EXTERNAL;
+            if (index.getAutoGeneratedIdTimestamp() == UNSET_AUTO_GENERATED_TIMESTAMP) {
+                validateDocumentVersion(index);
+            }
+
+            if (isExternalVersioning) {
+                index.parsedDoc().version().setLongValue(index.version());
+            }
+
+            IndexResult indexResult = indexIntoLucene(index);
+            if (isExternalVersioning && indexResult.getResultType() == Result.Type.SUCCESS) {
+                versionMap.maybePutIndexUnderLock(
+                    index.uid().bytes(),
+                    new IndexVersionValue(EMPTY_TRANSLOG_LOCATION, index.version(), index.seqNo(), index.primaryTerm())
+                );
+            }
+        } catch (VersionConflictEngineException e) {
+            logger.debug("Version conflict encountered when processing index operation", e);
+            throw e;
+        } catch (RuntimeException | IOException e) {
+            try {
+                maybeFailEngine("index", e);
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
+            }
+            throw e;
+        }
     }

     private IndexResult indexIntoLucene(Index index) throws IOException {
@@ -203,18 +238,56 @@ public DeleteResult delete(Delete delete) throws IOException {
     /**
      * Processes delete operations. This is used internally by the stream poller only.
      */
-    public DeleteResult deleteInternal(Delete delete) throws IOException {
+    public void deleteInternal(Delete delete) throws IOException {
+        // todo: add number of deletes metric
+        versionMap.enforceSafeAccess();
         assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
-        ensureOpen();
-        final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id());
-        assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
-        final ParseContext.Document doc = tombstone.docs().get(0);
-        assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
-            + doc
-            + " ]";
-        doc.add(softDeletesField);
-        indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);
-        return new DeleteResult(1, delete.primaryTerm(), -1, true);
+        lastWriteNanos = delete.startTime();
+
+        try (
+            ReleasableLock releasableLock1 = readLock.acquire();
+            Releasable releasableLock2 = versionMap.acquireLock(delete.uid().bytes())
+        ) {
+            ensureOpen();
+            validateDocumentVersion(delete);
+            final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id());
+            boolean isExternalVersioning = delete.versionType() == VersionType.EXTERNAL;
+            if (isExternalVersioning) {
+                tombstone.version().setLongValue(delete.version());
+            }
+
+            assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
+            final ParseContext.Document doc = tombstone.docs().get(0);
+            assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
+                + doc
+                + " ]";
+            doc.add(softDeletesField);
+
+            indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);
+            if (isExternalVersioning) {
+                versionMap.putDeleteUnderLock(
+                    delete.uid().bytes(),
+                    new DeleteVersionValue(
+                        delete.version(),
+                        delete.seqNo(),
+                        delete.primaryTerm(),
+                        engineConfig.getThreadPool().relativeTimeInMillis()
+                    )
+                );
+            }
+        } catch (VersionConflictEngineException e) {
+            logger.debug("Version conflict encountered when processing deletes", e);
+            throw e;
+        } catch (RuntimeException | IOException e) {
+            try {
+                maybeFailEngine("delete", e);
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
+            }
+            throw e;
+        }
+
+        maybePruneDeletes();
     }

     @Override
@@ -229,6 +302,15 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
         return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
     }

+    @Override
+    protected void pruneDeletedTombstones() {
+        final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
+        final long maxTimestampToPrune = timeMSec - engineConfig.getIndexSettings().getGcDeletesInMillis();
+        // prune based only on timestamp and not sequence number
+        versionMap.pruneTombstones(maxTimestampToPrune, Long.MAX_VALUE);
+        lastDeleteVersionPruneTimeMSec = timeMSec;
+    }
+
     @Override
     public Translog.Snapshot newChangesSnapshot(
         String source,
@@ -381,6 +463,33 @@ private void updateErrorHandlingStrategy(IngestionErrorStrategy.ErrorStrategy er
         streamPoller.updateErrorStrategy(updatedIngestionErrorStrategy);
     }

+    /**
+     * Validates document version for pull-based ingestion. Only external versioning is supported.
+     */
+    private void validateDocumentVersion(final Operation operation) throws IOException {
+        if (operation.versionType() != VersionType.EXTERNAL) {
+            return;
+        }
+
+        versionMap.enforceSafeAccess();
+        final VersionValue versionValue = resolveDocVersion(operation, false);
+        final long currentVersion;
+        final boolean currentNotFoundOrDeleted;
+
+        if (versionValue == null) {
+            // todo: possible to optimize addDoc instead of updateDoc if version is not present?
+            currentVersion = Versions.NOT_FOUND;
+            currentNotFoundOrDeleted = true;
+        } else {
+            currentVersion = versionValue.version;
+            currentNotFoundOrDeleted = versionValue.isDelete();
+        }
+
+        if (operation.versionType().isVersionConflictForWrites(currentVersion, operation.version(), currentNotFoundOrDeleted)) {
+            throw new VersionConflictEngineException(shardId, operation, currentVersion, currentNotFoundOrDeleted);
+        }
+    }
+
     /**
      * Pause the poller. Used by management flows.
      */
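The `pruneDeletedTombstones` override passes `Long.MAX_VALUE` as the maximum sequence number, which turns the sequence-number condition into a no-op so that tombstones are pruned on the `gc_deletes` timestamp alone. A sketch of the resulting predicate (assuming `LiveVersionMap.pruneTombstones` removes a tombstone only when both bounds hold; names follow the call site above):

```java
// Assumed pruning predicate: with maxSeqNoToPrune == Long.MAX_VALUE the seq-no
// clause always holds, so the gc_deletes timestamp is the only effective bound.
static boolean canPrune(DeleteVersionValue tombstone, long maxTimestampToPrune, long maxSeqNoToPrune) {
    return tombstone.time < maxTimestampToPrune && tombstone.seqNo <= maxSeqNoToPrune;
}
```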
diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index 7e171e3f1714c..e6baed6cd16b6 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -153,7 +153,7 @@ public class InternalEngine extends Engine {
     /**
      * When we last pruned expired tombstones from versionMap.deletes:
      */
-    private volatile long lastDeleteVersionPruneTimeMSec;
+    protected volatile long lastDeleteVersionPruneTimeMSec;

     protected final TranslogManager translogManager;
     protected final IndexWriter indexWriter;
@@ -163,6 +163,10 @@ public class InternalEngine extends Engine {
     protected final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);
     protected final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField();

+    // A uid (in the form of BytesRef) to the version map
+    // we use the hashed variant since we iterate over it and check removal and additions on existing keys
+    protected final LiveVersionMap versionMap = new LiveVersionMap();
+
     @Nullable
     protected final String historyUUID;

@@ -173,10 +177,6 @@ public class InternalEngine extends Engine {
     private final Lock flushLock = new ReentrantLock();
     private final ReentrantLock optimizeLock = new ReentrantLock();

-    // A uid (in the form of BytesRef) to the version map
-    // we use the hashed variant since we iterate over it and check removal and additions on existing keys
-    private final LiveVersionMap versionMap = new LiveVersionMap();
-
     private volatile SegmentInfos lastCommittedSegmentInfos;

     private final IndexThrottle throttle;
@@ -745,7 +745,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
     }

     /** resolves the current version of the document, returning null if not found */
-    private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) throws IOException {
+    protected VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) throws IOException {
         assert incrementVersionLookup(); // used for asserting in tests
         VersionValue versionValue = getVersionFromMap(op.uid().bytes());
         if (versionValue == null) {
@@ -1974,7 +1974,7 @@ private void refreshLastCommittedSegmentInfos() {
         }
     }

-    private void pruneDeletedTombstones() {
+    protected void pruneDeletedTombstones() {
         /*
          * We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary
          * are remembered for at least one GC delete cycle and trimmed periodically. This is, at the moment, the best we can do on
diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java
index c1d098279a7eb..b1d3004131291 100644
--- a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java
+++ b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java
@@ -29,11 +29,13 @@
 import org.opensearch.index.VersionType;
 import org.opensearch.index.engine.Engine;
 import org.opensearch.index.engine.IngestionEngine;
+import org.opensearch.index.engine.VersionConflictEngineException;
 import org.opensearch.index.mapper.IdFieldMapper;
 import org.opensearch.index.mapper.ParseContext;
 import org.opensearch.index.mapper.ParsedDocument;
 import org.opensearch.index.mapper.SourceToParse;
 import org.opensearch.index.mapper.Uid;
+import org.opensearch.index.mapper.VersionFieldMapper;

 import java.io.IOException;
 import java.util.Map;
@@ -170,6 +172,15 @@ protected Engine.Operation getOperation(byte[] payload, IngestionShardPointer po
         String opTypeString = (String) payloadMap.getOrDefault(OP_TYPE, "index");
         DocWriteRequest.OpType opType = DocWriteRequest.OpType.fromString(opTypeString);

+        // Check message for document version. Pull-based ingestion only supports external versioning.
+        // By default, writes succeed regardless of document version.
+        long documentVersion = Versions.MATCH_ANY;
+        VersionType documentVersionType = VersionType.INTERNAL;
+        if (payloadMap.containsKey(VersionFieldMapper.NAME)) {
+            documentVersion = Long.parseLong((String) payloadMap.get(VersionFieldMapper.NAME));
+            documentVersionType = VersionType.EXTERNAL;
+        }
+
         Engine.Operation operation;
         switch (opType) {
             case INDEX:
@@ -199,8 +210,8 @@ protected Engine.Operation getOperation(byte[] payload, IngestionShardPointer po
                     doc,
                     0,
                     1,
-                    Versions.MATCH_ANY,
-                    VersionType.INTERNAL,
+                    documentVersion,
+                    documentVersionType,
                     Engine.Operation.Origin.PRIMARY,
                     System.nanoTime(),
                     autoGeneratedIdTimestamp,
@@ -225,8 +236,8 @@ protected Engine.Operation getOperation(byte[] payload, IngestionShardPointer po
                     new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
                     0,
                     1,
-                    Versions.MATCH_ANY,
-                    VersionType.INTERNAL,
+                    documentVersion,
+                    documentVersionType,
                     Engine.Operation.Origin.PRIMARY,
                     System.nanoTime(),
                     UNASSIGNED_SEQ_NO,
@@ -282,6 +293,12 @@ public void run() {
                     currentShardPointer = readResult.getPointer();
                     messageProcessor.process(readResult.getMessage(), readResult.getPointer());
                     readResult = null;
+                } catch (VersionConflictEngineException e) {
+                    // Messages with version conflicts will be dropped. This should not have any impact on data
+                    // correctness as pull-based ingestion does not support partial updates.
+                    // TODO: add metric
+                    logger.debug("Dropping message due to version conflict. ShardPointer: " + readResult.getPointer().asString(), e);
+                    readResult = null;
                 } catch (Exception e) {
                     errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING);
                     if (errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) {
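Taken together, the processor-level handling means a stale message is logged at debug level and skipped rather than failing the poller. Based on the integration tests above, a message sequence for a single document id plays out like this (sketch using the test helper):

```java
// Outcomes follow the strictly-increasing rule enforced by validateDocumentVersion:
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");  // applied: no prior version
produceDataWithExternalVersion("1", 2, "name1", "30", defaultMessageTimestamp, "index");  // applied: 2 > 1
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");  // dropped: 1 <= 2 (version conflict)
produceDataWithExternalVersion("1", 3, "name1", "30", defaultMessageTimestamp, "delete"); // applied: tombstone recorded at version 3
```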