Skip to content

Commit 98bb859

Browse files
committed
Adding more ITs for reindex flow and UTs in InternalEngine for derived source
Signed-off-by: Tanik Pansuriya <[email protected]>
1 parent 052813b commit 98bb859

File tree

2 files changed

+349
-3
lines changed

2 files changed

+349
-3
lines changed

modules/reindex/src/test/java/org/opensearch/index/reindex/ReindexBasicTests.java

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,14 @@
3232

3333
package org.opensearch.index.reindex;
3434

35+
import org.opensearch.action.bulk.BulkRequestBuilder;
36+
import org.opensearch.action.bulk.BulkResponse;
3537
import org.opensearch.action.index.IndexRequestBuilder;
38+
import org.opensearch.action.search.SearchResponse;
39+
import org.opensearch.common.settings.Settings;
3640
import org.opensearch.common.xcontent.XContentType;
41+
import org.opensearch.search.SearchHit;
42+
import org.opensearch.search.sort.SortOrder;
3743

3844
import java.util.ArrayList;
3945
import java.util.Collection;
@@ -287,4 +293,194 @@ public void testReindexWithDerivedSource() throws Exception {
287293
expectedCount = client().prepareSearch(destIndexMulti).setQuery(matchAllQuery()).get().getHits().getTotalHits().value();
288294
assertEquals(numDocs + numDocs2, expectedCount);
289295
}
296+
297+
public void testReindexFromDerivedSourceToNormalIndex() throws Exception {
298+
// Create source index with derived source enabled
299+
String sourceMapping = """
300+
{
301+
"properties": {
302+
"text_field": {
303+
"type": "text",
304+
"store": true
305+
},
306+
"keyword_field": {
307+
"type": "keyword"
308+
},
309+
"numeric_field": {
310+
"type": "long",
311+
"doc_values": true
312+
},
313+
"date_field": {
314+
"type": "date",
315+
"store": true
316+
}
317+
}
318+
}""";
319+
320+
// Create destination index with normal settings
321+
String destMapping = """
322+
{
323+
"properties": {
324+
"text_field": {
325+
"type": "text"
326+
},
327+
"keyword_field": {
328+
"type": "keyword"
329+
},
330+
"numeric_field": {
331+
"type": "long"
332+
},
333+
"date_field": {
334+
"type": "date"
335+
}
336+
}
337+
}""";
338+
339+
// Create source index
340+
assertAcked(
341+
prepareCreate("source_index").setSettings(
342+
Settings.builder().put("index.number_of_shards", 2).put("index.derived_source.enabled", true)
343+
).setMapping(sourceMapping)
344+
);
345+
346+
// Create destination index
347+
assertAcked(prepareCreate("dest_index").setMapping(destMapping));
348+
349+
// Index test documents
350+
int numDocs = randomIntBetween(100, 200);
351+
final List<IndexRequestBuilder> docs = new ArrayList<>();
352+
for (int i = 0; i < numDocs; i++) {
353+
docs.add(
354+
client().prepareIndex("source_index")
355+
.setId(Integer.toString(i))
356+
.setSource(
357+
"text_field",
358+
"text value " + i,
359+
"keyword_field",
360+
"key_" + i,
361+
"numeric_field",
362+
i,
363+
"date_field",
364+
System.currentTimeMillis()
365+
)
366+
);
367+
}
368+
indexRandom(true, docs);
369+
refresh("source_index");
370+
371+
// Test 1: Basic reindex without slices
372+
ReindexRequestBuilder reindex = reindex().source("source_index").destination("dest_index").refresh(true);
373+
BulkByScrollResponse response = reindex.get();
374+
assertThat(response, matcher().created(numDocs));
375+
verifyReindexedContent("dest_index", numDocs);
376+
377+
// Test 2: Reindex with query filter
378+
String destFilteredIndex = "dest_filtered_index";
379+
assertAcked(prepareCreate(destFilteredIndex).setMapping(destMapping));
380+
reindex = reindex().source("source_index").destination(destFilteredIndex).filter(termQuery("keyword_field", "key_1")).refresh(true);
381+
response = reindex.get();
382+
assertThat(response, matcher().created(1));
383+
verifyReindexedContent(destFilteredIndex, 1);
384+
385+
// Test 3: Reindex with slices
386+
String destSlicedIndex = "dest_sliced_index";
387+
assertAcked(prepareCreate(destSlicedIndex).setMapping(destMapping));
388+
int slices = randomSlices();
389+
int expectedSlices = expectedSliceStatuses(slices, "source_index");
390+
391+
reindex = reindex().source("source_index").destination(destSlicedIndex).setSlices(slices).refresh(true);
392+
response = reindex.get();
393+
assertThat(response, matcher().created(numDocs).slices(hasSize(expectedSlices)));
394+
verifyReindexedContent(destSlicedIndex, numDocs);
395+
396+
// Test 4: Reindex with field transformation
397+
String destTransformedIndex = "dest_transformed_index";
398+
String transformedMapping = """
399+
{
400+
"properties": {
401+
"new_text_field": {
402+
"type": "text"
403+
},
404+
"new_keyword_field": {
405+
"type": "keyword"
406+
},
407+
"modified_numeric": {
408+
"type": "long"
409+
},
410+
"date_field": {
411+
"type": "date"
412+
}
413+
}
414+
}""";
415+
assertAcked(prepareCreate(destTransformedIndex).setMapping(transformedMapping));
416+
417+
// First reindex the documents
418+
reindex = reindex().source("source_index").destination(destTransformedIndex).refresh(true);
419+
response = reindex.get();
420+
assertThat(response, matcher().created(numDocs));
421+
422+
// Then transform using bulk update
423+
BulkRequestBuilder bulkRequest = client().prepareBulk();
424+
SearchResponse searchResponse = client().prepareSearch(destTransformedIndex).setQuery(matchAllQuery()).setSize(numDocs).get();
425+
426+
for (SearchHit hit : searchResponse.getHits()) {
427+
Map<String, Object> source = hit.getSourceAsMap();
428+
Map<String, Object> newSource = new HashMap<>();
429+
430+
// Transform fields
431+
newSource.put("new_text_field", source.get("text_field"));
432+
newSource.put("new_keyword_field", source.get("keyword_field"));
433+
newSource.put("modified_numeric", ((Number) source.get("numeric_field")).longValue() + 1000);
434+
newSource.put("date_field", source.get("date_field"));
435+
436+
bulkRequest.add(client().prepareIndex(destTransformedIndex).setId(hit.getId()).setSource(newSource));
437+
}
438+
439+
BulkResponse bulkResponse = bulkRequest.get();
440+
assertFalse(bulkResponse.hasFailures());
441+
refresh(destTransformedIndex);
442+
verifyTransformedContent(destTransformedIndex, numDocs);
443+
}
444+
445+
private void verifyReindexedContent(String indexName, int expectedCount) {
446+
refresh(indexName);
447+
SearchResponse searchResponse = client().prepareSearch(indexName)
448+
.setQuery(matchAllQuery())
449+
.setSize(expectedCount)
450+
.addSort("numeric_field", SortOrder.ASC)
451+
.get();
452+
453+
assertHitCount(searchResponse, expectedCount);
454+
455+
for (SearchHit hit : searchResponse.getHits()) {
456+
Map<String, Object> source = hit.getSourceAsMap();
457+
int id = Integer.parseInt(hit.getId());
458+
459+
assertEquals("text value " + id, source.get("text_field"));
460+
assertEquals("key_" + id, source.get("keyword_field"));
461+
assertEquals(id, ((Number) source.get("numeric_field")).intValue());
462+
assertNotNull(source.get("date_field"));
463+
}
464+
}
465+
466+
private void verifyTransformedContent(String indexName, int expectedCount) {
467+
refresh(indexName);
468+
SearchResponse searchResponse = client().prepareSearch(indexName)
469+
.setQuery(matchAllQuery())
470+
.setSize(expectedCount)
471+
.addSort("modified_numeric", SortOrder.ASC)
472+
.get();
473+
474+
assertHitCount(searchResponse, expectedCount);
475+
476+
for (SearchHit hit : searchResponse.getHits()) {
477+
Map<String, Object> source = hit.getSourceAsMap();
478+
int id = Integer.parseInt(hit.getId());
479+
480+
assertEquals("text value " + id, source.get("new_text_field"));
481+
assertEquals("key_" + id, source.get("new_keyword_field"));
482+
assertEquals(id + 1000, ((Number) source.get("modified_numeric")).longValue());
483+
assertNotNull(source.get("date_field"));
484+
}
485+
}
290486
}

server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java

Lines changed: 153 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@
115115
import org.opensearch.common.util.concurrent.ConcurrentCollections;
116116
import org.opensearch.common.util.concurrent.ReleasableLock;
117117
import org.opensearch.common.util.io.IOUtils;
118+
import org.opensearch.common.util.set.Sets;
118119
import org.opensearch.core.action.ActionListener;
119120
import org.opensearch.core.common.Strings;
120121
import org.opensearch.core.common.bytes.BytesArray;
@@ -8286,9 +8287,6 @@ public void testNewChangesSnapshotWithDerivedSource() throws IOException {
82868287
// Verify source is derived correctly
82878288
Translog.Index indexOp = (Translog.Index) operation;
82888289
assertNotNull("Source should not be null", indexOp.source());
8289-
8290-
// Additional derived source specific checks can be added here
8291-
82928290
count++;
82938291
}
82948292

@@ -8299,6 +8297,158 @@ public void testNewChangesSnapshotWithDerivedSource() throws IOException {
82998297
}
83008298
}
83018299

8300+
public void testNewChangesSnapshotWithDeleteAndUpdate() throws IOException {
8301+
final List<Engine.Operation> operations = new ArrayList<>();
8302+
int numDocs = randomIntBetween(10, 100);
8303+
int numDocsToDelete = randomIntBetween(1, numDocs / 2);
8304+
Set<String> deletedDocs = new HashSet<>();
8305+
8306+
// First index documents
8307+
for (int i = 0; i < numDocs; i++) {
8308+
ParsedDocument doc = testParsedDocument(
8309+
Integer.toString(i),
8310+
null,
8311+
testDocumentWithTextField(),
8312+
new BytesArray("{\"value\":\"test\"}".getBytes(Charset.defaultCharset())),
8313+
null
8314+
);
8315+
Engine.Index index = new Engine.Index(
8316+
newUid(doc),
8317+
doc,
8318+
UNASSIGNED_SEQ_NO,
8319+
primaryTerm.get(),
8320+
i,
8321+
VersionType.EXTERNAL,
8322+
Engine.Operation.Origin.PRIMARY,
8323+
System.nanoTime(),
8324+
-1,
8325+
false,
8326+
UNASSIGNED_SEQ_NO,
8327+
0
8328+
);
8329+
operations.add(index);
8330+
engine.index(index);
8331+
}
8332+
8333+
// Delete some documents
8334+
for (int i = 0; i < numDocsToDelete; i++) {
8335+
String idToDelete = Integer.toString(randomInt(numDocs - 1));
8336+
if (!deletedDocs.contains(idToDelete)) {
8337+
final Engine.Delete delete = new Engine.Delete(
8338+
idToDelete,
8339+
newUid(idToDelete),
8340+
UNASSIGNED_SEQ_NO,
8341+
primaryTerm.get(),
8342+
i + numDocs,
8343+
VersionType.EXTERNAL,
8344+
Engine.Operation.Origin.PRIMARY,
8345+
System.nanoTime(),
8346+
UNASSIGNED_SEQ_NO,
8347+
0
8348+
);
8349+
operations.add(delete);
8350+
engine.delete(delete);
8351+
deletedDocs.add(idToDelete);
8352+
}
8353+
}
8354+
8355+
// Update some remaining documents
8356+
int numDocsToUpdate = randomIntBetween(1, numDocs - deletedDocs.size());
8357+
Set<String> updatedDocs = new HashSet<>();
8358+
for (int i = 0; i < numDocsToUpdate; i++) {
8359+
String idToUpdate;
8360+
do {
8361+
idToUpdate = Integer.toString(randomInt(numDocs - 1));
8362+
} while (deletedDocs.contains(idToUpdate) || updatedDocs.contains(idToUpdate));
8363+
8364+
Document document = testDocument();
8365+
document.add(new TextField("value", "updated", Field.Store.YES));
8366+
ParsedDocument doc = testParsedDocument(
8367+
idToUpdate,
8368+
null,
8369+
document,
8370+
new BytesArray("{\"value\":\"updated\"}".getBytes(Charset.defaultCharset())),
8371+
null
8372+
);
8373+
Engine.Index update = new Engine.Index(
8374+
newUid(doc),
8375+
doc,
8376+
UNASSIGNED_SEQ_NO,
8377+
primaryTerm.get(),
8378+
numDocs + numDocsToDelete + i,
8379+
VersionType.EXTERNAL,
8380+
Engine.Operation.Origin.PRIMARY,
8381+
System.nanoTime(),
8382+
-1,
8383+
false,
8384+
UNASSIGNED_SEQ_NO,
8385+
0
8386+
);
8387+
operations.add(update);
8388+
engine.index(update);
8389+
updatedDocs.add(idToUpdate);
8390+
}
8391+
8392+
engine.refresh("test");
8393+
final BiFunction<String, Engine.SearcherScope, Engine.Searcher> searcherFactory = engine::acquireSearcher;
8394+
8395+
// Test snapshot with all operations
8396+
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, operations.size() - 1, true, true)) {
8397+
int count = 0;
8398+
Translog.Operation operation;
8399+
while ((operation = snapshot.next()) != null) {
8400+
if (operation instanceof Translog.Index) {
8401+
Translog.Index indexOp = (Translog.Index) operation;
8402+
String docId = indexOp.id();
8403+
if (updatedDocs.contains(docId)) {
8404+
// Verify updated content using get
8405+
try (
8406+
Engine.GetResult get = engine.get(new Engine.Get(true, false, docId, newUid(docId)), engine::acquireSearcher)
8407+
) {
8408+
assertTrue("Document " + docId + " should exist", get.exists());
8409+
org.apache.lucene.document.Document doc = get.docIdAndVersion().reader.storedFields()
8410+
.document(get.docIdAndVersion().docId, Sets.newHashSet(SourceFieldMapper.NAME));
8411+
final BytesRef source = doc.getBinaryValue(SourceFieldMapper.NAME);
8412+
assertEquals(
8413+
"Document " + docId + " should have updated value",
8414+
"{\"value\":\"updated\"}",
8415+
source.utf8ToString()
8416+
);
8417+
}
8418+
}
8419+
} else if (operation instanceof Translog.Delete) {
8420+
String docId = ((Translog.Delete) operation).id();
8421+
assertTrue("Document " + docId + " should be in deleted set", deletedDocs.contains(docId));
8422+
8423+
// Verify document is actually deleted
8424+
try (Engine.GetResult get = engine.get(new Engine.Get(true, false, docId, newUid(docId)), engine::acquireSearcher)) {
8425+
assertFalse("Document " + docId + " should not exist", get.exists());
8426+
}
8427+
}
8428+
count++;
8429+
}
8430+
8431+
// Verify we got all operations
8432+
assertEquals("Expected number of operations", operations.size(), count);
8433+
}
8434+
8435+
// Test snapshot with accurate count
8436+
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, operations.size() - 1, true, true)) {
8437+
assertEquals("Total number of operations", operations.size(), snapshot.totalOperations());
8438+
}
8439+
8440+
// Test snapshot with specific range
8441+
int from = randomIntBetween(0, operations.size() / 2);
8442+
int to = randomIntBetween(from, operations.size() - 1);
8443+
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", from, to, false, true)) {
8444+
int count = 0;
8445+
while (snapshot.next() != null) {
8446+
count++;
8447+
}
8448+
assertEquals("Expected number of operations in range", to - from + 1, count);
8449+
}
8450+
}
8451+
83028452
private EngineConfig createEngineConfigWithMapperSupplier(Settings settings, Store store) throws IOException {
83038453
IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
83048454
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

0 commit comments

Comments
 (0)