package org.opensearch.index.shard;

import org.apache.logging.log4j.Logger;
- import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.codecs.CodecUtil;
- import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
- import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.bulk.BackoffPolicy;
- import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.unit.TimeValue;
- import org.opensearch.common.util.UploadListener;
- import org.opensearch.core.action.ActionListener;
- import org.opensearch.index.engine.EngineException;
- import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
- import org.opensearch.index.seqno.SequenceNumbers;
- import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
- import org.opensearch.index.translog.Translog;
import org.opensearch.indices.RemoteStoreSettings;
- import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
- import java.util.Locale;
import java.util.Map;
import java.util.Set;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.stream.Collectors;
-
- import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;

/**
 * RefreshListener implementation to upload newly created segment files to the remote store
@@ -120,8 +101,7 @@ public RemoteStoreRefreshListener(
}
// initializing primary term with the primary term of latest metadata in remote store.
// if no metadata is present, this value will be initialized with -1.
- this.primaryTerm = remoteSegmentMetadata != null ? remoteSegmentMetadata.getPrimaryTerm() :
- INVALID_PRIMARY_TERM;
+ this.primaryTerm = remoteSegmentMetadata != null ? remoteSegmentMetadata.getPrimaryTerm() : INVALID_PRIMARY_TERM;
this.segmentTracker = segmentTracker;
resetBackOffDelayIterator();
this.checkpointPublisher = checkpointPublisher;
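For context on the hunk above, a minimal standalone sketch of the fallback it reformats, assuming INVALID_PRIMARY_TERM is the -1 sentinel the comment describes; the class and method names here are hypothetical and only illustrate the ternary, they are not part of RemoteStoreRefreshListener:

// Illustrative sketch (hypothetical names): primary term comes from the latest remote
// segment metadata when present, otherwise it stays at the invalid sentinel (-1).
class PrimaryTermFallbackExample {
    static final long INVALID_PRIMARY_TERM = -1L; // sentinel meaning "no remote metadata yet"

    // Same shape as the one-line ternary in the hunk above; safe when metadata is absent,
    // because the boxed value is only unboxed on the non-null branch.
    static long resolvePrimaryTerm(Long metadataPrimaryTerm) {
        return metadataPrimaryTerm != null ? metadataPrimaryTerm : INVALID_PRIMARY_TERM;
    }

    public static void main(String[] args) {
        System.out.println(resolvePrimaryTerm(7L));   // prints 7
        System.out.println(resolvePrimaryTerm(null)); // prints -1
    }
}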
@@ -233,7 +213,7 @@ protected String getRetryThreadPoolName() {
return ThreadPool.Names.REMOTE_REFRESH_RETRY;
}

- //todo: create a common class
+ // todo: create a common class
private boolean isRefreshAfterCommit() throws IOException {
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
return (lastCommittedLocalSegmentFileName != null
@@ -253,7 +233,7 @@ private boolean isRefreshAfterCommitSafe() {
return false;
}

- //todo
+ // todo
boolean isLowPriorityUpload() {
return isLocalOrSnapshotRecoveryOrSeeding();
}
@@ -264,7 +244,7 @@ boolean isLowPriorityUpload() {
 * @param file that needs to be uploaded.
 * @return true if the upload has to be skipped for the file.
 */
- //todo: create a common class
+ // todo: create a common class
private boolean skipUpload(String file) {
try {
// Exclude files that are already uploaded and the exclude files to come up with the list of files to be uploaded.
@@ -295,7 +275,7 @@ private String getChecksumOfLocalFile(String file) throws IOException {
 *
 * @return updated map of local segment files and filesize
 */
- //todo: create a common class
+ // todo: create a common class
private Map<String, Long> updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
return segmentTracker.updateLatestLocalFileNameLengthMap(segmentFiles, storeDirectory::fileLength);
}
@@ -305,7 +285,7 @@ private Map<String, Long> updateLocalSizeMapAndTracker(Collection<String> segmen
 * returned value of this method for scheduling retries in syncSegments method.
 * @return true iff the shard is a started with primary mode true or it is local or snapshot recovery.
 */
- //todo: create a common class
+ // todo: create a common class
private boolean isReadyForUpload() {
boolean isReady = indexShard.isStartedPrimary() || isLocalOrSnapshotRecoveryOrSeeding();

@@ -329,7 +309,7 @@ private boolean isReadyForUpload() {
return isReady;
}

- //todo: create a common class
+ // todo: create a common class
boolean isLocalOrSnapshotRecoveryOrSeeding() {
// In this case when the primary mode is false, we need to upload segments to Remote Store
// This is required in case of remote migration seeding/snapshots/shrink/ split/clone where we need to durable persist