|
32 | 32 |
|
33 | 33 | package org.opensearch.index.reindex;
|
34 | 34 |
|
35 |
| -import org.opensearch.action.bulk.BulkRequestBuilder; |
36 |
| -import org.opensearch.action.bulk.BulkResponse; |
37 | 35 | import org.opensearch.action.index.IndexRequestBuilder;
|
38 |
| -import org.opensearch.action.search.SearchResponse; |
39 |
| -import org.opensearch.common.settings.Settings; |
40 |
| -import org.opensearch.common.xcontent.XContentType; |
41 |
| -import org.opensearch.search.SearchHit; |
42 |
| -import org.opensearch.search.sort.SortOrder; |
43 | 36 |
|
44 | 37 | import java.util.ArrayList;
|
45 | 38 | import java.util.Collection;
|
|
48 | 41 | import java.util.Map;
|
49 | 42 | import java.util.stream.Collectors;
|
50 | 43 |
|
51 |
| -import static org.opensearch.index.query.QueryBuilders.matchAllQuery; |
52 | 44 | import static org.opensearch.index.query.QueryBuilders.termQuery;
|
53 |
| -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; |
54 | 45 | import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
|
55 | 46 | import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
56 | 47 | import static org.hamcrest.Matchers.hasSize;
|
@@ -186,301 +177,4 @@ public void testMissingSources() {
|
186 | 177 | assertThat(response, matcher().created(0).slices(hasSize(0)));
|
187 | 178 | }
|
188 | 179 |
|
189 |
| - public void testReindexWithDerivedSource() throws Exception { |
190 |
| - // Create source index with derived source setting enabled |
191 |
| - String sourceIndexMapping = """ |
192 |
| - { |
193 |
| - "settings": { |
194 |
| - "index": { |
195 |
| - "number_of_shards": 1, |
196 |
| - "number_of_replicas": 0, |
197 |
| - "derived_source": { |
198 |
| - "enabled": true |
199 |
| - } |
200 |
| - } |
201 |
| - }, |
202 |
| - "mappings": { |
203 |
| - "_doc": { |
204 |
| - "properties": { |
205 |
| - "foo": { |
206 |
| - "type": "keyword", |
207 |
| - "store": true |
208 |
| - }, |
209 |
| - "bar": { |
210 |
| - "type": "integer", |
211 |
| - "store": true |
212 |
| - } |
213 |
| - } |
214 |
| - } |
215 |
| - } |
216 |
| - }"""; |
217 |
| - |
218 |
| - // Create indices |
219 |
| - assertAcked(prepareCreate("source_index").setSource(sourceIndexMapping, XContentType.JSON)); |
220 |
| - assertAcked(prepareCreate("dest_index").setSource(sourceIndexMapping, XContentType.JSON)); |
221 |
| - ensureGreen(); |
222 |
| - |
223 |
| - // Index some documents |
224 |
| - int numDocs = randomIntBetween(5, 20); |
225 |
| - List<IndexRequestBuilder> docs = new ArrayList<>(); |
226 |
| - for (int i = 0; i < numDocs; i++) { |
227 |
| - docs.add(client().prepareIndex("source_index").setId(Integer.toString(i)).setSource("foo", "value_" + i, "bar", i)); |
228 |
| - } |
229 |
| - indexRandom(true, docs); |
230 |
| - |
231 |
| - // Test 1: Basic reindex |
232 |
| - ReindexRequestBuilder copy = reindex().source("source_index").destination("dest_index").refresh(true); |
233 |
| - |
234 |
| - BulkByScrollResponse response = copy.get(); |
235 |
| - assertThat(response, matcher().created(numDocs)); |
236 |
| - long expectedCount = client().prepareSearch("dest_index").setQuery(matchAllQuery()).get().getHits().getTotalHits().value(); |
237 |
| - assertEquals(numDocs, expectedCount); |
238 |
| - |
239 |
| - // Test 2: Reindex with query filter |
240 |
| - String destIndexFiltered = "dest_index_filtered"; |
241 |
| - assertAcked(prepareCreate(destIndexFiltered).setSource(sourceIndexMapping, XContentType.JSON)); |
242 |
| - |
243 |
| - copy = reindex().source("source_index").destination(destIndexFiltered).filter(termQuery("bar", 1)).refresh(true); |
244 |
| - |
245 |
| - response = copy.get(); |
246 |
| - expectedCount = client().prepareSearch("source_index").setQuery(termQuery("bar", 1)).get().getHits().getTotalHits().value(); |
247 |
| - assertThat(response, matcher().created(expectedCount)); |
248 |
| - |
249 |
| - // Test 3: Reindex with slices |
250 |
| - String destIndexSliced = "dest_index_sliced"; |
251 |
| - assertAcked(prepareCreate(destIndexSliced).setSource(sourceIndexMapping, XContentType.JSON)); |
252 |
| - |
253 |
| - int slices = randomSlices(); |
254 |
| - int expectedSlices = expectedSliceStatuses(slices, "source_index"); |
255 |
| - |
256 |
| - copy = reindex().source("source_index").destination(destIndexSliced).setSlices(slices).refresh(true); |
257 |
| - |
258 |
| - response = copy.get(); |
259 |
| - assertThat(response, matcher().created(numDocs).slices(hasSize(expectedSlices))); |
260 |
| - |
261 |
| - // Test 4: Reindex with maxDocs |
262 |
| - String destIndexMaxDocs = "dest_index_maxdocs"; |
263 |
| - assertAcked(prepareCreate(destIndexMaxDocs).setSource(sourceIndexMapping, XContentType.JSON)); |
264 |
| - |
265 |
| - int maxDocs = numDocs / 2; |
266 |
| - copy = reindex().source("source_index").destination(destIndexMaxDocs).maxDocs(maxDocs).refresh(true); |
267 |
| - |
268 |
| - response = copy.get(); |
269 |
| - assertThat(response, matcher().created(maxDocs)); |
270 |
| - expectedCount = client().prepareSearch(destIndexMaxDocs).setQuery(matchAllQuery()).get().getHits().getTotalHits().value(); |
271 |
| - assertEquals(maxDocs, expectedCount); |
272 |
| - |
273 |
| - // Test 5: Multiple source indices |
274 |
| - String sourceIndex2 = "source_index_2"; |
275 |
| - assertAcked(prepareCreate(sourceIndex2).setSource(sourceIndexMapping, XContentType.JSON)); |
276 |
| - |
277 |
| - int numDocs2 = randomIntBetween(5, 20); |
278 |
| - List<IndexRequestBuilder> docs2 = new ArrayList<>(); |
279 |
| - for (int i = 0; i < numDocs2; i++) { |
280 |
| - docs2.add( |
281 |
| - client().prepareIndex(sourceIndex2).setId(Integer.toString(i + numDocs)).setSource("foo", "value2_" + i, "bar", i + numDocs) |
282 |
| - ); |
283 |
| - } |
284 |
| - indexRandom(true, docs2); |
285 |
| - |
286 |
| - String destIndexMulti = "dest_index_multi"; |
287 |
| - assertAcked(prepareCreate(destIndexMulti).setSource(sourceIndexMapping, XContentType.JSON)); |
288 |
| - |
289 |
| - copy = reindex().source("source_index", "source_index_2").destination(destIndexMulti).refresh(true); |
290 |
| - |
291 |
| - response = copy.get(); |
292 |
| - assertThat(response, matcher().created(numDocs + numDocs2)); |
293 |
| - expectedCount = client().prepareSearch(destIndexMulti).setQuery(matchAllQuery()).get().getHits().getTotalHits().value(); |
294 |
| - assertEquals(numDocs + numDocs2, expectedCount); |
295 |
| - } |
296 |
| - |
297 |
| - public void testReindexFromDerivedSourceToNormalIndex() throws Exception { |
298 |
| - // Create source index with derived source enabled |
299 |
| - String sourceMapping = """ |
300 |
| - { |
301 |
| - "properties": { |
302 |
| - "text_field": { |
303 |
| - "type": "text", |
304 |
| - "store": true |
305 |
| - }, |
306 |
| - "keyword_field": { |
307 |
| - "type": "keyword" |
308 |
| - }, |
309 |
| - "numeric_field": { |
310 |
| - "type": "long", |
311 |
| - "doc_values": true |
312 |
| - }, |
313 |
| - "date_field": { |
314 |
| - "type": "date", |
315 |
| - "store": true |
316 |
| - } |
317 |
| - } |
318 |
| - }"""; |
319 |
| - |
320 |
| - // Create destination index with normal settings |
321 |
| - String destMapping = """ |
322 |
| - { |
323 |
| - "properties": { |
324 |
| - "text_field": { |
325 |
| - "type": "text" |
326 |
| - }, |
327 |
| - "keyword_field": { |
328 |
| - "type": "keyword" |
329 |
| - }, |
330 |
| - "numeric_field": { |
331 |
| - "type": "long" |
332 |
| - }, |
333 |
| - "date_field": { |
334 |
| - "type": "date" |
335 |
| - } |
336 |
| - } |
337 |
| - }"""; |
338 |
| - |
339 |
| - // Create source index |
340 |
| - assertAcked( |
341 |
| - prepareCreate("source_index").setSettings( |
342 |
| - Settings.builder().put("index.number_of_shards", 2).put("index.derived_source.enabled", true) |
343 |
| - ).setMapping(sourceMapping) |
344 |
| - ); |
345 |
| - |
346 |
| - // Create destination index |
347 |
| - assertAcked(prepareCreate("dest_index").setMapping(destMapping)); |
348 |
| - |
349 |
| - // Index test documents |
350 |
| - int numDocs = randomIntBetween(100, 200); |
351 |
| - final List<IndexRequestBuilder> docs = new ArrayList<>(); |
352 |
| - for (int i = 0; i < numDocs; i++) { |
353 |
| - docs.add( |
354 |
| - client().prepareIndex("source_index") |
355 |
| - .setId(Integer.toString(i)) |
356 |
| - .setSource( |
357 |
| - "text_field", |
358 |
| - "text value " + i, |
359 |
| - "keyword_field", |
360 |
| - "key_" + i, |
361 |
| - "numeric_field", |
362 |
| - i, |
363 |
| - "date_field", |
364 |
| - System.currentTimeMillis() |
365 |
| - ) |
366 |
| - ); |
367 |
| - } |
368 |
| - indexRandom(true, docs); |
369 |
| - refresh("source_index"); |
370 |
| - |
371 |
| - // Test 1: Basic reindex without slices |
372 |
| - ReindexRequestBuilder reindex = reindex().source("source_index").destination("dest_index").refresh(true); |
373 |
| - BulkByScrollResponse response = reindex.get(); |
374 |
| - assertThat(response, matcher().created(numDocs)); |
375 |
| - verifyReindexedContent("dest_index", numDocs); |
376 |
| - |
377 |
| - // Test 2: Reindex with query filter |
378 |
| - String destFilteredIndex = "dest_filtered_index"; |
379 |
| - assertAcked(prepareCreate(destFilteredIndex).setMapping(destMapping)); |
380 |
| - reindex = reindex().source("source_index").destination(destFilteredIndex).filter(termQuery("keyword_field", "key_1")).refresh(true); |
381 |
| - response = reindex.get(); |
382 |
| - assertThat(response, matcher().created(1)); |
383 |
| - verifyReindexedContent(destFilteredIndex, 1); |
384 |
| - |
385 |
| - // Test 3: Reindex with slices |
386 |
| - String destSlicedIndex = "dest_sliced_index"; |
387 |
| - assertAcked(prepareCreate(destSlicedIndex).setMapping(destMapping)); |
388 |
| - int slices = randomSlices(); |
389 |
| - int expectedSlices = expectedSliceStatuses(slices, "source_index"); |
390 |
| - |
391 |
| - reindex = reindex().source("source_index").destination(destSlicedIndex).setSlices(slices).refresh(true); |
392 |
| - response = reindex.get(); |
393 |
| - assertThat(response, matcher().created(numDocs).slices(hasSize(expectedSlices))); |
394 |
| - verifyReindexedContent(destSlicedIndex, numDocs); |
395 |
| - |
396 |
| - // Test 4: Reindex with field transformation |
397 |
| - String destTransformedIndex = "dest_transformed_index"; |
398 |
| - String transformedMapping = """ |
399 |
| - { |
400 |
| - "properties": { |
401 |
| - "new_text_field": { |
402 |
| - "type": "text" |
403 |
| - }, |
404 |
| - "new_keyword_field": { |
405 |
| - "type": "keyword" |
406 |
| - }, |
407 |
| - "modified_numeric": { |
408 |
| - "type": "long" |
409 |
| - }, |
410 |
| - "date_field": { |
411 |
| - "type": "date" |
412 |
| - } |
413 |
| - } |
414 |
| - }"""; |
415 |
| - assertAcked(prepareCreate(destTransformedIndex).setMapping(transformedMapping)); |
416 |
| - |
417 |
| - // First reindex the documents |
418 |
| - reindex = reindex().source("source_index").destination(destTransformedIndex).refresh(true); |
419 |
| - response = reindex.get(); |
420 |
| - assertThat(response, matcher().created(numDocs)); |
421 |
| - |
422 |
| - // Then transform using bulk update |
423 |
| - BulkRequestBuilder bulkRequest = client().prepareBulk(); |
424 |
| - SearchResponse searchResponse = client().prepareSearch(destTransformedIndex).setQuery(matchAllQuery()).setSize(numDocs).get(); |
425 |
| - |
426 |
| - for (SearchHit hit : searchResponse.getHits()) { |
427 |
| - Map<String, Object> source = hit.getSourceAsMap(); |
428 |
| - Map<String, Object> newSource = new HashMap<>(); |
429 |
| - |
430 |
| - // Transform fields |
431 |
| - newSource.put("new_text_field", source.get("text_field")); |
432 |
| - newSource.put("new_keyword_field", source.get("keyword_field")); |
433 |
| - newSource.put("modified_numeric", ((Number) source.get("numeric_field")).longValue() + 1000); |
434 |
| - newSource.put("date_field", source.get("date_field")); |
435 |
| - |
436 |
| - bulkRequest.add(client().prepareIndex(destTransformedIndex).setId(hit.getId()).setSource(newSource)); |
437 |
| - } |
438 |
| - |
439 |
| - BulkResponse bulkResponse = bulkRequest.get(); |
440 |
| - assertFalse(bulkResponse.hasFailures()); |
441 |
| - refresh(destTransformedIndex); |
442 |
| - verifyTransformedContent(destTransformedIndex, numDocs); |
443 |
| - } |
444 |
| - |
445 |
| - private void verifyReindexedContent(String indexName, int expectedCount) { |
446 |
| - refresh(indexName); |
447 |
| - SearchResponse searchResponse = client().prepareSearch(indexName) |
448 |
| - .setQuery(matchAllQuery()) |
449 |
| - .setSize(expectedCount) |
450 |
| - .addSort("numeric_field", SortOrder.ASC) |
451 |
| - .get(); |
452 |
| - |
453 |
| - assertHitCount(searchResponse, expectedCount); |
454 |
| - |
455 |
| - for (SearchHit hit : searchResponse.getHits()) { |
456 |
| - Map<String, Object> source = hit.getSourceAsMap(); |
457 |
| - int id = Integer.parseInt(hit.getId()); |
458 |
| - |
459 |
| - assertEquals("text value " + id, source.get("text_field")); |
460 |
| - assertEquals("key_" + id, source.get("keyword_field")); |
461 |
| - assertEquals(id, ((Number) source.get("numeric_field")).intValue()); |
462 |
| - assertNotNull(source.get("date_field")); |
463 |
| - } |
464 |
| - } |
465 |
| - |
466 |
| - private void verifyTransformedContent(String indexName, int expectedCount) { |
467 |
| - refresh(indexName); |
468 |
| - SearchResponse searchResponse = client().prepareSearch(indexName) |
469 |
| - .setQuery(matchAllQuery()) |
470 |
| - .setSize(expectedCount) |
471 |
| - .addSort("modified_numeric", SortOrder.ASC) |
472 |
| - .get(); |
473 |
| - |
474 |
| - assertHitCount(searchResponse, expectedCount); |
475 |
| - |
476 |
| - for (SearchHit hit : searchResponse.getHits()) { |
477 |
| - Map<String, Object> source = hit.getSourceAsMap(); |
478 |
| - int id = Integer.parseInt(hit.getId()); |
479 |
| - |
480 |
| - assertEquals("text value " + id, source.get("new_text_field")); |
481 |
| - assertEquals("key_" + id, source.get("new_keyword_field")); |
482 |
| - assertEquals(id + 1000, ((Number) source.get("modified_numeric")).longValue()); |
483 |
| - assertNotNull(source.get("date_field")); |
484 |
| - } |
485 |
| - } |
486 | 180 | }
|
0 commit comments