10
10
11
11
import org .apache .lucene .store .IOContext ;
12
12
import org .opensearch .OpenSearchCorruptionException ;
13
+ import org .opensearch .cluster .ClusterState ;
13
14
import org .opensearch .cluster .metadata .IndexMetadata ;
15
+ import org .opensearch .cluster .routing .IndexRoutingTable ;
16
+ import org .opensearch .cluster .routing .IndexShardRoutingTable ;
17
+ import org .opensearch .cluster .routing .RoutingTable ;
18
+ import org .opensearch .cluster .routing .UnassignedInfo ;
19
+ import org .opensearch .cluster .service .ClusterService ;
14
20
import org .opensearch .common .lucene .Lucene ;
15
21
import org .opensearch .common .settings .Settings ;
16
22
import org .opensearch .core .action .ActionListener ;
19
25
import org .opensearch .index .shard .IndexShard ;
20
26
import org .opensearch .index .shard .IndexShardTestCase ;
21
27
import org .opensearch .index .store .StoreFileMetadata ;
28
+ import org .opensearch .indices .recovery .RecoverySettings ;
22
29
import org .opensearch .indices .replication .checkpoint .ReplicationCheckpoint ;
23
30
import org .opensearch .indices .replication .common .CopyState ;
24
31
import org .opensearch .indices .replication .common .ReplicationType ;
25
32
import org .opensearch .threadpool .ThreadPool ;
33
+ import org .opensearch .transport .TransportService ;
26
34
27
35
import java .io .IOException ;
28
36
import java .io .UncheckedIOException ;
@@ -45,6 +53,48 @@ public class SegmentReplicatorTests extends IndexShardTestCase {
45
53
.put (IndexMetadata .SETTING_REPLICATION_TYPE , ReplicationType .SEGMENT )
46
54
.build ();
47
55
56
+ public void testReplicationWithUnassignedPrimary () throws Exception {
57
+ final IndexShard replica = newStartedShard (false , settings , new NRTReplicationEngineFactory ());
58
+ final IndexShard primary = newStartedShard (true , settings , new NRTReplicationEngineFactory ());
59
+ SegmentReplicator replicator = new SegmentReplicator (threadPool );
60
+
61
+ ClusterService cs = mock (ClusterService .class );
62
+ IndexShardRoutingTable .Builder shardRoutingTable = new IndexShardRoutingTable .Builder (replica .shardId ());
63
+ shardRoutingTable .addShard (replica .routingEntry ());
64
+ shardRoutingTable .addShard (primary .routingEntry ().moveToUnassigned (new UnassignedInfo (UnassignedInfo .Reason .NODE_LEFT , "test" )));
65
+
66
+ when (cs .state ()).thenReturn (buildClusterState (replica , shardRoutingTable ));
67
+ replicator .setSourceFactory (new SegmentReplicationSourceFactory (mock (TransportService .class ), mock (RecoverySettings .class ), cs ));
68
+ expectThrows (IllegalStateException .class , () -> replicator .startReplication (replica ));
69
+ closeShards (replica , primary );
70
+ }
71
+
72
+ public void testReplicationWithUnknownPrimaryNode () throws Exception {
73
+ final IndexShard replica = newStartedShard (false , settings , new NRTReplicationEngineFactory ());
74
+ final IndexShard primary = newStartedShard (true , settings , new NRTReplicationEngineFactory ());
75
+ SegmentReplicator replicator = new SegmentReplicator (threadPool );
76
+
77
+ ClusterService cs = mock (ClusterService .class );
78
+ IndexShardRoutingTable .Builder shardRoutingTable = new IndexShardRoutingTable .Builder (replica .shardId ());
79
+ shardRoutingTable .addShard (replica .routingEntry ());
80
+ shardRoutingTable .addShard (primary .routingEntry ());
81
+
82
+ when (cs .state ()).thenReturn (buildClusterState (replica , shardRoutingTable ));
83
+ replicator .setSourceFactory (new SegmentReplicationSourceFactory (mock (TransportService .class ), mock (RecoverySettings .class ), cs ));
84
+ expectThrows (IllegalStateException .class , () -> replicator .startReplication (replica ));
85
+ closeShards (replica , primary );
86
+ }
87
+
88
+ private ClusterState buildClusterState (IndexShard replica , IndexShardRoutingTable .Builder indexShard ) {
89
+ return ClusterState .builder (clusterService .state ())
90
+ .routingTable (
91
+ RoutingTable .builder ()
92
+ .add (IndexRoutingTable .builder (replica .shardId ().getIndex ()).addIndexShard (indexShard .build ()).build ())
93
+ .build ()
94
+ )
95
+ .build ();
96
+ }
97
+
48
98
public void testStartReplicationWithoutSourceFactory () {
49
99
ThreadPool threadpool = mock (ThreadPool .class );
50
100
ExecutorService mock = mock (ExecutorService .class );
0 commit comments