Skip to content

Commit 77f80d9

Browse files
committed
Covering more recovery cases for derived source
Signed-off-by: Tanik Pansuriya <[email protected]>
1 parent 98bb859 commit 77f80d9

File tree

5 files changed

+362
-231
lines changed

5 files changed

+362
-231
lines changed

server/src/internalClusterTest/java/org/opensearch/recovery/FullRollingRestartIT.java

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -573,21 +573,6 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
573573

574574
// Initial indexing
575575
int docCount = randomIntBetween(100, 200); // Reduced count for stability
576-
bulkIndexDocumentsWithVersion(docCount);
577-
578-
refresh("test");
579-
flush("test");
580-
ensureGreen();
581-
582-
// Verify initial document count
583-
assertHitCount(client().prepareSearch("test").setSize(0).get(), docCount);
584-
585-
// Start concurrent updates during rolling restart
586-
logger.info("--> starting rolling restart with concurrent updates");
587-
concurrentUpdatesRollingRestartWithVerification(docCount);
588-
}
589-
590-
private void bulkIndexDocumentsWithVersion(int docCount) {
591576
BulkRequestBuilder bulkRequest = client().prepareBulk();
592577
for (int i = 0; i < docCount; i++) {
593578
bulkRequest.add(
@@ -606,6 +591,17 @@ private void bulkIndexDocumentsWithVersion(int docCount) {
606591
BulkResponse response = bulkRequest.execute().actionGet();
607592
assertFalse(response.hasFailures());
608593
}
594+
595+
refresh("test");
596+
flush("test");
597+
ensureGreen();
598+
599+
// Verify initial document count
600+
assertHitCount(client().prepareSearch("test").setSize(0).get(), docCount);
601+
602+
// Start concurrent updates during rolling restart
603+
logger.info("--> starting rolling restart with concurrent updates");
604+
concurrentUpdatesRollingRestartWithVerification(docCount);
609605
}
610606

611607
private void concurrentUpdatesRollingRestartWithVerification(int initialDocCount) throws Exception {
@@ -622,14 +618,7 @@ private void concurrentUpdatesRollingRestartWithVerification(int initialDocCount
622618
int docId = randomIntBetween(0, initialDocCount - 1);
623619
client().prepareUpdate("test", String.valueOf(docId))
624620
.setRetryOnConflict(3)
625-
.setDoc(
626-
"counter",
627-
randomIntBetween(0, 1000),
628-
"last_updated",
629-
System.currentTimeMillis(),
630-
"version",
631-
System.currentTimeMillis()
632-
)
621+
.setDoc("counter", randomIntBetween(0, 1000), "last_updated", System.currentTimeMillis(), "version", 1)
633622
.execute()
634623
.actionGet();
635624
totalUpdates.incrementAndGet();
@@ -686,7 +675,7 @@ private void verifyDerivedSourceWithUpdates(int expectedDocs) throws Exception {
686675
assertBusy(() -> {
687676
SearchResponse response = client().prepareSearch("test")
688677
.setSize(expectedDocs)
689-
.addSort("version", SortOrder.DESC) // Sort by version to ensure we see latest updates
678+
.addSort("last_updated", SortOrder.DESC) // Sort by version to ensure we see latest updates
690679
.get();
691680
assertHitCount(response, expectedDocs);
692681

@@ -695,10 +684,10 @@ private void verifyDerivedSourceWithUpdates(int expectedDocs) throws Exception {
695684
String id = hit.getId();
696685

697686
// Verify all required fields are present
698-
assertNotNull("text_field missing for doc " + id, source.get("text_field"));
687+
assertEquals("text value " + id, source.get("text_field"));
699688
assertNotNull("counter missing for doc " + id, source.get("counter"));
700-
assertNotNull("last_updated missing for doc " + id, source.get("last_updated"));
701-
assertNotNull("version missing for doc " + id, source.get("version"));
689+
assertFalse(((String) source.get("last_updated")).isEmpty());
690+
assertEquals(1, source.get("version"));
702691

703692
// Verify text_field maintains original value
704693
assertEquals("text value " + id, source.get("text_field"));

server/src/internalClusterTest/java/org/opensearch/recovery/RecoveryWhileUnderLoadIT.java

Lines changed: 66 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ public void testRecoveryWithDerivedSourceEnabled() throws Exception {
522522
assertAcked(
523523
prepareCreate(
524524
"test",
525-
2,
525+
1,
526526
Settings.builder()
527527
.put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
528528
.put(SETTING_NUMBER_OF_REPLICAS, 1)
@@ -541,7 +541,7 @@ public void testRecoveryWithDerivedSourceEnabled() throws Exception {
541541
}
542542
}
543543

544-
logger.info("--> allow 2 nodes for index [test] ...");
544+
logger.info("--> allow 2 nodes for index [test] with replica ...");
545545
allowNodes("test", 2);
546546

547547
logger.info("--> waiting for GREEN health status ...");
@@ -585,6 +585,7 @@ public void testReplicaRecoveryWithDerivedSourceBeforeRefresh() throws Exception
585585
.put(SETTING_NUMBER_OF_REPLICAS, 0)
586586
.put(IndexSettings.INDEX_DERIVED_SOURCE_SETTING.getKey(), true)
587587
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC)
588+
.put("index.refresh_interval", -1)
588589
).setMapping(mapping)
589590
);
590591

@@ -668,7 +669,9 @@ public void testReplicaRecoveryWithDerivedSourceFromTranslog() throws Exception
668669

669670
// Kill replica node and index more documents
670671
final String replicaNode = ensureReplicaNode("test");
671-
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
672+
if (replicaNode != null) {
673+
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
674+
}
672675

673676
int additionalDocs = randomIntBetween(50, 100);
674677
for (int i = docCount; i < docCount + additionalDocs; i++) {
@@ -722,7 +725,7 @@ public void testRecoverWhileUnderLoadWithDerivedSource() throws Exception {
722725
assertAcked(
723726
prepareCreate(
724727
"test",
725-
2,
728+
1,
726729
Settings.builder()
727730
.put(SETTING_NUMBER_OF_SHARDS, numberOfShards)
728731
.put(SETTING_NUMBER_OF_REPLICAS, 1)
@@ -781,18 +784,37 @@ protected XContentBuilder generateSource(long id, Random random) throws IOExcept
781784
logger.info("--> verifying indexed content");
782785

783786
// Verify docs on primary
784-
assertPrimaryDocCount(totalNumDocs);
787+
SearchResponse primaryResponse = client().prepareSearch("test").setPreference("_primary").setTrackTotalHits(true).get();
788+
assertHitCount(primaryResponse, totalNumDocs);
785789

786790
// Verify docs and derived source on replica
787-
assertReplicaDocsAndSource(totalNumDocs);
791+
assertBusy(() -> {
792+
SearchResponse replicaResponse = client().prepareSearch("test")
793+
.setPreference("_replica")
794+
.setTrackTotalHits(true)
795+
.setSize(totalNumDocs)
796+
.addSort("value", SortOrder.ASC)
797+
.get();
798+
799+
assertHitCount(replicaResponse, totalNumDocs);
800+
801+
// Verify source reconstruction on replica
802+
for (SearchHit hit : replicaResponse.getHits()) {
803+
assertNotNull(hit.getSourceAsMap());
804+
assertEquals(3, hit.getSourceAsMap().size());
805+
int id = (Integer) hit.getSourceAsMap().get("value");
806+
assertEquals("name_" + id, hit.getSourceAsMap().get("name"));
807+
assertNotNull(hit.getSourceAsMap().get("timestamp"));
808+
}
809+
}, 30, TimeUnit.SECONDS);
788810

789811
// Additional source verification with random sampling
790812
assertRandomDocsSource(50);
791813
}
792814
}
793815

794816
public void testRecoverWithRelocationAndDerivedSource() throws Exception {
795-
final int numShards = between(2, 5);
817+
final int numShards = between(3, 5);
796818
logger.info("--> creating test index with derived source enabled...");
797819

798820
String mapping = """
@@ -813,7 +835,7 @@ public void testRecoverWithRelocationAndDerivedSource() throws Exception {
813835
assertAcked(
814836
prepareCreate(
815837
"test",
816-
3,
838+
1,
817839
Settings.builder()
818840
.put(SETTING_NUMBER_OF_SHARDS, numShards)
819841
.put(SETTING_NUMBER_OF_REPLICAS, 0)
@@ -822,9 +844,8 @@ public void testRecoverWithRelocationAndDerivedSource() throws Exception {
822844
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")
823845
).setMapping(mapping)
824846
);
825-
826-
final int numDocs = scaledRandomIntBetween(2000, 10000);
827-
int allowNodes = 2;
847+
int numNodes = 1;
848+
final int numDocs = scaledRandomIntBetween(1500, 2000);
828849

829850
try (BackgroundIndexer indexer = new BackgroundIndexer("test", null, client(), numDocs) {
830851
@Override
@@ -836,14 +857,13 @@ protected XContentBuilder generateSource(long id, Random random) throws IOExcept
836857
.endObject();
837858
}
838859
}) {
839-
for (int i = 0; i < numDocs; i += scaledRandomIntBetween(100, Math.min(1000, numDocs))) {
860+
861+
for (int i = 0; i < numDocs; i += scaledRandomIntBetween(500, Math.min(1000, numDocs))) {
840862
indexer.assertNoFailures();
841863
logger.info("--> waiting for {} docs to be indexed ...", i);
842864
waitForDocs(i, indexer);
843-
844-
// Alternate between 1 and 2 nodes to force relocation
845-
allowNodes = 3 - allowNodes; // Toggle between 1 and 2
846-
allowNodes("test", allowNodes);
865+
internalCluster().startDataOnlyNode();
866+
numNodes++;
847867

848868
logger.info("--> waiting for GREEN health status ...");
849869
ensureGreen(TimeValue.timeValueMinutes(2));
@@ -854,12 +874,11 @@ protected XContentBuilder generateSource(long id, Random random) throws IOExcept
854874

855875
// Add replicas after stopping indexing
856876
logger.info("--> adding replicas ...");
857-
allowNodes("test", 3);
858877
assertAcked(
859878
client().admin()
860879
.indices()
861880
.prepareUpdateSettings("test")
862-
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 1))
881+
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, numNodes - 1))
863882
.get()
864883
);
865884
ensureGreen(TimeValue.timeValueMinutes(2));
@@ -868,37 +887,31 @@ protected XContentBuilder generateSource(long id, Random random) throws IOExcept
868887
client().admin().indices().prepareRefresh().get();
869888

870889
// Verify final doc count and derived source reconstruction
871-
assertPrimaryDocCount(numDocs);
872-
assertReplicaDocsAndSource(numDocs);
873-
assertRandomDocsSource(100);
874-
}
875-
}
876-
877-
private void assertPrimaryDocCount(int expectedCount) {
878-
SearchResponse primaryResponse = client().prepareSearch("test").setPreference("_primary").setTrackTotalHits(true).get();
879-
assertHitCount(primaryResponse, expectedCount);
880-
}
890+
SearchResponse primaryResponse = client().prepareSearch("test").setPreference("_primary").setTrackTotalHits(true).get();
891+
assertHitCount(primaryResponse, numDocs);
881892

882-
private void assertReplicaDocsAndSource(int expectedCount) throws Exception {
883-
assertBusy(() -> {
884-
SearchResponse replicaResponse = client().prepareSearch("test")
885-
.setPreference("_replica")
886-
.setTrackTotalHits(true)
887-
.setSize(expectedCount)
888-
.addSort("value", SortOrder.ASC)
889-
.get();
890-
891-
assertHitCount(replicaResponse, expectedCount);
893+
assertBusy(() -> {
894+
SearchResponse replicaResponse = client().prepareSearch("test")
895+
.setPreference("_replica")
896+
.setTrackTotalHits(true)
897+
.setSize(numDocs)
898+
.addSort("value", SortOrder.ASC)
899+
.get();
900+
901+
assertHitCount(replicaResponse, numDocs);
902+
903+
// Verify source reconstruction on replica
904+
for (SearchHit hit : replicaResponse.getHits()) {
905+
assertNotNull(hit.getSourceAsMap());
906+
assertEquals(3, hit.getSourceAsMap().size());
907+
int id = (Integer) hit.getSourceAsMap().get("value");
908+
assertEquals("name_" + id, hit.getSourceAsMap().get("name"));
909+
assertNotNull(hit.getSourceAsMap().get("timestamp"));
910+
}
911+
}, 30, TimeUnit.SECONDS);
892912

893-
// Verify source reconstruction on replica
894-
for (SearchHit hit : replicaResponse.getHits()) {
895-
assertNotNull(hit.getSourceAsMap());
896-
assertEquals(3, hit.getSourceAsMap().size());
897-
int id = (Integer) hit.getSourceAsMap().get("value");
898-
assertEquals("name_" + id, hit.getSourceAsMap().get("name"));
899-
assertNotNull(hit.getSourceAsMap().get("timestamp"));
900-
}
901-
}, 30, TimeUnit.SECONDS);
913+
assertRandomDocsSource(100);
914+
}
902915
}
903916

904917
private void assertRandomDocsSource(int sampleSize) {
@@ -921,7 +934,11 @@ private void assertRandomDocsSource(int sampleSize) {
921934
private String ensureReplicaNode(String index) {
922935
ClusterState state = client().admin().cluster().prepareState().get().getState();
923936
Index idx = state.metadata().index(index).getIndex();
924-
String nodeId = state.routingTable().index(idx).shard(0).replicaShards().get(0).currentNodeId();
925-
return state.nodes().get(nodeId).getName();
937+
String replicaNode = state.routingTable().index(idx).shard(0).replicaShards().get(0).currentNodeId();
938+
String clusterManagerNode = internalCluster().getClusterManagerName();
939+
if (!replicaNode.equals(clusterManagerNode)) {
940+
state.nodes().get(replicaNode).getName();
941+
}
942+
return null;
926943
}
927944
}

0 commit comments

Comments
 (0)