Skip to content

Commit 3312eda

Browse files
Fix unclosed store references with node-node segrep when primary node is unknown. (#16106) (#16435)
This PR fixes a bug with node-node pull-based replication: if the replica does not know the DiscoveryNode of its primary, we would construct a SegmentReplicationTarget that holds a store reference before failing. The failure would occur only after replication was started, because the source node is null, and the target would never get cleaned up, leaving its store reference unclosed. Push-based replication already handled this case by catching any error and closing the target. This update ensures the validation is done before constructing our PrimaryShardReplicationSource, and therefore before any target object is created, in both the push and pull cases. (cherry picked from commit 267c68e) Signed-off-by: Marc Handalian <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 06c1a5a commit 3312eda

File tree

4 files changed

+59
-1
lines changed

4 files changed

+59
-1
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ public void testFailoverWithSearchReplica_WithoutWriterReplicas() throws IOExcep
126126
.put(indexSettings())
127127
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numWriterReplicas)
128128
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, numSearchReplicas)
129+
.put("index.refresh_interval", "40ms") // set lower interval so replica attempts replication cycles after primary is
130+
// removed.
129131
.build()
130132
);
131133
ensureYellow(TEST_INDEX);

server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public PrimaryShardReplicationSource(
4747
RecoverySettings recoverySettings,
4848
DiscoveryNode sourceNode
4949
) {
50+
assert targetNode != null : "Target node must be set";
51+
assert sourceNode != null : "Source node must be set";
5052
this.targetAllocationId = targetAllocationId;
5153
this.transportService = transportService;
5254
this.sourceNode = sourceNode;

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ public SegmentReplicationSource get(IndexShard shard) {
5353

5454
private DiscoveryNode getPrimaryNode(ShardId shardId) {
5555
ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(shardId).primaryShard();
56-
return clusterService.state().nodes().get(primaryShard.currentNodeId());
56+
DiscoveryNode node = clusterService.state().nodes().get(primaryShard.currentNodeId());
57+
if (node == null) {
58+
throw new IllegalStateException("Cannot replicate, primary shard for " + shardId + " is not allocated on any node");
59+
}
60+
return node;
5761
}
5862
}

server/src/test/java/org/opensearch/indices/replication/SegmentReplicatorTests.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,13 @@
1010

1111
import org.apache.lucene.store.IOContext;
1212
import org.opensearch.OpenSearchCorruptionException;
13+
import org.opensearch.cluster.ClusterState;
1314
import org.opensearch.cluster.metadata.IndexMetadata;
15+
import org.opensearch.cluster.routing.IndexRoutingTable;
16+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
17+
import org.opensearch.cluster.routing.RoutingTable;
18+
import org.opensearch.cluster.routing.UnassignedInfo;
19+
import org.opensearch.cluster.service.ClusterService;
1420
import org.opensearch.common.lucene.Lucene;
1521
import org.opensearch.common.settings.Settings;
1622
import org.opensearch.core.action.ActionListener;
@@ -19,10 +25,12 @@
1925
import org.opensearch.index.shard.IndexShard;
2026
import org.opensearch.index.shard.IndexShardTestCase;
2127
import org.opensearch.index.store.StoreFileMetadata;
28+
import org.opensearch.indices.recovery.RecoverySettings;
2229
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
2330
import org.opensearch.indices.replication.common.CopyState;
2431
import org.opensearch.indices.replication.common.ReplicationType;
2532
import org.opensearch.threadpool.ThreadPool;
33+
import org.opensearch.transport.TransportService;
2634

2735
import java.io.IOException;
2836
import java.io.UncheckedIOException;
@@ -45,6 +53,48 @@ public class SegmentReplicatorTests extends IndexShardTestCase {
4553
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
4654
.build();
4755

56+
public void testReplicationWithUnassignedPrimary() throws Exception {
57+
final IndexShard replica = newStartedShard(false, settings, new NRTReplicationEngineFactory());
58+
final IndexShard primary = newStartedShard(true, settings, new NRTReplicationEngineFactory());
59+
SegmentReplicator replicator = new SegmentReplicator(threadPool);
60+
61+
ClusterService cs = mock(ClusterService.class);
62+
IndexShardRoutingTable.Builder shardRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId());
63+
shardRoutingTable.addShard(replica.routingEntry());
64+
shardRoutingTable.addShard(primary.routingEntry().moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test")));
65+
66+
when(cs.state()).thenReturn(buildClusterState(replica, shardRoutingTable));
67+
replicator.setSourceFactory(new SegmentReplicationSourceFactory(mock(TransportService.class), mock(RecoverySettings.class), cs));
68+
expectThrows(IllegalStateException.class, () -> replicator.startReplication(replica));
69+
closeShards(replica, primary);
70+
}
71+
72+
public void testReplicationWithUnknownPrimaryNode() throws Exception {
73+
final IndexShard replica = newStartedShard(false, settings, new NRTReplicationEngineFactory());
74+
final IndexShard primary = newStartedShard(true, settings, new NRTReplicationEngineFactory());
75+
SegmentReplicator replicator = new SegmentReplicator(threadPool);
76+
77+
ClusterService cs = mock(ClusterService.class);
78+
IndexShardRoutingTable.Builder shardRoutingTable = new IndexShardRoutingTable.Builder(replica.shardId());
79+
shardRoutingTable.addShard(replica.routingEntry());
80+
shardRoutingTable.addShard(primary.routingEntry());
81+
82+
when(cs.state()).thenReturn(buildClusterState(replica, shardRoutingTable));
83+
replicator.setSourceFactory(new SegmentReplicationSourceFactory(mock(TransportService.class), mock(RecoverySettings.class), cs));
84+
expectThrows(IllegalStateException.class, () -> replicator.startReplication(replica));
85+
closeShards(replica, primary);
86+
}
87+
88+
private ClusterState buildClusterState(IndexShard replica, IndexShardRoutingTable.Builder indexShard) {
89+
return ClusterState.builder(clusterService.state())
90+
.routingTable(
91+
RoutingTable.builder()
92+
.add(IndexRoutingTable.builder(replica.shardId().getIndex()).addIndexShard(indexShard.build()).build())
93+
.build()
94+
)
95+
.build();
96+
}
97+
4898
public void testStartReplicationWithoutSourceFactory() {
4999
ThreadPool threadpool = mock(ThreadPool.class);
50100
ExecutorService mock = mock(ExecutorService.class);

0 commit comments

Comments
 (0)