|
105 | 105 | import java.util.concurrent.CountDownLatch;
|
106 | 106 | import java.util.concurrent.ExecutorService;
|
107 | 107 | import java.util.concurrent.Executors;
|
| 108 | +import java.util.concurrent.TimeUnit; |
108 | 109 | import java.util.concurrent.atomic.AtomicInteger;
|
109 | 110 |
|
110 | 111 | import static java.util.Collections.emptyMap;
|
@@ -798,15 +799,9 @@ private DirectoryReader getReader(IndexWriter writer, ShardId shardId) throws IO
|
798 | 799 |
|
799 | 800 | private IndicesRequestCache getIndicesRequestCache(Settings settings) {
|
800 | 801 | IndicesService indicesService = getInstanceFromNode(IndicesService.class);
|
801 |
| - return new IndicesRequestCache(settings, (shardId -> { |
802 |
| - IndexService indexService = null; |
803 |
| - try { |
804 |
| - indexService = indicesService.indexServiceSafe(shardId.getIndex()); |
805 |
| - } catch (IndexNotFoundException ex) { |
806 |
| - return Optional.empty(); |
807 |
| - } |
808 |
| - return Optional.of(new IndicesService.IndexShardCacheEntity(indexService.getShard(shardId.id()))); |
809 |
| - }), |
| 802 | + return new IndicesRequestCache( |
| 803 | + settings, |
| 804 | + indicesService.indicesRequestCache.cacheEntityLookup, |
810 | 805 | new CacheModule(new ArrayList<>(), Settings.EMPTY).getCacheService(),
|
811 | 806 | threadPool,
|
812 | 807 | ClusterServiceUtils.createClusterService(threadPool)
|
@@ -1419,6 +1414,55 @@ public void testDeleteAndCreateIndexShardOnSameNodeAndVerifyStats() throws Excep
|
1419 | 1414 | IOUtils.close(reader, writer, dir, cache);
|
1420 | 1415 | }
|
1421 | 1416 |
|
| 1417 | + public void testIndexShardClosedAndVerifyCacheCleanUpWorksSuccessfully() throws Exception { |
| 1418 | + threadPool = getThreadPool(); |
| 1419 | + String indexName = "test1"; |
| 1420 | + // Create a shard |
| 1421 | + IndexService indexService = createIndex( |
| 1422 | + indexName, |
| 1423 | + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build() |
| 1424 | + ); |
| 1425 | + IndexShard indexShard = indexService.getShard(0); |
| 1426 | + Directory dir = newDirectory(); |
| 1427 | + IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig()); |
| 1428 | + writer.addDocument(newDoc(0, "foo")); |
| 1429 | + writer.addDocument(newDoc(1, "hack")); |
| 1430 | + DirectoryReader reader = OpenSearchDirectoryReader.wrap(DirectoryReader.open(writer), indexShard.shardId()); |
| 1431 | + Loader loader = new Loader(reader, 0); |
| 1432 | + |
| 1433 | + // Set clean interval to a high value as we will do it manually here. |
| 1434 | + IndicesRequestCache cache = getIndicesRequestCache( |
| 1435 | + Settings.builder() |
| 1436 | + .put(IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING_KEY, TimeValue.timeValueMillis(100000)) |
| 1437 | + .build() |
| 1438 | + ); |
| 1439 | + IndicesService.IndexShardCacheEntity cacheEntity = new IndicesService.IndexShardCacheEntity(indexShard); |
| 1440 | + TermQueryBuilder termQuery = new TermQueryBuilder("id", "bar"); |
| 1441 | + |
| 1442 | + // Cache some values for indexShard |
| 1443 | + BytesReference value = cache.getOrCompute(cacheEntity, loader, reader, getTermBytes()); |
| 1444 | + |
| 1445 | + // Verify response and stats. |
| 1446 | + assertEquals("foo", value.streamInput().readString()); |
| 1447 | + RequestCacheStats stats = indexShard.requestCache().stats(); |
| 1448 | + assertEquals("foo", value.streamInput().readString()); |
| 1449 | + assertEquals(1, cache.count()); |
| 1450 | + assertEquals(1, stats.getMissCount()); |
| 1451 | + assertTrue(stats.getMemorySizeInBytes() > 0); |
| 1452 | + |
| 1453 | + // Remove the shard making its cache entries stale |
| 1454 | + IOUtils.close(reader, writer, dir); |
| 1455 | + indexService.removeShard(0, "force"); |
| 1456 | + |
| 1457 | + assertBusy(() -> { assertEquals(IndexShardState.CLOSED, indexShard.state()); }, 1, TimeUnit.SECONDS); |
| 1458 | + |
| 1459 | + // Trigger clean up of cache. Should not throw any exception. |
| 1460 | + cache.cacheCleanupManager.cleanCache(); |
| 1461 | + // Verify all cleared up. |
| 1462 | + assertEquals(0, cache.count()); |
| 1463 | + IOUtils.close(cache); |
| 1464 | + } |
| 1465 | + |
1422 | 1466 | public static String generateString(int length) {
|
1423 | 1467 | String characters = "abcdefghijklmnopqrstuvwxyz";
|
1424 | 1468 | StringBuilder sb = new StringBuilder(length);
|
|
0 commit comments