Skip to content

Commit 3b1cee7

Browse files
Support versioning in pull-based ingestion
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 4308f4c commit 3b1cee7

File tree

5 files changed

+278
-26
lines changed

5 files changed

+278
-26
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,19 @@ protected void produceData(String id, String name, String age, long timestamp, S
108108
producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
109109
}
110110

111+
protected void produceDataWithExternalVersion(String id, long version, String name, String age, long timestamp, String opType) {
112+
String payload = String.format(
113+
Locale.ROOT,
114+
"{\"_id\":\"%s\", \"_version\":\"%d\", \"_op_type\":\"%s\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
115+
id,
116+
version,
117+
opType,
118+
name,
119+
age
120+
);
121+
producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
122+
}
123+
111124
protected void produceData(String payload) {
112125
producer.send(new ProducerRecord<>(topicName, null, defaultMessageTimestamp, "null", payload));
113126
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import org.opensearch.cluster.metadata.IndexMetadata;
2020
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
2121
import org.opensearch.common.settings.Settings;
22+
import org.opensearch.index.query.BoolQueryBuilder;
2223
import org.opensearch.index.query.RangeQueryBuilder;
24+
import org.opensearch.index.query.TermQueryBuilder;
2325
import org.opensearch.test.InternalTestCluster;
2426
import org.opensearch.test.OpenSearchIntegTestCase;
2527
import org.opensearch.transport.client.Requests;
@@ -310,6 +312,123 @@ public void testPaginatedGetIngestionState() throws ExecutionException, Interrup
310312
}));
311313
}
312314

315+
public void testExternalVersioning() throws Exception {
316+
// setup nodes and index
317+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
318+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
319+
internalCluster().startClusterManagerOnlyNode();
320+
final String nodeA = internalCluster().startDataOnlyNode();
321+
final String nodeB = internalCluster().startDataOnlyNode();
322+
323+
createIndexWithDefaultSettings(1, 1);
324+
ensureGreen(indexName);
325+
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));
326+
327+
// validate next version docs get indexed
328+
produceDataWithExternalVersion("1", 2, "name1", "30", defaultMessageTimestamp, "index");
329+
produceDataWithExternalVersion("2", 2, "name2", "30", defaultMessageTimestamp, "index");
330+
waitForState(() -> {
331+
BoolQueryBuilder query1 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1));
332+
SearchResponse response1 = client().prepareSearch(indexName).setQuery(query1).get();
333+
assertThat(response1.getHits().getTotalHits().value(), is(1L));
334+
BoolQueryBuilder query2 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2));
335+
SearchResponse response2 = client().prepareSearch(indexName).setQuery(query2).get();
336+
assertThat(response2.getHits().getTotalHits().value(), is(1L));
337+
return 30 == (Integer) response1.getHits().getHits()[0].getSourceAsMap().get("age")
338+
&& 30 == (Integer) response2.getHits().getHits()[0].getSourceAsMap().get("age");
339+
});
340+
341+
// test out-of-order updates
342+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
343+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
344+
produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index");
345+
waitForSearchableDocs(3, Arrays.asList(nodeA, nodeB));
346+
347+
BoolQueryBuilder query1 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1));
348+
SearchResponse response1 = client().prepareSearch(indexName).setQuery(query1).get();
349+
assertThat(response1.getHits().getTotalHits().value(), is(1L));
350+
assertEquals(30, response1.getHits().getHits()[0].getSourceAsMap().get("age"));
351+
352+
BoolQueryBuilder query2 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2));
353+
SearchResponse response2 = client().prepareSearch(indexName).setQuery(query2).get();
354+
assertThat(response2.getHits().getTotalHits().value(), is(1L));
355+
assertEquals(30, response2.getHits().getHits()[0].getSourceAsMap().get("age"));
356+
357+
// test deletes with smaller version
358+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "delete");
359+
produceDataWithExternalVersion("4", 1, "name4", "25", defaultMessageTimestamp, "index");
360+
waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
361+
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(23);
362+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
363+
assertThat(response.getHits().getTotalHits().value(), is(4L));
364+
365+
// test deletes with correct version
366+
produceDataWithExternalVersion("1", 3, "name1", "30", defaultMessageTimestamp, "delete");
367+
produceDataWithExternalVersion("2", 3, "name2", "30", defaultMessageTimestamp, "delete");
368+
waitForState(() -> {
369+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(23);
370+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
371+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
372+
return true;
373+
});
374+
}
375+
376+
public void testExternalVersioningWithDisabledGCDeletes() throws Exception {
377+
// setup nodes and index
378+
internalCluster().startClusterManagerOnlyNode();
379+
final String nodeA = internalCluster().startDataOnlyNode();
380+
final String nodeB = internalCluster().startDataOnlyNode();
381+
382+
// create index with index.gc_deletes = 1 ms
383+
createIndex(
384+
indexName,
385+
Settings.builder()
386+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
387+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
388+
.put("ingestion_source.type", "kafka")
389+
.put("ingestion_source.pointer.init.reset", "earliest")
390+
.put("ingestion_source.param.topic", topicName)
391+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
392+
.put("index.replication.type", "SEGMENT")
393+
.put("index.gc_deletes", "0")
394+
.build(),
395+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
396+
);
397+
398+
// insert documents
399+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
400+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
401+
waitForState(() -> {
402+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(23);
403+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
404+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
405+
return true;
406+
});
407+
408+
// delete documents 1 and 2
409+
produceDataWithExternalVersion("1", 2, "name1", "25", defaultMessageTimestamp, "delete");
410+
produceDataWithExternalVersion("2", 2, "name2", "25", defaultMessageTimestamp, "delete");
411+
produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index");
412+
waitForState(() -> {
413+
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 3));
414+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
415+
assertThat(response.getHits().getTotalHits().value(), is(1L));
416+
return 25 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
417+
});
418+
waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));
419+
420+
// validate index operation with lower version creates new document
421+
produceDataWithExternalVersion("1", 1, "name1", "35", defaultMessageTimestamp, "index");
422+
produceDataWithExternalVersion("4", 1, "name4", "35", defaultMessageTimestamp, "index");
423+
waitForState(() -> {
424+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(34);
425+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
426+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
427+
return true;
428+
});
429+
430+
}
431+
313432
private void verifyRemoteStoreEnabled(String node) {
314433
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
315434
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 118 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,13 @@
1616
import org.opensearch.action.admin.indices.streamingingestion.state.ShardIngestionState;
1717
import org.opensearch.cluster.metadata.IndexMetadata;
1818
import org.opensearch.cluster.metadata.IngestionSource;
19+
import org.opensearch.common.lease.Releasable;
1920
import org.opensearch.common.lucene.Lucene;
21+
import org.opensearch.common.lucene.uid.Versions;
2022
import org.opensearch.index.IngestionConsumerFactory;
2123
import org.opensearch.index.IngestionShardConsumer;
2224
import org.opensearch.index.IngestionShardPointer;
25+
import org.opensearch.index.VersionType;
2326
import org.opensearch.index.mapper.DocumentMapperForType;
2427
import org.opensearch.index.mapper.IdFieldMapper;
2528
import org.opensearch.index.mapper.ParseContext;
@@ -47,6 +50,7 @@
4750
import java.util.function.BiFunction;
4851

4952
import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
53+
import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_LOCATION;
5054
import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_SNAPSHOT;
5155

5256
/**
@@ -58,6 +62,8 @@ public class IngestionEngine extends InternalEngine {
5862
private final IngestionConsumerFactory ingestionConsumerFactory;
5963
private final DocumentMapperForType documentMapperForType;
6064

65+
// private final LiveVersionMap versionMap = new LiveVersionMap();
66+
6167
public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory ingestionConsumerFactory) {
6268
super(engineConfig);
6369
this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
@@ -157,15 +163,42 @@ public IndexResult index(Index index) throws IOException {
157163
/**
158164
* Indexes the document into the engine. This is used internally by the stream poller only.
159165
* @param index the index request
160-
* @return the index result
161166
* @throws IOException if an error occurs
162167
*/
163-
public IndexResult indexInternal(Index index) throws IOException {
168+
public void indexInternal(Index index) throws IOException {
169+
// todo: add number of inserts/updates metric
164170
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
165171
ensureOpen();
166-
final IndexResult indexResult;
167-
indexResult = indexIntoLucene(index);
168-
return indexResult;
172+
173+
try (Releasable releasableLock = versionMap.acquireLock(index.uid().bytes())) {
174+
lastWriteNanos = index.startTime();
175+
boolean isExternalVersioning = index.versionType() == VersionType.EXTERNAL;
176+
if (index.getAutoGeneratedIdTimestamp() == UNSET_AUTO_GENERATED_TIMESTAMP) {
177+
validateDocumentVersion(index);
178+
}
179+
180+
if (isExternalVersioning) {
181+
index.parsedDoc().version().setLongValue(index.version());
182+
}
183+
184+
IndexResult indexResult = indexIntoLucene(index);
185+
if (isExternalVersioning && indexResult.getResultType() == Result.Type.SUCCESS) {
186+
versionMap.maybePutIndexUnderLock(
187+
index.uid().bytes(),
188+
new IndexVersionValue(EMPTY_TRANSLOG_LOCATION, index.version(), index.seqNo(), index.primaryTerm())
189+
);
190+
}
191+
} catch (VersionConflictEngineException e) {
192+
logger.debug("Version conflict encountered when processing index operation", e);
193+
throw e;
194+
} catch (RuntimeException | IOException e) {
195+
try {
196+
maybeFailEngine("index", e);
197+
} catch (Exception inner) {
198+
e.addSuppressed(inner);
199+
}
200+
throw e;
201+
}
169202
}
170203

171204
private IndexResult indexIntoLucene(Index index) throws IOException {
@@ -203,18 +236,53 @@ public DeleteResult delete(Delete delete) throws IOException {
203236
/**
204237
* Processes delete operations. This is used internally by the stream poller only.
205238
*/
206-
public DeleteResult deleteInternal(Delete delete) throws IOException {
239+
public void deleteInternal(Delete delete) throws IOException {
240+
// todo: add number of deletes metric
241+
versionMap.enforceSafeAccess();
207242
assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
208243
ensureOpen();
209-
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id());
210-
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
211-
final ParseContext.Document doc = tombstone.docs().get(0);
212-
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
213-
+ doc
214-
+ " ]";
215-
doc.add(softDeletesField);
216-
indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);
217-
return new DeleteResult(1, delete.primaryTerm(), -1, true);
244+
lastWriteNanos = delete.startTime();
245+
246+
try (Releasable releasableLock = versionMap.acquireLock(delete.uid().bytes())) {
247+
validateDocumentVersion(delete);
248+
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id());
249+
boolean isExternalVersioning = delete.versionType() == VersionType.EXTERNAL;
250+
if (isExternalVersioning) {
251+
tombstone.version().setLongValue(delete.version());
252+
}
253+
254+
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
255+
final ParseContext.Document doc = tombstone.docs().get(0);
256+
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
257+
+ doc
258+
+ " ]";
259+
doc.add(softDeletesField);
260+
261+
indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);
262+
if (isExternalVersioning) {
263+
versionMap.putDeleteUnderLock(
264+
delete.uid().bytes(),
265+
new DeleteVersionValue(
266+
delete.version(),
267+
delete.seqNo(),
268+
delete.primaryTerm(),
269+
engineConfig.getThreadPool().relativeTimeInMillis()
270+
)
271+
);
272+
}
273+
} catch (VersionConflictEngineException e) {
274+
logger.debug("Version conflict encountered when processing deletes", e);
275+
throw e;
276+
} catch (RuntimeException | IOException e) {
277+
try {
278+
maybeFailEngine("delete", e);
279+
} catch (Exception inner) {
280+
e.addSuppressed(inner);
281+
}
282+
throw e;
283+
}
284+
285+
maybePruneDeletes();
218286
}
219287

220288
@Override
@@ -229,6 +297,14 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
229297
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
230298
}
231299

300+
@Override
301+
protected void pruneDeletedTombstones() {
302+
final long timeMSec = engineConfig.getThreadPool().relativeTimeInMillis();
303+
final long maxTimestampToPrune = timeMSec - engineConfig.getIndexSettings().getGcDeletesInMillis();
304+
versionMap.pruneTombstones(maxTimestampToPrune, Long.MAX_VALUE);
305+
lastDeleteVersionPruneTimeMSec = timeMSec;
306+
}
307+
232308
@Override
233309
public Translog.Snapshot newChangesSnapshot(
234310
String source,
@@ -381,6 +457,33 @@ private void updateErrorHandlingStrategy(IngestionErrorStrategy.ErrorStrategy er
381457
streamPoller.updateErrorStrategy(updatedIngestionErrorStrategy);
382458
}
383459

460+
/**
461+
* Validates document version for pull-based ingestion. Only external versioning is supported.
462+
*/
463+
private void validateDocumentVersion(final Operation operation) throws IOException {
464+
if (operation.versionType() != VersionType.EXTERNAL) {
465+
return;
466+
}
467+
468+
versionMap.enforceSafeAccess();
469+
final VersionValue versionValue = resolveDocVersion(operation, false);
470+
final long currentVersion;
471+
final boolean currentNotFoundOrDeleted;
472+
473+
if (versionValue == null) {
474+
// todo: possible to optimize addDoc instead of updateDoc if version is not present?
475+
currentVersion = Versions.NOT_FOUND;
476+
currentNotFoundOrDeleted = true;
477+
} else {
478+
currentVersion = versionValue.version;
479+
currentNotFoundOrDeleted = versionValue.isDelete();
480+
}
481+
482+
if (operation.versionType().isVersionConflictForWrites(currentVersion, operation.version(), currentNotFoundOrDeleted)) {
483+
throw new VersionConflictEngineException(shardId, operation, currentVersion, currentNotFoundOrDeleted);
484+
}
485+
}
486+
384487
/**
385488
* Pause the poller. Used by management flows.
386489
*/

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public class InternalEngine extends Engine {
153153
/**
154154
* When we last pruned expired tombstones from versionMap.deletes:
155155
*/
156-
private volatile long lastDeleteVersionPruneTimeMSec;
156+
protected volatile long lastDeleteVersionPruneTimeMSec;
157157

158158
protected final TranslogManager translogManager;
159159
protected final IndexWriter indexWriter;
@@ -163,6 +163,10 @@ public class InternalEngine extends Engine {
163163
protected final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);
164164
protected final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField();
165165

166+
// A uid (in the form of BytesRef) to the version map
167+
// we use the hashed variant since we iterate over it and check removal and additions on existing keys
168+
protected final LiveVersionMap versionMap = new LiveVersionMap();
169+
166170
@Nullable
167171
protected final String historyUUID;
168172

@@ -173,10 +177,6 @@ public class InternalEngine extends Engine {
173177
private final Lock flushLock = new ReentrantLock();
174178
private final ReentrantLock optimizeLock = new ReentrantLock();
175179

176-
// A uid (in the form of BytesRef) to the version map
177-
// we use the hashed variant since we iterate over it and check removal and additions on existing keys
178-
private final LiveVersionMap versionMap = new LiveVersionMap();
179-
180180
private volatile SegmentInfos lastCommittedSegmentInfos;
181181

182182
private final IndexThrottle throttle;
@@ -745,7 +745,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
745745
}
746746

747747
/** resolves the current version of the document, returning null if not found */
748-
private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) throws IOException {
748+
protected VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) throws IOException {
749749
assert incrementVersionLookup(); // used for asserting in tests
750750
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
751751
if (versionValue == null) {
@@ -1974,7 +1974,7 @@ private void refreshLastCommittedSegmentInfos() {
19741974
}
19751975
}
19761976

1977-
private void pruneDeletedTombstones() {
1977+
protected void pruneDeletedTombstones() {
19781978
/*
19791979
* We need to deploy two different trimming strategies for GC deletes on primary and replicas. Delete operations on primary
19801980
* are remembered for at least one GC delete cycle and trimmed periodically. This is, at the moment, the best we can do on

0 commit comments

Comments
 (0)