diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e2c1ee1a7677..151b1b09086ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow to get the search request from the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818)) - Improve sort-query performance by retaining the default `totalHitsThreshold` for approximated `match_all` queries ([#18189](https://github.com/opensearch-project/OpenSearch/pull/18189)) - Enable testing for ExtensiblePlugins using classpath plugins ([#16908](https://github.com/opensearch-project/OpenSearch/pull/16908)) +- Introduce system generated ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817)) ### Changed
diff --git a/server/src/internalClusterTest/java/org/opensearch/index/FinalPipelineIT.java b/server/src/internalClusterTest/java/org/opensearch/index/FinalPipelineIT.java index 10bd16fcedf83..d26a8f28890ae 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/FinalPipelineIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/FinalPipelineIT.java @@ -111,7 +111,7 @@ public void testFinalPipelineCantChangeDestination() { IllegalStateException.class, () -> client().prepareIndex("index").setId("1").setSource(Collections.singletonMap("field", "value")).get() ); - assertThat(e, hasToString(containsString("final pipeline [final_pipeline] can't change the target index"))); + assertThat(e, hasToString(containsString("FINAL pipeline [final_pipeline] can't change the target index"))); } public void testFinalPipelineOfOldDestinationIsNotInvoked() {
diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 0cb5a7451dc3a..d4454fc39bb57 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -251,7 +251,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec IndexRequest indexRequest = getIndexWriteRequest(actionRequest); if (indexRequest != null) { // Each index request needs to be evaluated, because this method also modifies the IndexRequest - boolean indexRequestHasPipeline = IngestService.resolvePipelines(actionRequest, indexRequest, metadata); + boolean indexRequestHasPipeline = ingestService.resolvePipelines(actionRequest, indexRequest, metadata); hasIndexRequestsWithPipelines |= indexRequestHasPipeline; }
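Context for the hunk above: resolvePipelines is now an instance method because resolution consults node-level state (the system ingest pipeline cache and the NamedXContentRegistry wired into IngestService later in this diff). A minimal sketch of the resulting request state, with a hypothetical index name and variables:

// Hypothetical sketch: what resolution leaves on an IndexRequest. Each id defaults to
// "_none" (IngestService.NOOP_PIPELINE_NAME) when nothing applies.
IndexRequest request = new IndexRequest("my-index").source(Map.of("field", "value"));
boolean hasAnyPipeline = ingestService.resolvePipelines(request, request, clusterState.metadata());
// request.getPipeline()             -> default pipeline id, or "_none"
// request.getFinalPipeline()        -> final pipeline id, or "_none"
// request.getSystemIngestPipeline() -> system generated pipeline id, or "_none"
// hasAnyPipeline is true if at least one of the three is not "_none"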
diff --git a/server/src/main/java/org/opensearch/action/index/IndexRequest.java b/server/src/main/java/org/opensearch/action/index/IndexRequest.java index 7bff66f4e860e..5c97b625b5269 100644 --- a/server/src/main/java/org/opensearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/opensearch/action/index/IndexRequest.java @@ -123,6 +123,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement private String pipeline; private String finalPipeline; + private String systemIngestPipeline; private boolean isPipelineResolved; @@ -158,6 +159,9 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio versionType = VersionType.fromValue(in.readByte()); pipeline = in.readOptionalString(); finalPipeline = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_3_1_0)) { + systemIngestPipeline = in.readOptionalString(); + } isPipelineResolved = in.readBoolean(); isRetry = in.readBoolean(); autoGeneratedTimestamp = in.readLong(); @@ -314,6 +318,21 @@ public String getPipeline() { return this.pipeline; } + /** + * Sets the system ingest pipeline to be executed before indexing the document. + */ + public IndexRequest setSystemIngestPipeline(final String systemIngestPipeline) { + this.systemIngestPipeline = systemIngestPipeline; + return this; + } + + /** + * Returns the system ingest pipeline to be executed before indexing the document. + */ + public String getSystemIngestPipeline() { + return this.systemIngestPipeline; + } + /** * Sets the final ingest pipeline to be executed before indexing the document. * @@ -668,6 +687,9 @@ private void writeBody(StreamOutput out) throws IOException { out.writeByte(versionType.getValue()); out.writeOptionalString(pipeline); out.writeOptionalString(finalPipeline); + if (out.getVersion().onOrAfter(Version.V_3_1_0)) { + out.writeOptionalString(systemIngestPipeline); + } out.writeBoolean(isPipelineResolved); out.writeBoolean(isRetry); out.writeLong(autoGeneratedTimestamp);
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 54b276237de28..5cb856147c206 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -413,6 +413,7 @@ public void apply(Settings value, Settings current, Settings previous) { ClusterService.USER_DEFINED_METADATA, ClusterManagerService.CLUSTER_MANAGER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS, + IngestService.SYSTEM_INGEST_PIPELINE_ENABLED, SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING, SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
diff --git a/server/src/main/java/org/opensearch/ingest/AbstractBatchingSystemProcessor.java b/server/src/main/java/org/opensearch/ingest/AbstractBatchingSystemProcessor.java new file mode 100644 index 0000000000000..385262a302298 --- /dev/null +++ b/server/src/main/java/org/opensearch/ingest/AbstractBatchingSystemProcessor.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest; + +import java.util.Map; + +/** + * Abstract base class for batching system generated processors. + * + * System processors should not be used in regular ingest pipelines. + * + * @opensearch.internal + */ +public abstract class AbstractBatchingSystemProcessor extends AbstractBatchingProcessor { + protected AbstractBatchingSystemProcessor(String tag, String description, int batchSize) { + super(tag, description, batchSize); + } + + @Override + public boolean isSystemGenerated() { + return true; + } + + /** + * Factory class for creating {@link AbstractBatchingSystemProcessor} instances systematically. + * + * Since the processor config is generated based on the index config, the batch size info should also be defined + * as part of it. Different processors can have their own logic to decide the batch size, so each + * implementation of newProcessor is left to handle it. + * + * @opensearch.internal + */ + public abstract static class Factory implements Processor.Factory { + final String processorType; + + protected Factory(String processorType) { + this.processorType = processorType; + } + + @Override + public boolean isSystemGenerated() { + return true; + } + + /** + * Creates a new processor instance. It will be invoked systematically. + * + * @param processorFactories The processor factories. + * @param tag The processor tag. + * @param description The processor description. + * @param config The processor configuration. + * @return The new AbstractBatchingSystemProcessor instance. + * @throws Exception If the processor could not be created. + */ + @Override + public AbstractBatchingSystemProcessor create( + Map<String, Processor.Factory> processorFactories, + String tag, + String description, + Map<String, Object> config + ) throws Exception { + return newProcessor(tag, description, config); + } + + /** + * Returns a new processor instance. It will be invoked systematically. + * + * @param tag tag of the processor + * @param description description of the processor + * @param config configuration of the processor + * @return a new batch processor instance + */ + protected abstract AbstractBatchingSystemProcessor newProcessor(String tag, String description, Map<String, Object> config); + } +}
diff --git a/server/src/main/java/org/opensearch/ingest/IndexRequestWrapper.java b/server/src/main/java/org/opensearch/ingest/IndexRequestWrapper.java new file mode 100644 index 0000000000000..76b85d3b37494 --- /dev/null +++ b/server/src/main/java/org/opensearch/ingest/IndexRequestWrapper.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest; + +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.index.IndexRequest; + +import java.util.List; + +/** + * A wrapper for the index request to help execute the ingest pipelines. + */ +public class IndexRequestWrapper { + /** + * slot of the IndexRequestWrapper is the index of the request in the list of the requests. + * It can be used to map the ingested result or exception to the right index request. + */ + private final int slot; + private final IndexRequest indexRequest; + private final DocWriteRequest<?> actionRequest; + private final List<IngestPipelineInfo> pipelineInfoList; + + public IndexRequestWrapper( + int slot, + IndexRequest indexRequest, + DocWriteRequest<?> actionRequest, + List<IngestPipelineInfo> pipelineInfoList + ) { + this.slot = slot; + this.indexRequest = indexRequest; + this.actionRequest = actionRequest; + this.pipelineInfoList = pipelineInfoList; + } + + public int getSlot() { + return slot; + } + + public IndexRequest getIndexRequest() { + return indexRequest; + } + + public DocWriteRequest<?> getActionRequest() { + return actionRequest; + } + + public List<IngestPipelineInfo> getIngestPipelineInfoList() { + return pipelineInfoList; + } +}
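The wrapper above replaces the private IngestService.IndexRequestWrapper removed later in this diff, and records each resolved pipeline together with its role through IngestPipelineInfo and IngestPipelineType, both defined next. A small hypothetical sketch of how one bulk slot gets wrapped:

// Hypothetical sketch: one bulk slot wrapped with its resolved pipelines.
List<IngestPipelineInfo> pipelineInfoList = List.of(
    new IngestPipelineInfo("my_default_pipeline", IngestPipelineType.DEFAULT),
    new IngestPipelineInfo("my_final_pipeline", IngestPipelineType.FINAL),
    new IngestPipelineInfo("[my-index/SOME_UUID]", IngestPipelineType.SYSTEM_FINAL) // index-derived id
);
IndexRequestWrapper wrapper = new IndexRequestWrapper(0, indexRequest, indexRequest, pipelineInfoList);
// prepareBatches(...) later groups wrappers by index + pipeline ids + pipeline types (see IngestService below)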
diff --git a/server/src/main/java/org/opensearch/ingest/IngestPipelineInfo.java b/server/src/main/java/org/opensearch/ingest/IngestPipelineInfo.java new file mode 100644 index 0000000000000..4d8a47a417cc1 --- /dev/null +++ b/server/src/main/java/org/opensearch/ingest/IngestPipelineInfo.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest; + +import reactor.util.annotation.NonNull; + +/** + * Ingest pipeline info holds the pipeline id and type. + */ +public class IngestPipelineInfo { + private final String pipelineId; + private final IngestPipelineType type; + + public IngestPipelineInfo(final @NonNull String pipelineId, final @NonNull IngestPipelineType type) { + this.pipelineId = pipelineId; + this.type = type; + } + + public String getPipelineId() { + return pipelineId; + } + + public IngestPipelineType getType() { + return type; + } + + @Override + public String toString() { + return pipelineId + ":" + type.name(); + } +}
diff --git a/server/src/main/java/org/opensearch/ingest/IngestPipelineType.java b/server/src/main/java/org/opensearch/ingest/IngestPipelineType.java new file mode 100644 index 0000000000000..848eea02af1b7 --- /dev/null +++ b/server/src/main/java/org/opensearch/ingest/IngestPipelineType.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest; + +/** + * An enum for the ingest pipeline type. + */ +public enum IngestPipelineType { + /** + * Default pipeline is the pipeline provided through the index request or defined in + * the index settings as the default pipeline. + */ + DEFAULT, + /** + * Final pipeline is the one defined in the index settings as the final pipeline. + */ + FINAL, + /** + * System final pipeline is a systematically generated pipeline which will be executed after the + * user-defined final pipeline. + */ + SYSTEM_FINAL +}
diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index bc7afa1a7589e..4704a6029352c 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -52,12 +52,14 @@ import org.opensearch.cluster.metadata.IndexAbstraction; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexTemplateMetadata; +import org.opensearch.cluster.metadata.MappingMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.MetadataIndexTemplateService; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.compress.CompressedXContent; import org.opensearch.common.metrics.OperationMetrics; import org.opensearch.common.regex.Regex; import org.opensearch.common.settings.Setting; @@ -67,19 +69,21 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.action.ActionListener; import org.opensearch.core.service.ReportingService; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.gateway.GatewayService; import org.opensearch.index.IndexSettings; import org.opensearch.index.VersionType; import org.opensearch.index.analysis.AnalysisRegistry; +import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.IndicesService; import org.opensearch.plugins.IngestPlugin; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.Client; +import
java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -97,8 +101,12 @@ import java.util.function.IntConsumer; import java.util.stream.Collectors; +import reactor.util.annotation.NonNull; + import static org.opensearch.cluster.service.ClusterManagerTask.DELETE_PIPELINE; import static org.opensearch.cluster.service.ClusterManagerTask.PUT_PIPELINE; +import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_MAPPINGS; +import static org.opensearch.plugins.IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_TEMPLATE_MAPPINGS; /** * Holder class for several ingest related services. @@ -113,7 +121,7 @@ public class IngestService implements ClusterStateApplier, ReportingService MAX_NUMBER_OF_INGEST_PROCESSORS = Setting.intSetting( "cluster.ingest.max_number_processors", @@ -124,11 +132,22 @@ public class IngestService implements ClusterStateApplier, ReportingService SYSTEM_INGEST_PIPELINE_ENABLED = Setting.boolSetting( + "cluster.ingest.system_pipeline_enabled", + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private static final Logger logger = LogManager.getLogger(IngestService.class); private final ClusterService clusterService; private final ScriptService scriptService; private final Map processorFactories; + private Map systemIngestProcessorFactories = null; // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. // We know of all the processor factories when a node with all its plugin have been initialized. Also some // processor factories rely on other node services. Custom metadata is statically registered when classes @@ -141,6 +160,9 @@ public class IngestService implements ClusterStateApplier, ReportingService ingestPlugins, Client client, - IndicesService indicesService + IndicesService indicesService, + NamedXContentRegistry xContentRegistry, + SystemIngestPipelineCache systemIngestPipelineCache ) { this.clusterService = clusterService; this.scriptService = scriptService; - this.processorFactories = processorFactories( - ingestPlugins, - new Processor.Parameters( - env, - scriptService, - analysisRegistry, - threadPool.getThreadContext(), - threadPool::relativeTimeInMillis, - (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), - this, - client, - threadPool.generic()::execute, - indicesService - ) + this.xContentRegistry = xContentRegistry; + final Processor.Parameters processorParameters = new Processor.Parameters( + env, + scriptService, + analysisRegistry, + threadPool.getThreadContext(), + threadPool::relativeTimeInMillis, + (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC), + this, + client, + threadPool.generic()::execute, + indicesService ); + this.processorFactories = processorFactories(ingestPlugins, processorParameters); + this.systemIngestProcessorFactories = systemProcessorFactories(ingestPlugins, processorParameters); + this.systemIngestPipelineCache = systemIngestPipelineCache; this.threadPool = threadPool; - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. 
putPipelineTaskKey = clusterService.registerClusterManagerTask(PUT_PIPELINE, true); deletePipelineTaskKey = clusterService.registerClusterManagerTask(DELETE_PIPELINE, true); clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_NUMBER_OF_INGEST_PROCESSORS, this::setMaxIngestProcessorCount); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(SYSTEM_INGEST_PIPELINE_ENABLED, this::setIsSystemIngestPipelineEnabled); setMaxIngestProcessorCount(clusterService.getClusterSettings().get(MAX_NUMBER_OF_INGEST_PROCESSORS)); + setIsSystemIngestPipelineEnabled(clusterService.getClusterSettings().get(SYSTEM_INGEST_PIPELINE_ENABLED)); } private void setMaxIngestProcessorCount(Integer maxIngestProcessorCount) { this.maxIngestProcessorCount = maxIngestProcessorCount; } + private void setIsSystemIngestPipelineEnabled(final boolean isSystemIngestPipelineEnabled) { + this.isSystemIngestPipelineEnabled = isSystemIngestPipelineEnabled; + } + private static Map processorFactories(List ingestPlugins, Processor.Parameters parameters) { Map processorFactories = new HashMap<>(); for (IngestPlugin ingestPlugin : ingestPlugins) { @@ -195,47 +226,88 @@ private static Map processorFactories(List systemProcessorFactories( + List ingestPlugins, + Processor.Parameters parameters + ) { + Map processorFactories = new HashMap<>(); + for (IngestPlugin ingestPlugin : ingestPlugins) { + Map newProcessors = ingestPlugin.getSystemIngestProcessors(parameters); + for (Map.Entry entry : newProcessors.entrySet()) { + Processor.Factory processorFactory = entry.getValue(); + if (processorFactory.isSystemGenerated() == false) { + throw new RuntimeException("[" + entry.getKey() + "] is not a system generated processor factory."); + } + if (processorFactories.put(entry.getKey(), entry.getValue()) != null) { + throw new RuntimeException("System ingest processor [" + entry.getKey() + "] is already registered"); + } + } + } + return Collections.unmodifiableMap(processorFactories); + } + + /** + * Resolve the system ingest pipeline for the index request + * + * @param originalRequest + * @param indexRequest + * @param metadata + */ + private String resolveSystemIngestPipeline( final DocWriteRequest originalRequest, final IndexRequest indexRequest, final Metadata metadata ) { + String systemIngestPipelineId = null; + final IndexMetadata indexMetadata = getIndexMetadata(originalRequest, indexRequest, metadata); + if (indexMetadata != null) { + systemIngestPipelineId = getSystemIngestPipelineForExistingIndex(indexMetadata, indexRequest); + } else if (indexRequest.index() != null) { + // the index does not exist yet (and this is a valid request), so match index + // templates to look for pipelines in either a matching V2 template (which takes + // precedence), or if a V2 template does not match, any V1 templates + String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexRequest.index(), false); + if (v2Template != null) { + systemIngestPipelineId = getSystemIngestPipelineForTemplateV2(v2Template, indexRequest); + } else { + List templates = MetadataIndexTemplateService.findV1Templates(metadata, indexRequest.index(), null); + systemIngestPipelineId = getSystemIngestPipelineForTemplateV1(templates, indexRequest); + } + } + indexRequest.setSystemIngestPipeline(systemIngestPipelineId != null ? systemIngestPipelineId : NOOP_PIPELINE_NAME); + return systemIngestPipelineId; + } + + /** + * Resolve the ingest pipeline, final ingest pipeline and system ingest pipeline for the request. 
+ * @param originalRequest + * @param indexRequest + * @param metadata + * @return If the index request has an ingest pipeline or not. + */ + public boolean resolvePipelines(final DocWriteRequest originalRequest, final IndexRequest indexRequest, final Metadata metadata) { if (indexRequest.isPipelineResolved() == false) { final String requestPipeline = indexRequest.getPipeline(); - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); + String defaultPipeline = null; String finalPipeline = null; - IndexMetadata indexMetadata = null; - // start to look for default or final pipelines via settings found in the index meta data - if (originalRequest != null) { - indexMetadata = metadata.indices().get(originalRequest.index()); - } - // check the alias for the index request (this is how normal index requests are modeled) - if (indexMetadata == null && indexRequest.index() != null) { - IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(indexRequest.index()); - if (indexAbstraction != null) { - indexMetadata = indexAbstraction.getWriteIndex(); - } - } - // check the alias for the action request (this is how upserts are modeled) - if (indexMetadata == null && originalRequest != null && originalRequest.index() != null) { - IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(originalRequest.index()); - if (indexAbstraction != null) { - indexMetadata = indexAbstraction.getWriteIndex(); - } - } + String systemIngestPipelineId = null; + + final IndexMetadata indexMetadata = getIndexMetadata(originalRequest, indexRequest, metadata); + if (indexMetadata != null) { final Settings indexSettings = indexMetadata.getSettings(); if (IndexSettings.DEFAULT_PIPELINE.exists(indexSettings)) { // find the default pipeline if one is defined from an existing index setting defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings); - indexRequest.setPipeline(defaultPipeline); } if (IndexSettings.FINAL_PIPELINE.exists(indexSettings)) { // find the final pipeline if one is defined from an existing index setting finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexSettings); - indexRequest.setFinalPipeline(finalPipeline); + } + + if (this.isSystemIngestPipelineEnabled) { + systemIngestPipelineId = getSystemIngestPipelineForExistingIndex(indexMetadata, indexRequest); } } else if (indexRequest.index() != null) { // the index does not exist yet (and this is a valid request), so match index @@ -246,14 +318,14 @@ public static boolean resolvePipelines( Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template); if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) { defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); - // we can not break in case a lower-order template has a final pipeline that we need to collect } if (IndexSettings.FINAL_PIPELINE.exists(settings)) { finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings); - // we can not break in case a lower-order template has a default pipeline that we need to collect } - indexRequest.setPipeline(defaultPipeline != null ? defaultPipeline : NOOP_PIPELINE_NAME); - indexRequest.setFinalPipeline(finalPipeline != null ? 
finalPipeline : NOOP_PIPELINE_NAME); + + if (this.isSystemIngestPipelineEnabled) { + systemIngestPipelineId = getSystemIngestPipelineForTemplateV2(v2Template, indexRequest); + } } else { List templates = MetadataIndexTemplateService.findV1Templates( metadata, @@ -276,17 +348,22 @@ public static boolean resolvePipelines( break; } } - indexRequest.setPipeline(defaultPipeline != null ? defaultPipeline : NOOP_PIPELINE_NAME); - indexRequest.setFinalPipeline(finalPipeline != null ? finalPipeline : NOOP_PIPELINE_NAME); + if (this.isSystemIngestPipelineEnabled) { + systemIngestPipelineId = getSystemIngestPipelineForTemplateV1(templates, indexRequest); + } } } + indexRequest.setPipeline(defaultPipeline != null ? defaultPipeline : NOOP_PIPELINE_NAME); + indexRequest.setFinalPipeline(finalPipeline != null ? finalPipeline : NOOP_PIPELINE_NAME); + indexRequest.setSystemIngestPipeline(systemIngestPipelineId != null ? systemIngestPipelineId : NOOP_PIPELINE_NAME); + if (requestPipeline != null) { indexRequest.setPipeline(requestPipeline); } /* - * We have to track whether or not the pipeline for this request has already been resolved. It can happen that the + * We have to track whether the pipeline for this request has already been resolved. It can happen that the * pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request * has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have * already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we @@ -300,7 +377,133 @@ public static boolean resolvePipelines( // return whether this index request has a pipeline return NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false - || NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false; + || NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false + || NOOP_PIPELINE_NAME.equals(indexRequest.getSystemIngestPipeline()) == false; + } + + private IndexMetadata getIndexMetadata(DocWriteRequest originalRequest, IndexRequest indexRequest, Metadata metadata) { + IndexMetadata indexMetadata = null; + // start to look for default or final pipelines via settings found in the index meta data + if (originalRequest != null) { + indexMetadata = metadata.indices().get(originalRequest.index()); + } + // check the alias for the index request (this is how normal index requests are modeled) + if (indexMetadata == null && indexRequest.index() != null) { + IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(indexRequest.index()); + if (indexAbstraction != null) { + indexMetadata = indexAbstraction.getWriteIndex(); + } + } + // check the alias for the action request (this is how upserts are modeled) + if (indexMetadata == null && originalRequest != null && originalRequest.index() != null) { + IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(originalRequest.index()); + if (indexAbstraction != null) { + indexMetadata = indexAbstraction.getWriteIndex(); + } + } + return indexMetadata; + } + + private String getSystemIngestPipelineForTemplateV1( + @NonNull final List templates, + @NonNull final IndexRequest indexRequest + ) { + // Here we cache it with index name + template as the suffix since currently we don't have the uuid. + // We need to cache it so that later during execution we can find it by indexId to reuse it. 
+ final String indexId = createIndexIdWithTemplateSuffix(indexRequest.index()); + Pipeline ingestPipeline = systemIngestPipelineCache.getSystemIngestPipeline(indexId); + if (ingestPipeline == null) { + final List> mappingsMap = new ArrayList<>(); + final Map pipelineConfig = new HashMap<>(); + for (final IndexTemplateMetadata template : templates) { + if (template.mappings() != null) { + try { + mappingsMap.add(MapperService.parseMapping(xContentRegistry, template.mappings().string())); + } catch (IOException e) { + throw new RuntimeException( + "Failed to resolve system ingest pipeline due to failed to parse the mappings [" + + template.mappings().string() + + "] of the index template: " + + template.name(), + e + ); + } + + } + } + + pipelineConfig.put(INDEX_TEMPLATE_MAPPINGS, mappingsMap); + ingestPipeline = createSystemIngestPipeline(indexId, pipelineConfig); + } + + // we can get an empty pipeline from the cache + // we only set the pipeline in request when there is a processor + return ingestPipeline.getProcessors().isEmpty() ? null : indexId; + } + + private String getSystemIngestPipelineForTemplateV2(@NonNull final String templateName, @NonNull final IndexRequest indexRequest) { + // Here we cache it with index name + template as the suffix since currently we don't have the uuid. + // We need to cache it so that later during execution we can find it by indexId to reuse it. + final String indexId = createIndexIdWithTemplateSuffix(indexRequest.index()); + Pipeline ingestPipeline = systemIngestPipelineCache.getSystemIngestPipeline(indexId); + if (ingestPipeline == null) { + final List> mappingsMap = new ArrayList<>(); + final Map pipelineConfig = new HashMap<>(); + final List mappings; + try { + mappings = MetadataIndexTemplateService.collectMappings(state, templateName, indexRequest.index()); + } catch (Exception e) { + throw new RuntimeException( + "Failed to resolve system ingest pipeline due to not able to collect mappings for template: " + + templateName + + ". Root cause: " + + e.getMessage(), + e + ); + } + + for (final CompressedXContent mapping : mappings) { + try { + mappingsMap.add(MapperService.parseMapping(xContentRegistry, mapping.string())); + } catch (IOException e) { + throw new RuntimeException("Failed to parse the mappings [" + mapping + "] of the index template: " + templateName, e); + } + } + + pipelineConfig.put(INDEX_TEMPLATE_MAPPINGS, mappingsMap); + ingestPipeline = createSystemIngestPipeline(indexId, pipelineConfig); + } + + // we can get an empty pipeline from the cache + // we only set the pipeline in request when there is a processor + return ingestPipeline.getProcessors().isEmpty() ? 
null : indexId; + } + + private String createIndexIdWithTemplateSuffix(@NonNull final String indexName) { + return "[" + indexName + "/template]"; + } + + private Pipeline createSystemIngestPipeline(@NonNull final String indexId, @NonNull final Map pipelineConfig) { + final Pipeline pipeline = Pipeline.createSystemIngestPipeline(indexId, systemIngestProcessorFactories, pipelineConfig); + systemIngestPipelineCache.cachePipeline(indexId, pipeline, maxIngestProcessorCount); + return pipeline; + } + + private String getSystemIngestPipelineForExistingIndex(@NonNull final IndexMetadata indexMetadata, IndexRequest indexRequest) { + final String indexId = indexMetadata.getIndex().toString(); + Pipeline ingestPipeline = systemIngestPipelineCache.getSystemIngestPipeline(indexId); + if (ingestPipeline == null) { + // no cache we will try to resolve the ingest pipeline based on the index configuration + final MappingMetadata mappingMetadata = indexMetadata.mapping(); + final Map pipelineConfig = new HashMap<>(); + if (mappingMetadata != null) { + pipelineConfig.put(INDEX_MAPPINGS, mappingMetadata.getSourceAsMap()); + } + ingestPipeline = createSystemIngestPipeline(indexId, pipelineConfig); + } + // we can get an empty pipeline from the cache + // we only set the pipeline in request when there is a processor + return ingestPipeline.getProcessors().isEmpty() ? null : indexId; } public ClusterService getClusterService() { @@ -452,6 +655,14 @@ public Map getProcessorFactories() { return processorFactories; } + public Map getSystemProcessorFactories() { + return systemIngestProcessorFactories; + } + + protected SystemIngestPipelineCache getSystemIngestPipelineCache() { + return systemIngestPipelineCache; + } + @Override public IngestInfo info() { Map processorFactories = getProcessorFactories(); @@ -607,30 +818,37 @@ private void runBulkRequestInBatch( continue; } + // need to set pipeline of the request as NOOP_PIPELINE_NAME so that when we switch back to the write thread + // and invoke doInternalExecute of the TransportBulkAction we will not execute the pipeline again. 
final String pipelineId = indexRequest.getPipeline(); indexRequest.setPipeline(NOOP_PIPELINE_NAME); final String finalPipelineId = indexRequest.getFinalPipeline(); indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); - boolean hasFinalPipeline = true; - final List pipelines; - if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false - && IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = Arrays.asList(pipelineId, finalPipelineId); - } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) { - pipelines = Collections.singletonList(pipelineId); - hasFinalPipeline = false; - } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { - pipelines = Collections.singletonList(finalPipelineId); - } else { + final String systemPipelineId = indexRequest.getSystemIngestPipeline(); + indexRequest.setSystemIngestPipeline(NOOP_PIPELINE_NAME); + + List pipelinesInfoList = new ArrayList<>(); + + if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false) { + pipelinesInfoList.add(new IngestPipelineInfo(pipelineId, IngestPipelineType.DEFAULT)); + } + if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { + pipelinesInfoList.add(new IngestPipelineInfo(finalPipelineId, IngestPipelineType.FINAL)); + } + + if (IngestService.NOOP_PIPELINE_NAME.equals(systemPipelineId) == false) { + pipelinesInfoList.add(new IngestPipelineInfo(systemPipelineId, IngestPipelineType.SYSTEM_FINAL)); + } + + if (pipelinesInfoList.isEmpty()) { if (counter.decrementAndGet() == 0) { onCompletion.accept(originalThread, null); } assert counter.get() >= 0; - i++; - continue; + } else { + indexRequestWrappers.add(new IndexRequestWrapper(i, indexRequest, actionRequest, pipelinesInfoList)); } - indexRequestWrappers.add(new IndexRequestWrapper(i, indexRequest, pipelines, hasFinalPipeline)); i++; } @@ -641,9 +859,9 @@ private void runBulkRequestInBatch( for (List batch : batches) { executePipelinesInBatchRequests( batch.stream().map(IndexRequestWrapper::getSlot).collect(Collectors.toList()), - batch.get(0).getPipelines().iterator(), - batch.get(0).isHasFinalPipeline(), + batch.get(0).getIngestPipelineInfoList().iterator(), batch.stream().map(IndexRequestWrapper::getIndexRequest).collect(Collectors.toList()), + batch.stream().map(IndexRequestWrapper::getActionRequest).collect(Collectors.toList()), onDropped, onFailure, counter, @@ -668,12 +886,15 @@ private void runBulkRequestInBatch( static List> prepareBatches(int batchSize, List indexRequestWrappers) { final Map> indexRequestsPerIndexAndPipelines = new HashMap<>(); for (IndexRequestWrapper indexRequestWrapper : indexRequestWrappers) { - // IndexRequests are grouped by their index + pipeline ids + // IndexRequests are grouped by their index + pipeline ids + pipeline type List indexAndPipelineIds = new ArrayList<>(); String index = indexRequestWrapper.getIndexRequest().index(); - List pipelines = indexRequestWrapper.getPipelines(); + List pipelineInfo = indexRequestWrapper.getIngestPipelineInfoList() + .stream() + .map((IngestPipelineInfo::toString)) + .toList(); indexAndPipelineIds.add(index); - indexAndPipelineIds.addAll(pipelines); + indexAndPipelineIds.addAll(pipelineInfo); int hashCode = indexAndPipelineIds.hashCode(); indexRequestsPerIndexAndPipelines.putIfAbsent(hashCode, new ArrayList<>()); indexRequestsPerIndexAndPipelines.get(hashCode).add(indexRequestWrapper); @@ -691,42 +912,11 @@ static List> prepareBatches(int batchSize, List pipelines; - private final boolean hasFinalPipeline; - - 
IndexRequestWrapper(int slot, IndexRequest indexRequest, List pipelines, boolean hasFinalPipeline) { - this.slot = slot; - this.indexRequest = indexRequest; - this.pipelines = pipelines; - this.hasFinalPipeline = hasFinalPipeline; - } - - public int getSlot() { - return slot; - } - - public IndexRequest getIndexRequest() { - return indexRequest; - } - - public List getPipelines() { - return pipelines; - } - - public boolean isHasFinalPipeline() { - return hasFinalPipeline; - } - } - private void executePipelinesInBatchRequests( final List slots, - final Iterator pipelineIterator, - final boolean hasFinalPipeline, + final Iterator pipelineInfoIterator, final List indexRequests, + final List> actionRequests, final IntConsumer onDropped, final BiConsumer onFailure, final AtomicInteger counter, @@ -736,9 +926,9 @@ private void executePipelinesInBatchRequests( if (indexRequests.size() == 1) { executePipelines( slots.get(0), - pipelineIterator, - hasFinalPipeline, + pipelineInfoIterator, indexRequests.get(0), + actionRequests.get(0), onDropped, onFailure, counter, @@ -747,209 +937,294 @@ private void executePipelinesInBatchRequests( ); return; } - while (pipelineIterator.hasNext()) { - final String pipelineId = pipelineIterator.next(); - try { - PipelineHolder holder = pipelines.get(pipelineId); - if (holder == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + + if (pipelineInfoIterator.hasNext() == false) { + throw new RuntimeException( + "Should not executePipelinesInBatchRequests when the pipeline iterator does not have next pipeline." + ); + } + + final IngestPipelineInfo pipelineInfo = pipelineInfoIterator.next(); + final String pipelineId = pipelineInfo.getPipelineId(); + final IngestPipelineType pipelineInfoType = pipelineInfo.getType(); + try { + // we will rely on the target index of the actionRequests and indexRequests to resolve the system + // pipeline for the request if the previously resolved one is invalidated. Since we group requests by + // the target index so we can simply use the first request. 
+ final Pipeline pipeline = getPipeline(pipelineId, pipelineInfoType, actionRequests.get(0), indexRequests.get(0)); + + if (pipeline == null) { + // in a valid null case if this is the last pipeline we should complete the execution + if (pipelineInfoIterator.hasNext() == false) { + completeExecution(counter, onCompletion, originalThread, indexRequests.size()); } - Pipeline pipeline = holder.pipeline; - String originalIndex = indexRequests.get(0).indices()[0]; - Map slotIndexRequestMap = createSlotIndexRequestMap(slots, indexRequests); - innerBatchExecute(slots, indexRequests, pipeline, onDropped, results -> { - for (int i = 0; i < results.size(); ++i) { - if (results.get(i).getException() != null) { - IndexRequest indexRequest = slotIndexRequestMap.get(results.get(i).getSlot()); - logger.debug( - () -> new ParameterizedMessage( - "failed to execute pipeline [{}] for document [{}/{}]", - pipelineId, - indexRequest.index(), - indexRequest.id() - ), - results.get(i).getException() - ); - onFailure.accept(results.get(i).getSlot(), results.get(i).getException()); - } + // do not execute the pipeline since it is a valid null + return; + } + + String originalIndex = indexRequests.get(0).indices()[0]; + Map slotIndexRequestMap = createSlotIndexRequestMap(slots, indexRequests); + innerBatchExecute(slots, indexRequests, pipeline, onDropped, results -> { + for (int i = 0; i < results.size(); ++i) { + if (results.get(i).getException() != null) { + IndexRequest indexRequest = slotIndexRequestMap.get(results.get(i).getSlot()); + logger.debug( + () -> new ParameterizedMessage( + "failed to execute pipeline [{}] for document [{}/{}]", + pipelineId, + indexRequest.index(), + indexRequest.id() + ), + results.get(i).getException() + ); + onFailure.accept(results.get(i).getSlot(), results.get(i).getException()); } + } - Iterator newPipelineIterator = pipelineIterator; - boolean newHasFinalPipeline = hasFinalPipeline; - // indexRequests are grouped for the same index and same pipelines - String newIndex = indexRequests.get(0).indices()[0]; - - // handle index change case - if (Objects.equals(originalIndex, newIndex) == false) { - if (hasFinalPipeline && pipelineIterator.hasNext() == false) { - totalMetrics.failed(); - for (int slot : slots) { - onFailure.accept( - slot, - new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index") - ); - } + // indexRequests are grouped for the same index and same pipelines, but we can conditionally change + // the target index so need to check each request + boolean hasTargetIndexChanged = false; + boolean hasInvalidTargetIndexChange = false; + final List indexRequestsTargetIndexUnchanged = new ArrayList<>(); + final List> actionRequestsTargetIndexUnchanged = new ArrayList<>(); + + for (int i = 0; i < indexRequests.size(); ++i) { + final IndexRequest indexRequest = indexRequests.get(i); + if (Objects.equals(originalIndex, indexRequest.indices()[0]) == false) { + hasTargetIndexChanged = true; + if (IngestPipelineType.FINAL.equals(pipelineInfoType) || IngestPipelineType.SYSTEM_FINAL.equals(pipelineInfoType)) { + // invalid target index change we should fail the doc + hasInvalidTargetIndexChange = true; + onFailure.accept( + i, + new IllegalStateException(pipelineInfoType + " pipeline [" + pipelineId + "] can't change the target index") + ); } else { - // Drain old it so it's not looped over - pipelineIterator.forEachRemaining($ -> {}); - for (IndexRequest indexRequest : indexRequests) { - indexRequest.isPipelineResolved(false); - 
resolvePipelines(null, indexRequest, state.metadata()); - if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) { - newPipelineIterator = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); - newHasFinalPipeline = true; - } else { - newPipelineIterator = Collections.emptyIterator(); - } - } + resetPipelineAfterIndexChange(indexRequest); } + } else { + indexRequestsTargetIndexUnchanged.add(indexRequest); + actionRequestsTargetIndexUnchanged.add(actionRequests.get(i)); } + } - if (newPipelineIterator.hasNext()) { - executePipelinesInBatchRequests( - slots, - newPipelineIterator, - newHasFinalPipeline, - indexRequests, - onDropped, - onFailure, - counter, - onCompletion, - originalThread - ); - } else { - if (counter.addAndGet(-results.size()) == 0) { + // handle index change case + if (hasTargetIndexChanged == true) { + if (hasInvalidTargetIndexChange == true) { + totalMetrics.failed(); + } + if (indexRequestsTargetIndexUnchanged.isEmpty()) { + // Drain old it so it's not looped over. We will re-process the pipelines of the request with + // the new target index in TransportBulkAction. + pipelineInfoIterator.forEachRemaining($ -> {}); + } + + if (pipelineInfoIterator.hasNext() == true) { + // Since the requests with new target index will be processed in the next round we should + // update the counter before we continue to execute the next pipeline for the requests with + // target index unchanged. + if (counter.addAndGet(-(indexRequests.size() - indexRequestsTargetIndexUnchanged.size())) == 0) { onCompletion.accept(originalThread, null); + // If no more request with target index unchanged we should end here so that we don't + // try to execute the next pipeline with an empty list of the requests. + return; } assert counter.get() >= 0; } - }); - } catch (Exception e) { - StringBuilder documentLogBuilder = new StringBuilder(); - for (int i = 0; i < indexRequests.size(); ++i) { - IndexRequest indexRequest = indexRequests.get(i); - documentLogBuilder.append(indexRequest.index()); - documentLogBuilder.append("/"); - documentLogBuilder.append(indexRequest.id()); - if (i < indexRequests.size() - 1) { - documentLogBuilder.append(", "); - } - onFailure.accept(slots.get(i), e); } - logger.debug( - () -> new ParameterizedMessage( - "failed to execute pipeline [{}] for documents [{}]", - pipelineId, - documentLogBuilder.toString() - ), - e - ); - if (counter.addAndGet(-indexRequests.size()) == 0) { - onCompletion.accept(originalThread, null); + + if (pipelineInfoIterator.hasNext()) { + executePipelinesInBatchRequests( + slots, + pipelineInfoIterator, + indexRequestsTargetIndexUnchanged, + actionRequestsTargetIndexUnchanged, + onDropped, + onFailure, + counter, + onCompletion, + originalThread + ); + } else { + completeExecution(counter, onCompletion, originalThread, results.size()); } - assert counter.get() >= 0; - break; + }); + } catch (Exception e) { + StringBuilder documentLogBuilder = new StringBuilder(); + for (int i = 0; i < indexRequests.size(); ++i) { + IndexRequest indexRequest = indexRequests.get(i); + documentLogBuilder.append(indexRequest.index()); + documentLogBuilder.append("/"); + documentLogBuilder.append(indexRequest.id()); + if (i < indexRequests.size() - 1) { + documentLogBuilder.append(", "); + } + onFailure.accept(slots.get(i), e); } + logger.debug( + () -> new ParameterizedMessage( + "failed to execute pipeline [{}] for documents [{}]", + pipelineId, + documentLogBuilder.toString() + ), + e + ); + completeExecution(counter, 
onCompletion, originalThread, indexRequests.size()); } + } private void executePipelines( final int slot, - final Iterator it, - final boolean hasFinalPipeline, + final Iterator pipelineInfoIterator, final IndexRequest indexRequest, + final DocWriteRequest actionRequest, final IntConsumer onDropped, final BiConsumer onFailure, final AtomicInteger counter, final BiConsumer onCompletion, final Thread originalThread ) { - while (it.hasNext()) { - final String pipelineId = it.next(); - try { - PipelineHolder holder = pipelines.get(pipelineId); - if (holder == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); - } - Pipeline pipeline = holder.pipeline; - String originalIndex = indexRequest.indices()[0]; - innerExecute(slot, indexRequest, pipeline, onDropped, e -> { - if (e != null) { - logger.debug( - () -> new ParameterizedMessage( - "failed to execute pipeline [{}] for document [{}/{}]", - pipelineId, - indexRequest.index(), - indexRequest.id() - ), - e - ); - onFailure.accept(slot, e); - } + if (pipelineInfoIterator.hasNext() == false) { + throw new RuntimeException("Should not executePipelines when the pipeline iterator does not have next pipeline."); + } - Iterator newIt = it; - boolean newHasFinalPipeline = hasFinalPipeline; - String newIndex = indexRequest.indices()[0]; + final IngestPipelineInfo pipelineInfo = pipelineInfoIterator.next(); + final String pipelineId = pipelineInfo.getPipelineId(); + try { + final IngestPipelineType pipelineType = pipelineInfo.getType(); + final Pipeline pipeline = getPipeline(pipelineId, pipelineType, actionRequest, indexRequest); - if (Objects.equals(originalIndex, newIndex) == false) { - if (hasFinalPipeline && it.hasNext() == false) { - totalMetrics.failed(); - onFailure.accept( - slot, - new IllegalStateException("final pipeline [" + pipelineId + "] can't change the target index") - ); - } else { + if (pipeline == null) { + // in a valid null case if this is the last pipeline we should complete the execution + if (pipelineInfoIterator.hasNext() == false) { + completeExecution(counter, onCompletion, originalThread, 1); + } + // do not execute the pipeline since it is a valid null + return; + } - // Drain old it so it's not looped over - it.forEachRemaining($ -> {}); - indexRequest.isPipelineResolved(false); - resolvePipelines(null, indexRequest, state.metadata()); - if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false) { - newIt = Collections.singleton(indexRequest.getFinalPipeline()).iterator(); - newHasFinalPipeline = true; - } else { - newIt = Collections.emptyIterator(); - } - } - } + String originalIndex = indexRequest.indices()[0]; + innerExecute(slot, indexRequest, pipeline, onDropped, e -> { + if (e != null) { + logger.debug( + () -> new ParameterizedMessage( + "failed to execute pipeline [{}] for document [{}/{}]", + pipelineId, + indexRequest.index(), + indexRequest.id() + ), + e + ); + onFailure.accept(slot, e); + } - if (newIt.hasNext()) { - executePipelines( + String newIndex = indexRequest.indices()[0]; + + if (Objects.equals(originalIndex, newIndex) == false) { + if (IngestPipelineType.FINAL.equals(pipelineType) || IngestPipelineType.SYSTEM_FINAL.equals(pipelineType)) { + totalMetrics.failed(); + onFailure.accept( slot, - newIt, - newHasFinalPipeline, - indexRequest, - onDropped, - onFailure, - counter, - onCompletion, - originalThread + new IllegalStateException(pipelineType + " pipeline [" + pipelineId + "] can't change the target index") ); } else { - 
if (counter.decrementAndGet() == 0) { - onCompletion.accept(originalThread, null); - } - assert counter.get() >= 0; + resetPipelineAfterIndexChange(indexRequest); } - }); - } catch (Exception e) { - logger.debug( - () -> new ParameterizedMessage( - "failed to execute pipeline [{}] for document [{}/{}]", - pipelineId, - indexRequest.index(), - indexRequest.id() - ), - e - ); - onFailure.accept(slot, e); - if (counter.decrementAndGet() == 0) { - onCompletion.accept(originalThread, null); + + // Drain old it so it's not looped over. We will re-process the pipelines of the request with + // the new target index in TransportBulkAction. + pipelineInfoIterator.forEachRemaining($ -> {}); } - assert counter.get() >= 0; - break; + if (pipelineInfoIterator.hasNext()) { + executePipelines( + slot, + pipelineInfoIterator, + indexRequest, + actionRequest, + onDropped, + onFailure, + counter, + onCompletion, + originalThread + ); + } else { + completeExecution(counter, onCompletion, originalThread, 1); + } + }); + } catch (Exception e) { + logger.debug( + () -> new ParameterizedMessage( + "failed to execute pipeline [{}] for document [{}/{}]", + pipelineId, + indexRequest.index(), + indexRequest.id() + ), + e + ); + onFailure.accept(slot, e); + completeExecution(counter, onCompletion, originalThread, 1); + } + + } + + /** + * In this function we will reset the pipeline for the index request to make it like the pipeline not resolved. + * This function should be invoked when the target index is changed by the default pipeline. In this way when we + * switch back to the write thread the transport bulk action can re-process it with the new target index. + * + * We will not execute the default pipeline of the new target index so we don't reset it as null and keep it as + * _none. So in re-process it will be treated as the no-op. We are not sure if this is the ideal behavior, but it is + * the existing behavior and can avoid infinite index re-route so we decide to keep this behavior. + * @param indexRequest An index request. + */ + private void resetPipelineAfterIndexChange(IndexRequest indexRequest) { + indexRequest.isPipelineResolved(false); + indexRequest.setFinalPipeline(null); + indexRequest.setSystemIngestPipeline(null); + } + + private Pipeline getPipeline( + String pipelineId, + IngestPipelineType pipelineType, + DocWriteRequest actionRequest, + IndexRequest indexRequest + ) { + final PipelineHolder holder = pipelines.get(pipelineId); + if (IngestPipelineType.SYSTEM_FINAL.equals(pipelineType)) { + Pipeline indexPipeline = systemIngestPipelineCache.getSystemIngestPipeline(pipelineId); + // In very edge case it is possible the cache is invalidated after we resolve the + // pipeline. So try to resolve the system ingest pipeline again here. 
+ if (indexPipeline == null) { + final String newPipelineId = resolveSystemIngestPipeline(actionRequest, indexRequest, state.metadata()); + // set it as NOOP to avoid duplicated execution after we switch back to the write thread + indexRequest.setSystemIngestPipeline(NOOP_PIPELINE_NAME); + indexPipeline = systemIngestPipelineCache.getSystemIngestPipeline(newPipelineId); } + + return indexPipeline; + } else { + return getPipelineFromHolder(holder, pipelineId); + } + } + + private void completeExecution( + @NonNull final AtomicInteger counter, + @NonNull final BiConsumer onCompletion, + @NonNull final Thread originalThread, + final int completedRequestSize + ) { + if (counter.addAndGet(-completedRequestSize) == 0) { + onCompletion.accept(originalThread, null); + } + assert counter.get() >= 0; + } + + private Pipeline getPipelineFromHolder(final PipelineHolder holder, @NonNull final String pipelineId) { + if (holder == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } + return holder.pipeline; } public IngestStats stats() { @@ -1105,6 +1380,7 @@ private void innerBatchExecute( @Override public void applyClusterState(final ClusterChangedEvent event) { + invalidateSystemIngestPipeline(event); state = event.state(); if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { return; @@ -1128,6 +1404,23 @@ public void applyClusterState(final ClusterChangedEvent event) { } } + private void invalidateSystemIngestPipeline(@NonNull final ClusterChangedEvent event) { + final Map currentIndices = event.state().metadata().indices(); + final Map previousIndices = event.previousState().metadata().indices(); + for (Map.Entry entry : previousIndices.entrySet()) { + final String indexName = entry.getKey(); + final IndexMetadata previousIndexMetadata = entry.getValue(); + assert previousIndexMetadata != null : "IndexMetadata in the previous state metadata indices should not be null."; + final IndexMetadata currentIndexMetadata = currentIndices.get(indexName); + if (currentIndexMetadata == null || ClusterChangedEvent.indexMetadataChanged(previousIndexMetadata, currentIndexMetadata)) { + systemIngestPipelineCache.invalidateCacheForIndex(previousIndexMetadata.getIndex().toString()); + systemIngestPipelineCache.invalidateCacheForIndex( + createIndexIdWithTemplateSuffix(previousIndexMetadata.getIndex().getName()) + ); + } + } + } + void innerUpdatePipelines(IngestMetadata newIngestMetadata) { Map existingPipelines = this.pipelines; diff --git a/server/src/main/java/org/opensearch/ingest/Pipeline.java b/server/src/main/java/org/opensearch/ingest/Pipeline.java index 708416cfca3b7..87523397eb049 100644 --- a/server/src/main/java/org/opensearch/ingest/Pipeline.java +++ b/server/src/main/java/org/opensearch/ingest/Pipeline.java @@ -37,6 +37,7 @@ import org.opensearch.common.metrics.OperationMetrics; import org.opensearch.script.ScriptService; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -46,6 +47,8 @@ import java.util.function.Consumer; import java.util.function.LongSupplier; +import reactor.util.annotation.NonNull; + /** * A pipeline is a list of {@link Processor} instances grouped under a unique id. 
* @@ -122,6 +125,54 @@ public static Pipeline create( return new Pipeline(id, description, version, compoundProcessor); } + public static Pipeline createSystemIngestPipeline( + @NonNull final String index, + @NonNull final Map systemIngestProcessorFactories, + @NonNull final Map config + ) { + final String id = index + "_system_generated_ingest_pipeline"; + final String description = "This is an in-memory systematically generated ingest pipeline."; + final List processors = new ArrayList<>(); + // This config is based on the index and will be used by all the factories. Each factory should not modify it so + // make it an unmodifiable map. + final Map processorConfig = Collections.unmodifiableMap(config); + // if no config we create an empty pipeline with no processor. + if (config.isEmpty()) { + return new Pipeline(id, description, null, new CompoundProcessor()); + } + + systemIngestProcessorFactories.values().forEach((factory) -> { + try { + final Processor processor = factory.create(systemIngestProcessorFactories, null, null, processorConfig); + if (processor != null) { + if (processor.isSystemGenerated() == false) { + throw new RuntimeException( + "Cannot create the system generated ingest pipeline " + + "for the index [" + + index + + "] because the processor [" + + processor.getClass().getName() + + "] is not a system generated ingest processor." + ); + } + processors.add(processor); + } + } catch (Exception e) { + throw new RuntimeException( + "Failed to systematically create the system ingest processor from the factory " + + factory.getClass().getName() + + " for the index " + + index + + ". " + + e.getMessage(), + e + ); + } + }); + + return new Pipeline(id, description, null, new CompoundProcessor(false, processors, Collections.emptyList())); + } + /** * Modifies the data of a document to be indexed based on the processor this pipeline holds *

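Pipeline.createSystemIngestPipeline above asks every registered system processor factory for a processor and keeps the non-null results. For illustration, here is a sketch of a plugin-side factory and processor pair. All names are hypothetical, and it assumes the pre-existing AbstractBatchingProcessor hooks (getType, execute, and subBatchExecute over IngestDocumentWrapper) from the OpenSearch codebase:

// Illustrative sketch only: "example_system_processor" and its enrichment logic are
// hypothetical; the AbstractBatchingProcessor hooks are assumed, not part of this PR.
package org.opensearch.example;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import org.opensearch.ingest.AbstractBatchingSystemProcessor;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.IngestDocumentWrapper;
import org.opensearch.ingest.Processor;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.Plugin;

public class ExampleSystemIngestPlugin extends Plugin implements IngestPlugin {

    @Override
    public Map<String, Processor.Factory> getSystemIngestProcessors(Processor.Parameters parameters) {
        // IngestService rejects any factory here whose isSystemGenerated() is false;
        // extending AbstractBatchingSystemProcessor.Factory satisfies that contract.
        return Map.of(ExampleSystemProcessor.TYPE, new ExampleSystemProcessor.Factory());
    }

    static class ExampleSystemProcessor extends AbstractBatchingSystemProcessor {
        static final String TYPE = "example_system_processor";
        private final Map<String, Object> indexMappings;

        ExampleSystemProcessor(String tag, String description, int batchSize, Map<String, Object> indexMappings) {
            super(tag, description, batchSize);
            this.indexMappings = indexMappings;
        }

        @Override
        public String getType() {
            return TYPE;
        }

        @Override
        public IngestDocument execute(IngestDocument document) {
            // Trivial enrichment derived from the index configuration carried in the config.
            document.setFieldValue("example_mapped_field_count", indexMappings.size());
            return document;
        }

        @Override
        protected void subBatchExecute(List<IngestDocumentWrapper> wrappers, Consumer<List<IngestDocumentWrapper>> handler) {
            // Batch flavor of the same enrichment; a real processor would batch remote calls here.
            for (IngestDocumentWrapper wrapper : wrappers) {
                wrapper.getIngestDocument().setFieldValue("example_mapped_field_count", indexMappings.size());
            }
            handler.accept(wrappers);
        }

        static class Factory extends AbstractBatchingSystemProcessor.Factory {
            Factory() {
                super(TYPE);
            }

            @Override
            protected AbstractBatchingSystemProcessor newProcessor(String tag, String description, Map<String, Object> config) {
                // The config is generated from the index: INDEX_MAPPINGS for an existing index,
                // INDEX_TEMPLATE_MAPPINGS when only templates match (see IngestService above).
                @SuppressWarnings("unchecked")
                final Map<String, Object> mappings = (Map<String, Object>) config.get(
                    IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_MAPPINGS
                );
                if (mappings == null) {
                    return null; // no processor generated for this index
                }
                return new ExampleSystemProcessor(tag, description, 1, mappings);
            }
        }
    }
}

Returning null from newProcessor lets a factory opt out for a given index: createSystemIngestPipeline skips null processors, and IngestService only attaches the system pipeline to a request when the resulting pipeline is non-empty.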
diff --git a/server/src/main/java/org/opensearch/ingest/Processor.java b/server/src/main/java/org/opensearch/ingest/Processor.java index f873128ea28f0..ce86b1d9bc2f7 100644 --- a/server/src/main/java/org/opensearch/ingest/Processor.java +++ b/server/src/main/java/org/opensearch/ingest/Processor.java @@ -136,6 +136,13 @@ private void innerExecute( */ String getDescription(); + /** + * @return whether the processor is systematically generated + */ + default boolean isSystemGenerated() { + return false; + } + /** * A factory that knows how to construct a processor based on a map of maps. */ @@ -152,6 +159,13 @@ interface Factory { */ Processor create(Map<String, Processor.Factory> processorFactories, String tag, String description, Map<String, Object> config) throws Exception; + + /** + * @return whether the factory is for a system generated processor + */ + default boolean isSystemGenerated() { + return false; + } } /**
diff --git a/server/src/main/java/org/opensearch/ingest/SystemIngestPipelineCache.java b/server/src/main/java/org/opensearch/ingest/SystemIngestPipelineCache.java new file mode 100644 index 0000000000000..81db64b8898ce --- /dev/null +++ b/server/src/main/java/org/opensearch/ingest/SystemIngestPipelineCache.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import reactor.util.annotation.NonNull; + +/** + * Cache for system ingest pipelines + */ +public class SystemIngestPipelineCache { + private final Map<String, CacheEntry> cache; + + private final int MAX_ENTRIES = 100; + private final long EXPIRES_IN_MINUTES = 60; + + public SystemIngestPipelineCache() { + this.cache = new ConcurrentHashMap<>(); + } + + /** + * Cache a system ingest pipeline for an index. + * + * @param index [index_name/index_uuid] + * @param systemIngestPipeline A pipeline created based on index configuration. + * @param maxIngestProcessorCount The maximum number of processors allowed in the cached pipeline. + */ + public void cachePipeline( + @NonNull final String index, + @NonNull final Pipeline systemIngestPipeline, + final int maxIngestProcessorCount + ) { + if (systemIngestPipeline.getProcessors().size() > maxIngestProcessorCount) { + throw new IllegalArgumentException("Too many system ingest processors for index: " + index); + } + if (cache.size() >= MAX_ENTRIES) { + evictOldestAndExpiredCacheEntry(); + } + cache.put(index, new CacheEntry(systemIngestPipeline)); + } + + // Evict the oldest entry and any expired entries based on time + private void evictOldestAndExpiredCacheEntry() { + String oldestIndex = null; + long oldestTimestamp = Long.MAX_VALUE; + final List<String> expiredIndices = new ArrayList<>(); + + for (Map.Entry<String, CacheEntry> entry : cache.entrySet()) { + final CacheEntry cacheEntry = entry.getValue(); + if (cacheEntry.getLastAccessTimestamp() < oldestTimestamp) { + oldestTimestamp = cacheEntry.getLastAccessTimestamp(); + oldestIndex = entry.getKey(); + } + if (cacheEntry.isExpired()) { + expiredIndices.add(entry.getKey()); + } + } + + if (oldestIndex != null) { + // Remove the oldest entry from both the cache and access order + cache.remove(oldestIndex); + } + + for (final String expiredIndex : expiredIndices) { + cache.remove(expiredIndex); + } + } + + /** + * Get the cached system ingest pipeline for an index.
diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java
index 8e46c83065d4e..25a43aa635127 100644
--- a/server/src/main/java/org/opensearch/node/Node.java
+++ b/server/src/main/java/org/opensearch/node/Node.java
@@ -185,6 +185,7 @@ import org.opensearch.indices.replication.SegmentReplicator;
 import org.opensearch.indices.store.IndicesStore;
 import org.opensearch.ingest.IngestService;
+import org.opensearch.ingest.SystemIngestPipelineCache;
 import org.opensearch.monitor.MonitorService;
 import org.opensearch.monitor.fs.FsHealthService;
 import org.opensearch.monitor.fs.FsProbe;
@@ -991,7 +992,9 @@ protected Node(final Environment initialEnvironment, Collection clas
             analysisModule.getAnalysisRegistry(),
             pluginsService.filterPlugins(IngestPlugin.class),
             client,
-            indicesService
+            indicesService,
+            xContentRegistry,
+            new SystemIngestPipelineCache()
         );
         final AliasValidator aliasValidator = new AliasValidator();
diff --git a/server/src/main/java/org/opensearch/plugins/IngestPlugin.java b/server/src/main/java/org/opensearch/plugins/IngestPlugin.java
index dc4f22de71344..b169d354ff977 100644
--- a/server/src/main/java/org/opensearch/plugins/IngestPlugin.java
+++ b/server/src/main/java/org/opensearch/plugins/IngestPlugin.java
@@ -54,4 +54,60 @@ public interface IngestPlugin {
     default Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
         return Collections.emptyMap();
     }
+
+    /**
+     * Returns additional system ingest processor types added by this plugin.
+     *
+     * The key of the returned {@link Map} is the unique name of the processor, and the value is a
+     * {@link org.opensearch.ingest.Processor.Factory} used to create the processor systematically.
+     */
+    default Map<String, Processor.Factory> getSystemIngestProcessors(Processor.Parameters parameters) {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Defines the keys that can be used in the config for the system ingest pipeline.
+     * Currently the ingest pipeline is only systematically generated based on the index mapping.
+     */
+    class SystemIngestPipelineConfigKeys {
+        /**
+         * Use this key to access the mappings of the index from the config.
+         * Example:
+         * {
+         *     "_doc":{
+         *         "properties":{
+         *             "fieldName":{
+         *                 "type": "text"
+         *             }
+         *         }
+         *     }
+         * }
+         */
+        public static final String INDEX_MAPPINGS = "index_mappings";
+
+        /**
+         * Use this key to access the mappings of the index's matched templates from the config. This is used
+         * when a doc is indexed into an index that has not been created yet but whose name matches one or more
+         * templates. In that case the system ingest pipeline should be created based on the matched templates.
+         *
+         * If multiple templates match, a later template can override the settings of an earlier one where the
+         * merge rules allow it. This works as if the index were first defined and then updated, so a field that
+         * is not updatable cannot be overridden.
+         * Example mappings from templates:
+         * [
+         *     {
+         *         "_doc":{
+         *             "properties":{
+         *                 "fieldName":{
+         *                     "type": "text"
+         *                 }
+         *             }
+         *         }
+         *     },
+         *     {...}
+         * ]
+         */
+        public static final String INDEX_TEMPLATE_MAPPINGS = "index_template_mappings";
+    }
 }
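A hedged sketch of a plugin wiring in a system generated processor factory and reading the config keys defined above. ExampleSystemIngestPlugin and MappingAwareFactory are hypothetical names, and the factory reuses the hypothetical UppercaseSystemProcessor from the earlier sketch; the opt-out by returning null mirrors the null check in Pipeline.createSystemIngestPipeline.

    import java.util.Collections;
    import java.util.Map;

    import org.opensearch.ingest.Processor;
    import org.opensearch.plugins.IngestPlugin;
    import org.opensearch.plugins.Plugin;

    public class ExampleSystemIngestPlugin extends Plugin implements IngestPlugin {
        @Override
        public Map<String, Processor.Factory> getSystemIngestProcessors(Processor.Parameters parameters) {
            return Collections.singletonMap("example_system", new MappingAwareFactory());
        }
    }

    class MappingAwareFactory implements Processor.Factory {
        @Override
        public Processor create(
            Map<String, Processor.Factory> registry,
            String tag,
            String description,
            Map<String, Object> config
        ) {
            // The config is index-derived and unmodifiable; only read from it.
            Object mappings = config.get(IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_MAPPINGS);
            if (mappings == null) {
                mappings = config.get(IngestPlugin.SystemIngestPipelineConfigKeys.INDEX_TEMPLATE_MAPPINGS);
            }
            if (mappings == null) {
                return null; // no relevant index config: contribute no processor
            }
            return new SystemIngestPipelineSketch.UppercaseSystemProcessor();
        }

        @Override
        public boolean isSystemGenerated() {
            return true; // marks this factory as producing system generated processors
        }
    }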

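The test updates below repeatedly pin all three pipeline slots on an IndexRequest. A minimal sketch of that wiring, assuming the server module is on the classpath; "_none" is IngestService.NOOP_PIPELINE_NAME and opts a slot out.

    import org.opensearch.action.index.IndexRequest;

    final class PipelineSlotsSketch {
        static IndexRequest allSlotsPinned() {
            // Default, final, and system ingest pipelines are resolved independently;
            // tests pin each slot explicitly so pipeline resolution leaves them untouched.
            return new IndexRequest("my-index").id("1")
                .setPipeline("my-default-pipeline")
                .setFinalPipeline("_none")
                .setSystemIngestPipeline("_none");
        }
    }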
diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
index ff9e41ee7c784..d2ee946ee9e57 100644
--- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
+++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java
@@ -62,6 +62,7 @@ import org.opensearch.transport.TransportService;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -71,6 +72,7 @@ import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
+import static org.opensearch.ingest.IngestServiceTests.createIngestServiceWithProcessors;
 import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -146,7 +148,7 @@ private void indicesThatCannotBeCreatedTestCase(
             threadPool,
             mock(TransportService.class),
             clusterService,
-            null,
+            createIngestServiceWithProcessors(Collections.emptyMap()),
             null,
             null,
             mock(ActionFilters.class),
diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java
index 847140e81e201..a1b41117ee13b 100644
--- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java
+++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java
@@ -43,6 +43,7 @@ import org.opensearch.action.support.AutoCreateIndex;
 import org.opensearch.action.update.UpdateRequest;
 import org.opensearch.cluster.ClusterChangedEvent;
+import
org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.metadata.AliasMetadata; @@ -51,12 +52,14 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.IndexTemplateMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.MetadataIndexTemplateService; import org.opensearch.cluster.metadata.Template; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.collect.MapBuilder; +import org.opensearch.common.compress.CompressedXContent; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -72,7 +75,7 @@ import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.ClusterServiceUtils; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.test.VersionUtils; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; @@ -92,6 +95,7 @@ import org.mockito.ArgumentCaptor; import static java.util.Collections.emptyMap; +import static org.opensearch.ingest.IngestServiceTests.createIngestServiceWithProcessors; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Answers.RETURNS_MOCKS; @@ -99,15 +103,19 @@ import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -public class TransportBulkActionIngestTests extends OpenSearchTestCase { +public class TransportBulkActionIngestTests extends OpenSearchSingleNodeTestCase { /** * Index for which mock settings contain a default pipeline. 
@@ -287,7 +295,8 @@ public void setupAction() { return null; }).when(clusterService).addStateApplier(any(ClusterStateApplier.class)); // setup the mocked ingest service for capturing calls - ingestService = mock(IngestService.class); + ingestService = spy(createIngestServiceWithProcessors(Collections.emptyMap())); + doNothing().when(ingestService).executeBulkRequest(anyInt(), any(), any(), any(), any(), anyString()); action = new TestTransportBulkAction(); singleItemBulkWriteAction = new TestSingleItemBulkWriteAction(action); reset(transportService); // call on construction of action @@ -300,7 +309,8 @@ public void testIngestSkipped() throws Exception { bulkRequest.add(indexRequest); action.execute(null, bulkRequest, ActionListener.wrap(response -> {}, exception -> { throw new AssertionError(exception); })); assertTrue(action.isExecuted); - verifyNoInteractions(ingestService); + verify(ingestService, times(1)).resolvePipelines(any(), any(), any()); + verifyNoMoreInteractions(ingestService); } public void testSingleItemBulkActionIngestSkipped() throws Exception { @@ -310,7 +320,8 @@ public void testSingleItemBulkActionIngestSkipped() throws Exception { throw new AssertionError(exception); })); assertTrue(action.isExecuted); - verifyNoInteractions(ingestService); + verify(ingestService, times(1)).resolvePipelines(any(), any(), any()); + verifyNoMoreInteractions(ingestService); } public void testIngestLocal() throws Exception { @@ -655,8 +666,8 @@ public void testNotFindDefaultPipelineFromTemplateMatches() { failureCalled.set(true); })); assertEquals(IngestService.NOOP_PIPELINE_NAME, indexRequest.getPipeline()); - verifyNoInteractions(ingestService); - + verify(ingestService, times(1)).resolvePipelines(any(), any(), any()); + verifyNoMoreInteractions(ingestService); } public void testFindDefaultPipelineFromTemplateMatch() { @@ -720,12 +731,16 @@ public void testFindDefaultPipelineFromTemplateMatch() { ); } - public void testFindDefaultPipelineFromV2TemplateMatch() { + public void testFindDefaultPipelineFromV2TemplateMatch() throws Exception { Exception exception = new Exception("fake exception"); ComposableIndexTemplate t1 = new ComposableIndexTemplate( Collections.singletonList("missing_*"), - new Template(Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline2").build(), null, null), + new Template( + Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "pipeline2").build(), + new CompressedXContent("{}"), + null + ), null, null, null, @@ -734,9 +749,16 @@ public void testFindDefaultPipelineFromV2TemplateMatch() { ); ClusterState state = clusterService.state(); + final MetadataIndexTemplateService metadataIndexTemplateService = getInstanceFromNode(MetadataIndexTemplateService.class); + metadataIndexTemplateService.addIndexTemplateV2(state, false, "my-template", t1); Metadata metadata = Metadata.builder().put("my-template", t1).build(); when(state.metadata()).thenReturn(metadata); when(state.getMetadata()).thenReturn(metadata); + // mock the cluster state for the ingest service + ClusterState ingestServiceClusterState = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + ingestService.applyClusterState( + new ClusterChangedEvent("testFindDefaultPipelineFromV2TemplateMatch", ingestServiceClusterState, ingestServiceClusterState) + ); IndexRequest indexRequest = new IndexRequest("missing_index").id("id"); indexRequest.source(emptyMap()); @@ -757,6 +779,8 @@ public void testFindDefaultPipelineFromV2TemplateMatch() { any(), eq(Names.WRITE) ); 
+ + state.metadata().templatesV2().remove("my-template"); } private void validateDefaultPipeline(IndexRequest indexRequest) { diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java index 6bbd740df7f9c..3fa1dc7ffab8e 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java @@ -86,6 +86,7 @@ import static java.util.Collections.singletonMap; import static org.opensearch.action.bulk.TransportBulkAction.prohibitCustomRoutingOnDataStream; import static org.opensearch.cluster.metadata.MetadataCreateDataStreamServiceTests.createDataStream; +import static org.opensearch.ingest.IngestServiceTests.createIngestServiceWithProcessors; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -110,7 +111,7 @@ class TestTransportBulkAction extends TransportBulkAction { TransportBulkActionTests.this.threadPool, transportService, clusterService, - null, + createIngestServiceWithProcessors(Collections.emptyMap()), null, null, new ActionFilters(Collections.emptySet()), diff --git a/server/src/test/java/org/opensearch/ingest/AbstractBatchingSystemProcessorTests.java b/server/src/test/java/org/opensearch/ingest/AbstractBatchingSystemProcessorTests.java new file mode 100644 index 0000000000000..e4081a9269102 --- /dev/null +++ b/server/src/test/java/org/opensearch/ingest/AbstractBatchingSystemProcessorTests.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.ingest;
+
+import org.opensearch.test.OpenSearchTestCase;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyNoInteractions;
+
+public class AbstractBatchingSystemProcessorTests extends OpenSearchTestCase {
+    public void testSystemFactory_shouldNotModifyConfig() throws Exception {
+        Map<String, Object> config = mock(Map.class);
+
+        DummyProcessor.DummySystemProcessorFactory factory = new DummyProcessor.DummySystemProcessorFactory("DummyProcessor");
+        factory.create(config);
+
+        assertTrue(factory.isSystemGenerated());
+        verifyNoInteractions(config);
+    }
+
+    static class DummyProcessor extends AbstractBatchingSystemProcessor {
+        private List<List<IngestDocumentWrapper>> subBatches = new ArrayList<>();
+
+        public List<List<IngestDocumentWrapper>> getSubBatches() {
+            return subBatches;
+        }
+
+        protected DummyProcessor(int batchSize) {
+            super("tag", "description", batchSize);
+        }
+
+        @Override
+        public void subBatchExecute(List<IngestDocumentWrapper> ingestDocumentWrappers, Consumer<List<IngestDocumentWrapper>> handler) {
+            subBatches.add(ingestDocumentWrappers);
+            handler.accept(ingestDocumentWrappers);
+        }
+
+        @Override
+        public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
+            return ingestDocument;
+        }
+
+        @Override
+        public String getType() {
+            return null;
+        }
+
+        public static class DummySystemProcessorFactory extends AbstractBatchingSystemProcessor.Factory {
+            protected DummySystemProcessorFactory(String processorType) {
+                super(processorType);
+            }
+
+            @Override
+            protected AbstractBatchingSystemProcessor newProcessor(String tag, String description, Map<String, Object> config) {
+                return new DummyProcessor(1);
+            }
+
+            public AbstractBatchingSystemProcessor create(Map<String, Object> config) throws Exception {
+                return super.create(Collections.emptyMap(), "tag", "description", config);
+            }
+        }
+    }
+}
diff --git a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
index ed2a92ec48573..24f8562c4700f 100644
--- a/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
+++ b/server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
@@ -49,9 +49,11 @@ import org.opensearch.cluster.ClusterName;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.metadata.AliasMetadata;
+import org.opensearch.cluster.metadata.ComposableIndexTemplate;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.IndexTemplateMetadata;
 import org.opensearch.cluster.metadata.Metadata;
+import org.opensearch.cluster.metadata.MetadataIndexTemplateService;
 import org.opensearch.cluster.node.DiscoveryNode;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.SetOnce;
@@ -62,7 +64,9 @@ import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.common.xcontent.cbor.CborXContent;
 import org.opensearch.core.common.bytes.BytesArray;
+import org.opensearch.core.index.Index;
 import org.opensearch.core.xcontent.MediaTypeRegistry;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.index.IndexSettings;
 import org.opensearch.index.VersionType;
@@ -74,7 +78,7 @@ import org.opensearch.script.ScriptService;
 import org.opensearch.script.ScriptType;
 import
org.opensearch.test.MockLogAppender; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.threadpool.ThreadPool; import org.opensearch.threadpool.ThreadPool.Names; import org.opensearch.transport.client.Client; @@ -103,10 +107,14 @@ import java.util.stream.Collectors; import org.mockito.ArgumentMatcher; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; +import reactor.util.annotation.NonNull; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.opensearch.ingest.IngestService.NOOP_PIPELINE_NAME; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -124,29 +132,41 @@ import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -public class IngestServiceTests extends OpenSearchTestCase { +public class IngestServiceTests extends OpenSearchSingleNodeTestCase { + + @Mock + private static Processor.Factory mockSystemProcessorFactory; + @Mock + private Processor mockSystemProcessor; private static final IngestPlugin DUMMY_PLUGIN = new IngestPlugin() { @Override public Map getProcessors(Processor.Parameters parameters) { return Collections.singletonMap("foo", (factories, tag, description, config) -> null); } + + @Override + public Map getSystemIngestProcessors(Processor.Parameters parameters) { + return Map.of("foo", mockSystemProcessorFactory); + } }; private ThreadPool threadPool; - private BulkRequest mockBulkRequest; @Before - public void setup() { + public void setup() throws Exception { + MockitoAnnotations.openMocks(this); threadPool = mock(ThreadPool.class); ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); when(threadPool.generic()).thenReturn(executorService); when(threadPool.executor(anyString())).thenReturn(executorService); - mockBulkRequest = mock(BulkRequest.class); + when(mockSystemProcessorFactory.isSystemGenerated()).thenReturn(true); } public void testIngestPlugin() { @@ -163,11 +183,17 @@ public void testIngestPlugin() { null, Collections.singletonList(DUMMY_PLUGIN), client, - mock(IndicesService.class) + mock(IndicesService.class), + mock(NamedXContentRegistry.class), + mock(SystemIngestPipelineCache.class) ); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); assertEquals(1, factories.size()); + + Map systemFactories = ingestService.getSystemProcessorFactories(); + assertTrue(systemFactories.containsKey("foo")); + assertEquals(1, systemFactories.size()); } public void testIngestPluginDuplicate() { @@ -182,13 +208,15 @@ public void testIngestPluginDuplicate() { null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN), client, - mock(IndicesService.class) + mock(IndicesService.class), + mock(NamedXContentRegistry.class), + mock(SystemIngestPipelineCache.class) ) ); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } - public void testExecuteIndexPipelineDoesNotExist() { + public void testExecuteSystemPipelineDoesNotExist() { Client client = mock(Client.class); ClusterService clusterService = 
mock(ClusterService.class); when(clusterService.getClusterSettings()).thenReturn( @@ -202,7 +230,9 @@ public void testExecuteIndexPipelineDoesNotExist() { null, Collections.singletonList(DUMMY_PLUGIN), client, - mock(IndicesService.class) + mock(IndicesService.class), + mock(NamedXContentRegistry.class), + mock(SystemIngestPipelineCache.class) ); final IndexRequest indexRequest = new IndexRequest("_index").id("_id") .source(emptyMap()) @@ -234,7 +264,7 @@ public void testExecuteIndexPipelineDoesNotExist() { } public void testUpdatePipelines() { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); ClusterState previousClusterState = clusterState; ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @@ -258,7 +288,7 @@ public void testUpdatePipelines() { } public void testInnerUpdatePipelines() { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); assertThat(ingestService.pipelines().size(), is(0)); PipelineConfiguration pipeline1 = new PipelineConfiguration("_id1", new BytesArray("{\"processors\": []}"), MediaTypeRegistry.JSON); @@ -341,7 +371,7 @@ private static Map mapOf(K key1, V value1, K key2, V value2, K key3 } public void testDelete() { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); PipelineConfiguration config = new PipelineConfiguration( "_id", new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), @@ -373,7 +403,7 @@ public void testDelete() { } public void testValidateNoIngestInfo() throws Exception { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); PutPipelineRequest putRequest = new PutPipelineRequest( "_id", new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), @@ -394,7 +424,7 @@ public void testValidateNoIngestInfo() throws Exception { } public void testValidatePipelineId_WithNotValidLength_ShouldThrowException() throws Exception { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); String longId = "a".repeat(512) + "a"; PutPipelineRequest putRequest = new PutPipelineRequest( @@ -427,7 +457,7 @@ public void testValidatePipelineId_WithNotValidLength_ShouldThrowException() thr } public void testGetProcessorsInPipeline() throws Exception { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); String id = "_id"; Pipeline pipeline = ingestService.getPipeline(id); assertThat(pipeline, nullValue()); @@ -496,7 +526,7 @@ public void testGetProcessorsInPipelineComplexConditional() throws Exception { ); }); - IngestService ingestService = createWithProcessors(processors); + IngestService ingestService = createIngestServiceWithProcessors(processors); String id = "_id"; Pipeline pipeline = ingestService.getPipeline(id); assertThat(pipeline, nullValue()); @@ -522,7 +552,7 @@ public void testGetProcessorsInPipelineComplexConditional() throws Exception { } public void testCrud() throws Exception { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); String id = "_id"; 
Pipeline pipeline = ingestService.getPipeline(id); assertThat(pipeline, nullValue()); @@ -552,7 +582,7 @@ public void testCrud() throws Exception { } public void testPut() { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); String id = "_id"; Pipeline pipeline = ingestService.getPipeline(id); assertThat(pipeline, nullValue()); @@ -586,7 +616,7 @@ public void testPut() { } public void testPutWithErrorResponse() throws IllegalAccessException { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); String id = "_id"; Pipeline pipeline = ingestService.getPipeline(id); assertThat(pipeline, nullValue()); @@ -624,7 +654,7 @@ public void testPutWithErrorResponse() throws IllegalAccessException { } public void testDeleteUsingWildcard() { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); HashMap pipelines = new HashMap<>(); BytesArray definition = new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"); pipelines.put("p1", new PipelineConfiguration("p1", definition, MediaTypeRegistry.JSON)); @@ -672,7 +702,7 @@ public void testDeleteUsingWildcard() { } public void testDeleteWithExistingUnmatchedPipelines() { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); HashMap pipelines = new HashMap<>(); BytesArray definition = new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"); pipelines.put("p1", new PipelineConfiguration("p1", definition, MediaTypeRegistry.JSON)); @@ -732,7 +762,7 @@ public void testGetPipelines() { } public void testValidate() throws Exception { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); PutPipelineRequest putRequest = new PutPipelineRequest( "_id", new BytesArray( @@ -761,7 +791,7 @@ public void testValidate() throws Exception { } public void testValidateProcessorCountForIngestPipelineThrowsException() { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); PutPipelineRequest putRequest = new PutPipelineRequest( "_id", new BytesArray( @@ -784,7 +814,7 @@ public void testValidateProcessorCountForIngestPipelineThrowsException() { } public void testValidateProcessorCountForWithNestedOnFailureProcessorThrowsException() { - IngestService ingestService = createWithProcessors(); + IngestService ingestService = createIngestServiceWithProcessors(); PutPipelineRequest putRequest = new PutPipelineRequest( "_id", new BytesArray( @@ -878,8 +908,8 @@ public void testValidateProcessorCountForWithNestedOnFailureProcessorThrowsExcep expectThrows(IllegalStateException.class, () -> ingestService.validatePipeline(ingestInfos, putRequest)); } - public void testExecuteIndexPipelineExistsButFailedParsing() { - IngestService ingestService = createWithProcessors( + public void testExecuteSystemPipelineExistsButFailedParsing() { + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> new AbstractProcessor("mock", "description") { @Override public IngestDocument execute(IngestDocument ingestDocument) { @@ -907,9 +937,14 @@ public String getType() { final IndexRequest indexRequest1 = new 
IndexRequest("_index").id("_id1") .source(emptyMap()) .setPipeline("_none") - .setFinalPipeline("_none"); + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest1); - IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline(id).setFinalPipeline("_none"); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2") + .source(emptyMap()) + .setPipeline(id) + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest2); final BiConsumer failureHandler = (slot, e) -> { @@ -936,7 +971,7 @@ public String getType() { } public void testExecuteBulkPipelineDoesNotExist() { - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor()) ); @@ -984,7 +1019,7 @@ public void testExecuteBulkPipelineDoesNotExist() { } public void testExecuteSuccess() { - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor()) ); PutPipelineRequest putRequest = new PutPipelineRequest( @@ -999,7 +1034,8 @@ public void testExecuteSuccess() { final IndexRequest indexRequest = new IndexRequest("_index").id("_id") .source(emptyMap()) .setPipeline("_id") - .setFinalPipeline("_none"); + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") @@ -1017,7 +1053,7 @@ public void testExecuteSuccess() { } public void testExecuteEmptyPipeline() throws Exception { - IngestService ingestService = createWithProcessors(emptyMap()); + IngestService ingestService = createIngestServiceWithProcessors(emptyMap()); PutPipelineRequest putRequest = new PutPipelineRequest( "_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), @@ -1030,7 +1066,8 @@ public void testExecuteEmptyPipeline() throws Exception { final IndexRequest indexRequest = new IndexRequest("_index").id("_id") .source(emptyMap()) .setPipeline("_id") - .setFinalPipeline("_none"); + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") @@ -1049,7 +1086,7 @@ public void testExecuteEmptyPipeline() throws Exception { public void testExecutePropagateAllMetadataUpdates() throws Exception { final CompoundProcessor processor = mockCompoundProcessor(); - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> processor) ); PutPipelineRequest putRequest = new PutPipelineRequest( @@ -1116,7 +1153,7 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception { public void testExecuteFailure() throws Exception { final CompoundProcessor processor = mockCompoundProcessor(); - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> processor) ); PutPipelineRequest putRequest = new PutPipelineRequest( @@ -1176,7 +1213,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception { 
Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)) ); - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> compoundProcessor) ); PutPipelineRequest putRequest = new PutPipelineRequest( @@ -1219,7 +1256,7 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(false, processors, onFailureProcessors)) ); - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> compoundProcessor) ); PutPipelineRequest putRequest = new PutPipelineRequest( @@ -1264,7 +1301,10 @@ public void testBulkRequestExecutionWithFailures() { int numIndexRequests = scaledRandomIntBetween(4, 32); for (int i = 0; i < numIndexRequests; i++) { - IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); + IndexRequest indexRequest = new IndexRequest("_index").id("_id") + .setPipeline(pipelineId) + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); bulkRequest.add(indexRequest); } @@ -1290,7 +1330,7 @@ public void testBulkRequestExecutionWithFailures() { handler.accept(ingestDocumentWrappers); return null; }).when(processor).batchExecute(any(), any()); - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> processor) ); PutPipelineRequest putRequest = new PutPipelineRequest( @@ -1332,7 +1372,10 @@ public void testBulkRequestExecution() throws Exception { logger.info("Using [{}], not randomly determined default [{}]", xContentType, Requests.INDEX_CONTENT_TYPE); int numRequest = scaledRandomIntBetween(8, 64); for (int i = 0; i < numRequest; i++) { - IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); + IndexRequest indexRequest = new IndexRequest("_index").id("_id") + .setPipeline(pipelineId) + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); indexRequest.source(xContentType, "field1", "value1"); bulkRequest.add(indexRequest); } @@ -1350,7 +1393,7 @@ public void testBulkRequestExecution() throws Exception { Map map = new HashMap<>(2); map.put("mock", (factories, tag, description, config) -> processor); - IngestService ingestService = createWithProcessors(map); + IngestService ingestService = createIngestServiceWithProcessors(map); PutPipelineRequest putRequest = new PutPipelineRequest( "_id", new BytesArray("{\"processors\": [{\"mock\": {}}], \"description\": \"_description\"}"), @@ -1405,7 +1448,7 @@ public void testStats() throws Exception { Map map = new HashMap<>(2); map.put("mock", (factories, tag, description, config) -> processor); map.put("failure-mock", (factories, tag, description, config) -> processorFailure); - IngestService ingestService = createWithProcessors(map); + IngestService ingestService = createIngestServiceWithProcessors(map); final IngestStats initialStats = ingestService.stats(); assertThat(initialStats.getPipelineStats().size(), equalTo(0)); @@ -1587,7 +1630,7 @@ public String getDescription() { return null; } }); - 
IngestService ingestService = createWithProcessors(factories); + IngestService ingestService = createIngestServiceWithProcessors(factories); PutPipelineRequest putRequest = new PutPipelineRequest( "_id", new BytesArray("{\"processors\": [{\"drop\" : {}}, {\"mock\" : {}}]}"), @@ -1602,13 +1645,15 @@ public String getDescription() { final IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1") .source(Collections.emptyMap()) .setPipeline("_none") - .setFinalPipeline("_none"); + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest1); IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2") .source(Collections.emptyMap()) .setPipeline("_id") - .setFinalPipeline("_none"); + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest2); @SuppressWarnings("unchecked") @@ -1661,7 +1706,9 @@ public Map getProcessors(Processor.Parameters paramet null, Arrays.asList(testPlugin), client, - mock(IndicesService.class) + mock(IndicesService.class), + mock(NamedXContentRegistry.class), + mock(SystemIngestPipelineCache.class) ); ingestService.addIngestClusterStateListener(ingestClusterStateListener); @@ -1683,7 +1730,7 @@ public Map getProcessors(Processor.Parameters paramet public void testCBORParsing() throws Exception { AtomicReference reference = new AtomicReference<>(); Consumer executor = doc -> reference.set(doc.getFieldValueAsBytes("data")); - final IngestService ingestService = createWithProcessors( + final IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("foo", (factories, tag, description, config) -> new FakeProcessor("foo", tag, description, executor)) ); @@ -1722,6 +1769,7 @@ public void testCBORParsing() throws Exception { } public void testResolveRequiredOrDefaultPipelineDefaultPipeline() { + IngestService ingestService = createIngestServiceWithProcessors(); IndexMetadata.Builder builder = IndexMetadata.builder("idx") .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")) .numberOfShards(1) @@ -1731,14 +1779,14 @@ public void testResolveRequiredOrDefaultPipelineDefaultPipeline() { // index name matches with IDM: IndexRequest indexRequest = new IndexRequest("idx"); - boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + boolean result = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("default-pipeline")); // alias name matches with IDM: indexRequest = new IndexRequest("alias"); - result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + result = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("default-pipeline")); @@ -1749,20 +1797,21 @@ public void testResolveRequiredOrDefaultPipelineDefaultPipeline() { .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")); metadata = Metadata.builder().put(templateBuilder).build(); indexRequest = new IndexRequest("idx"); - result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + result = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); 
assertThat(indexRequest.getPipeline(), equalTo("default-pipeline")); // index name matches with ITMD for bulk upsert UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(emptyMap()).script(mockScript("1")); - result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata); + result = ingestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata); assertThat(result, is(true)); assertThat(updateRequest.upsertRequest().isPipelineResolved(), is(true)); assertThat(updateRequest.upsertRequest().getPipeline(), equalTo("default-pipeline")); } public void testResolveFinalPipeline() { + IngestService ingestService = createIngestServiceWithProcessors(); IndexMetadata.Builder builder = IndexMetadata.builder("idx") .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline")) .numberOfShards(1) @@ -1772,7 +1821,7 @@ public void testResolveFinalPipeline() { // index name matches with IDM: IndexRequest indexRequest = new IndexRequest("idx"); - boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + boolean result = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("_none")); @@ -1780,7 +1829,7 @@ public void testResolveFinalPipeline() { // alias name matches with IDM: indexRequest = new IndexRequest("alias"); - result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + result = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("_none")); @@ -1792,7 +1841,7 @@ public void testResolveFinalPipeline() { .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline")); metadata = Metadata.builder().put(templateBuilder).build(); indexRequest = new IndexRequest("idx"); - result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + result = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("_none")); @@ -1800,28 +1849,29 @@ public void testResolveFinalPipeline() { // index name matches with ITMD for bulk upsert: UpdateRequest updateRequest = new UpdateRequest("idx", "id1").upsert(emptyMap()).script(mockScript("1")); - result = IngestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata); + result = ingestService.resolvePipelines(updateRequest, TransportBulkAction.getIndexWriteRequest(updateRequest), metadata); assertThat(result, is(true)); assertThat(updateRequest.upsertRequest().isPipelineResolved(), is(true)); assertThat(updateRequest.upsertRequest().getFinalPipeline(), equalTo("final-pipeline")); } public void testResolveRequestOrDefaultPipelineAndFinalPipeline() { + IngestService ingestService = createIngestServiceWithProcessors(); // no pipeline: { Metadata metadata = Metadata.builder().build(); IndexRequest indexRequest = new IndexRequest("idx"); - boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + boolean result = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, 
is(false)); assertThat(indexRequest.isPipelineResolved(), is(true)); - assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); + assertThat(indexRequest.getPipeline(), equalTo(NOOP_PIPELINE_NAME)); } // request pipeline: { Metadata metadata = Metadata.builder().build(); IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline"); - boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + boolean result = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("request-pipeline")); @@ -1835,7 +1885,7 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() { .numberOfReplicas(0); Metadata metadata = Metadata.builder().put(builder).build(); IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline"); - boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + boolean result = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("request-pipeline")); @@ -1849,7 +1899,7 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() { .numberOfReplicas(0); Metadata metadata = Metadata.builder().put(builder).build(); IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline"); - boolean result = IngestService.resolvePipelines(indexRequest, indexRequest, metadata); + boolean result = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("request-pipeline")); @@ -1857,14 +1907,103 @@ public void testResolveRequestOrDefaultPipelineAndFinalPipeline() { } } + public void testExecuteBulkRequestInBatch() { + CompoundProcessor mockCompoundProcessor = mockCompoundProcessor(); + IngestService ingestService = createIngestServiceWithProcessors( + Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor) + ); + createPipeline("_id", ingestService); + BulkRequest bulkRequest = new BulkRequest(); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); + bulkRequest.add(indexRequest1); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); + bulkRequest.add(indexRequest2); + IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3") + .source(emptyMap()) + .setPipeline("_none") + .setFinalPipeline("_id") + .setSystemIngestPipeline("_none"); + bulkRequest.add(indexRequest3); + IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4") + .source(emptyMap()) + .setPipeline("_none") + .setFinalPipeline("_id") + .setSystemIngestPipeline("_none"); + bulkRequest.add(indexRequest4); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(4, bulkRequest.requests(), failureHandler, completionHandler, indexReq -> {}, Names.WRITE); + 
verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(mockCompoundProcessor, times(2)).batchExecute(any(), any()); + verify(mockCompoundProcessor, never()).execute(any(), any()); + } + + public void testExecuteBulkRequestInBatchWithDefaultAndFinalPipeline() { + CompoundProcessor mockCompoundProcessor = mockCompoundProcessor(); + IngestService ingestService = createIngestServiceWithProcessors( + Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor) + ); + ClusterState clusterState = createPipeline("_id", ingestService); + createPipeline("_final", ingestService, clusterState); + BulkRequest bulkRequest = new BulkRequest(); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_final") + .setSystemIngestPipeline("_none"); + bulkRequest.add(indexRequest1); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_final") + .setSystemIngestPipeline("_none"); + bulkRequest.add(indexRequest2); + IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_final") + .setSystemIngestPipeline("_none"); + bulkRequest.add(indexRequest3); + IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_final") + .setSystemIngestPipeline("_none"); + bulkRequest.add(indexRequest4); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final BiConsumer completionHandler = mock(BiConsumer.class); + ingestService.executeBulkRequest(4, bulkRequest.requests(), failureHandler, completionHandler, indexReq -> {}, Names.WRITE); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(Thread.currentThread(), null); + verify(mockCompoundProcessor, times(2)).batchExecute(any(), any()); + verify(mockCompoundProcessor, never()).execute(any(), any()); + } + public void testExecuteBulkRequestInBatchFallbackWithOneDocument() { CompoundProcessor mockCompoundProcessor = mockCompoundProcessor(); - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor) ); createPipeline("_id", ingestService); BulkRequest bulkRequest = new BulkRequest(); - IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest1); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @@ -1879,7 +2018,7 @@ public void testExecuteBulkRequestInBatchFallbackWithOneDocument() { public void testExecuteBulkRequestInBatchNoValidPipeline() { CompoundProcessor mockCompoundProcessor = mockCompoundProcessor(); - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor) ); createPipeline("_id", ingestService); 
@@ -1888,12 +2027,14 @@ public void testExecuteBulkRequestInBatchNoValidPipeline() { IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1") .source(emptyMap()) .setPipeline("_none") - .setFinalPipeline("_none"); + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest1); IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2") .source(emptyMap()) .setPipeline("_none") - .setFinalPipeline("_none"); + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest2); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @@ -1908,7 +2049,7 @@ public void testExecuteBulkRequestInBatchNoValidPipeline() { public void testExecuteBulkRequestInBatchNoValidDocument() { CompoundProcessor mockCompoundProcessor = mockCompoundProcessor(); - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor) ); createPipeline("_id", ingestService); @@ -1929,7 +2070,7 @@ public void testExecuteBulkRequestInBatchNoValidDocument() { public void testExecuteBulkRequestInBatchWithException() { CompoundProcessor mockCompoundProcessor = mockCompoundProcessor(); - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor) ); doThrow(new RuntimeException()).when(mockCompoundProcessor).batchExecute(any(), any()); @@ -1953,17 +2094,29 @@ public void testExecuteBulkRequestInBatchWithException() { public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback() { CompoundProcessor mockCompoundProcessor = mockCompoundProcessor(); - IngestService ingestService = createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor) ); createPipeline("_id", ingestService); BulkRequest bulkRequest = new BulkRequest(); // will not be handled as not valid document type - IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest1); - IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest2); - IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest3); List results = Arrays.asList( @@ -1998,18 +2151,34 @@ public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback() { public void testExecuteBulkRequestInBatchWithDefaultBatchSize() { CompoundProcessor mockCompoundProcessor = mockCompoundProcessor(); - IngestService ingestService = 
createWithProcessors( + IngestService ingestService = createIngestServiceWithProcessors( Collections.singletonMap("mock", (factories, tag, description, config) -> mockCompoundProcessor) ); createPipeline("_id", ingestService); BulkRequest bulkRequest = new BulkRequest(); - IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest1); - IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest2); - IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_none").setFinalPipeline("_id"); + IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3") + .source(emptyMap()) + .setPipeline("_none") + .setFinalPipeline("_id") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest3); - IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest4); @SuppressWarnings("unchecked") final Map failureHandler = new HashMap<>(); @@ -2028,11 +2197,11 @@ public void testExecuteBulkRequestInBatchWithDefaultBatchSize() { assertEquals(1, completionHandler.size()); assertNull(completionHandler.get(Thread.currentThread())); verify(mockCompoundProcessor, times(1)).batchExecute(any(), any()); - verify(mockCompoundProcessor, never()).execute(any(), any()); + verify(mockCompoundProcessor, times(1)).execute(any(), any()); } public void testExecuteEmptyPipelineInBatch() throws Exception { - IngestService ingestService = createWithProcessors(emptyMap()); + IngestService ingestService = createIngestServiceWithProcessors(emptyMap()); PutPipelineRequest putRequest = new PutPipelineRequest( "_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), @@ -2043,13 +2212,29 @@ public void testExecuteEmptyPipelineInBatch() throws Exception { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); BulkRequest bulkRequest = new BulkRequest(); - IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest1); - IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest2); - IndexRequest indexRequest3 = new 
IndexRequest("_index").id("_id3").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest3); - IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); + IndexRequest indexRequest4 = new IndexRequest("_index").id("_id4") + .source(emptyMap()) + .setPipeline("_id") + .setFinalPipeline("_none") + .setSystemIngestPipeline("_none"); bulkRequest.add(indexRequest4); final Map failureHandler = new HashMap<>(); final Map completionHandler = new HashMap<>(); @@ -2066,14 +2251,23 @@ public void testExecuteEmptyPipelineInBatch() throws Exception { } public void testPrepareBatches_same_index_pipeline() { - IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1")); - IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index1", Collections.singletonList("p1")); - IngestService.IndexRequestWrapper wrapper3 = createIndexRequestWrapper("index1", Collections.singletonList("p1")); - IngestService.IndexRequestWrapper wrapper4 = createIndexRequestWrapper("index1", Collections.singletonList("p1")); - List> batches = IngestService.prepareBatches( - 2, - Arrays.asList(wrapper1, wrapper2, wrapper3, wrapper4) + IndexRequestWrapper wrapper1 = createIndexRequestWrapper( + "index1", + Collections.singletonList(new IngestPipelineInfo("p1", IngestPipelineType.DEFAULT)) + ); + IndexRequestWrapper wrapper2 = createIndexRequestWrapper( + "index1", + Collections.singletonList(new IngestPipelineInfo("p1", IngestPipelineType.DEFAULT)) + ); + IndexRequestWrapper wrapper3 = createIndexRequestWrapper( + "index1", + Collections.singletonList(new IngestPipelineInfo("p1", IngestPipelineType.DEFAULT)) ); + IndexRequestWrapper wrapper4 = createIndexRequestWrapper( + "index1", + Collections.singletonList(new IngestPipelineInfo("p1", IngestPipelineType.DEFAULT)) + ); + List> batches = IngestService.prepareBatches(2, Arrays.asList(wrapper1, wrapper2, wrapper3, wrapper4)); assertEquals(2, batches.size()); for (int i = 0; i < 2; ++i) { assertEquals(2, batches.get(i).size()); @@ -2081,14 +2275,23 @@ public void testPrepareBatches_same_index_pipeline() { } public void testPrepareBatches_different_index_pipeline() { - IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1")); - IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index2", Collections.singletonList("p1")); - IngestService.IndexRequestWrapper wrapper3 = createIndexRequestWrapper("index1", Arrays.asList("p1", "p2")); - IngestService.IndexRequestWrapper wrapper4 = createIndexRequestWrapper("index1", Collections.singletonList("p2")); - List> batches = IngestService.prepareBatches( - 2, - Arrays.asList(wrapper1, wrapper2, wrapper3, wrapper4) + IndexRequestWrapper wrapper1 = createIndexRequestWrapper( + "index1", + Collections.singletonList(new IngestPipelineInfo("p1", IngestPipelineType.DEFAULT)) + ); + IndexRequestWrapper wrapper2 = createIndexRequestWrapper( + "index2", + Collections.singletonList(new IngestPipelineInfo("p1", IngestPipelineType.DEFAULT)) + ); + IndexRequestWrapper wrapper3 = createIndexRequestWrapper( + "index1", + List.of(new IngestPipelineInfo("p1", IngestPipelineType.DEFAULT), new IngestPipelineInfo("p2", 
public void testPrepareBatches_different_index_pipeline() { - IngestService.IndexRequestWrapper wrapper1 = createIndexRequestWrapper("index1", Collections.singletonList("p1")); - IngestService.IndexRequestWrapper wrapper2 = createIndexRequestWrapper("index2", Collections.singletonList("p1")); - IngestService.IndexRequestWrapper wrapper3 = createIndexRequestWrapper("index1", Arrays.asList("p1", "p2")); - IngestService.IndexRequestWrapper wrapper4 = createIndexRequestWrapper("index1", Collections.singletonList("p2")); - List<List<IngestService.IndexRequestWrapper>> batches = IngestService.prepareBatches( - 2, - Arrays.asList(wrapper1, wrapper2, wrapper3, wrapper4) + IndexRequestWrapper wrapper1 = createIndexRequestWrapper( + "index1", + Collections.singletonList(new IngestPipelineInfo("p1", IngestPipelineType.DEFAULT)) + ); + IndexRequestWrapper wrapper2 = createIndexRequestWrapper( + "index2", + Collections.singletonList(new IngestPipelineInfo("p1", IngestPipelineType.DEFAULT)) + ); + IndexRequestWrapper wrapper3 = createIndexRequestWrapper( + "index1", + List.of(new IngestPipelineInfo("p1", IngestPipelineType.DEFAULT), new IngestPipelineInfo("p2", IngestPipelineType.DEFAULT)) ); + IndexRequestWrapper wrapper4 = createIndexRequestWrapper( + "index1", + Collections.singletonList(new IngestPipelineInfo("p2", IngestPipelineType.DEFAULT)) + ); + List<List<IndexRequestWrapper>> batches = IngestService.prepareBatches(2, Arrays.asList(wrapper1, wrapper2, wrapper3, wrapper4)); assertEquals(4, batches.size()); } @@ -2104,9 +2307,10 @@ public void testUpdateMaxIngestProcessorCountSetting() { assertEquals(3, clusterSettings.get(IngestService.MAX_NUMBER_OF_INGEST_PROCESSORS).intValue()); } - private IngestService.IndexRequestWrapper createIndexRequestWrapper(String index, List<String> pipelines) { + private IndexRequestWrapper createIndexRequestWrapper(String index, List<IngestPipelineInfo> pipelineInfoList) { IndexRequest indexRequest = new IndexRequest(index); - return new IngestService.IndexRequestWrapper(0, indexRequest, pipelines, true); + DocWriteRequest<?> actionRequest = new IndexRequest(index); + return new IndexRequestWrapper(0, indexRequest, actionRequest, pipelineInfoList); } private IngestDocument eqIndexTypeId(final Map<String, Object> source) { @@ -2117,7 +2321,7 @@ private IngestDocument eqIndexTypeId(final Long version, final VersionType versi return argThat(new IngestDocumentMatcher("_index", "_type", "_id", version, versionType, source)); } - private static IngestService createWithProcessors() { + private static IngestService createIngestServiceWithProcessors() { Map<String, Processor.Factory> processors = new HashMap<>(); processors.put("set", (factories, tag, description, config) -> { String field = (String) config.remove("field"); @@ -2129,11 +2333,21 @@ private static IngestService createWithProcessors(Map<String, Processor.Factory> return new WrappingProcessorImpl("remove", tag, description, (ingestDocument -> ingestDocument.removeField(field))) { }; }); - return createWithProcessors(processors); + + Map<String, Processor.Factory> systemProcessors = new HashMap<>(); + systemProcessors.put("foo", mockSystemProcessorFactory); + + return createIngestServiceWithProcessors(processors, systemProcessors); } - private static IngestService createWithProcessors(Map<String, Processor.Factory> processors) { + public static IngestService createIngestServiceWithProcessors(Map<String, Processor.Factory> processors) { + return createIngestServiceWithProcessors(processors, Collections.emptyMap()); + } + public static IngestService createIngestServiceWithProcessors( + Map<String, Processor.Factory> processors, + Map<String, Processor.Factory> systemProcessors + ) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executorService = OpenSearchExecutors.newDirectExecutorService(); @@ -2148,7 +2362,12 @@ private static IngestService createWithProcessors(Map<String, Processor.Factory> public Map<String, Processor.Factory> getProcessors(final Processor.Parameters parameters) { return processors; } - }), client, mock(IndicesService.class)); + + @Override + public Map<String, Processor.Factory> getSystemIngestProcessors(Processor.Parameters parameters) { + return systemProcessors; + } + }), client, mock(IndicesService.class), mock(NamedXContentRegistry.class), spy(new SystemIngestPipelineCache())); } private CompoundProcessor mockCompoundProcessor() { @@ -2211,11 +2430,11 @@ private ClusterState createPipeline(String pipeline, IngestService ingestService } private ClusterState createPipeline(String pipeline, IngestService ingestService, ClusterState previousState) { - PutPipelineRequest putRequest = new PutPipelineRequest( - pipeline, - new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), - MediaTypeRegistry.JSON - ); + return createPipeline(pipeline, new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), ingestService, previousState); + } + + private ClusterState createPipeline(String pipeline, BytesArray config, IngestService
ingestService, ClusterState previousState) { + PutPipelineRequest putRequest = new PutPipelineRequest(pipeline, config, MediaTypeRegistry.JSON); ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty if (previousState != null) { clusterState = previousState; @@ -2225,4 +2444,436 @@ private ClusterState createPipeline(String pipeline, IngestService ingestService ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); return clusterState; } + + public void testInvalidateCache() { + // initialize the ingest service + final IngestService ingestService = createIngestServiceWithProcessors(); + final SystemIngestPipelineCache cache = ingestService.getSystemIngestPipelineCache(); + + // prepare test data and mock + final IndexMetadata indexMetadata1 = mock(IndexMetadata.class); + final Index index1 = new Index("index1", "uuid1"); + when(indexMetadata1.getIndex()).thenReturn(index1); + + final IndexMetadata indexMetadata2 = mock(IndexMetadata.class); + final Index index2 = new Index("index2", "uuid2"); + when(indexMetadata2.getIndex()).thenReturn(index2); + final IndexMetadata changedIndexMetadata2 = mock(IndexMetadata.class); + when(changedIndexMetadata2.getIndex()).thenReturn(index2); + + final IndexMetadata indexMetadata3 = mock(IndexMetadata.class); + final Index index3 = new Index("index3", "uuid3"); + when(indexMetadata3.getIndex()).thenReturn(index3); + + final Map<String, IndexMetadata> previousIndices = Map.of( + "index1", + indexMetadata1, + "index2", + indexMetadata2, + "index3", + indexMetadata3 + ); + + final Pipeline dummyPipeline = new Pipeline("id", null, null, new CompoundProcessor()); + cache.cachePipeline(index1.toString(), dummyPipeline, 10); + cache.cachePipeline(index2.toString(), dummyPipeline, 10); + cache.cachePipeline(index3.toString(), dummyPipeline, 10); + cache.cachePipeline("[" + index3.getName() + "/template]", dummyPipeline, 10); + + final Map<String, IndexMetadata> currentIndices = Map.of("index1", indexMetadata1, "index2", changedIndexMetadata2); + + final Metadata previousMetadata = mock(Metadata.class); + when(previousMetadata.indices()).thenReturn(previousIndices); + + final Metadata currentMetadata = mock(Metadata.class); + when(currentMetadata.indices()).thenReturn(currentIndices); + + final ClusterState previousClusterState = ClusterState.builder(new ClusterName("_name")).metadata(previousMetadata).build(); + final ClusterState currentClusterState = ClusterState.builder(new ClusterName("_name")).metadata(currentMetadata).build(); + + // process cluster state change event + ingestService.applyClusterState(new ClusterChangedEvent("", currentClusterState, previousClusterState)); + + // verify + assertNotNull(cache.getSystemIngestPipeline(index1.toString())); + assertNull(cache.getSystemIngestPipeline(index2.toString())); + assertNull(cache.getSystemIngestPipeline(index3.toString())); + assertNull(cache.getSystemIngestPipeline("[" + index3.getName() + "/template]")); + }
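The invalidation the test above exercises can be summarized as a comparison of previous and current index metadata on every cluster state update. A sketch under that assumption (the helper name is illustrative, not the actual private method):

    // drop cached pipelines for indices that were deleted or whose metadata changed,
    // including the provisional [name/template] entry created before the index existed
    private void invalidateStaleEntries(Map<String, IndexMetadata> previous, Map<String, IndexMetadata> current, SystemIngestPipelineCache cache) {
        for (Map.Entry<String, IndexMetadata> entry : previous.entrySet()) {
            IndexMetadata currentMetadata = current.get(entry.getKey());
            if (currentMetadata == null || currentMetadata != entry.getValue()) {
                cache.invalidateCacheForIndex(entry.getValue().getIndex().toString());
                cache.invalidateCacheForIndex("[" + entry.getValue().getIndex().getName() + "/template]");
            }
        }
    }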
+ + public void testResolvePipelines_whenExistingIndex() throws Exception { + // mock + when(mockSystemProcessorFactory.create(any(), any(), any(), any())).thenReturn(mockSystemProcessor); + when(mockSystemProcessorFactory.isSystemGenerated()).thenReturn(true); + when(mockSystemProcessor.isSystemGenerated()).thenReturn(true); + + final IngestService ingestService = createIngestServiceWithProcessors(); + final SystemIngestPipelineCache cache = ingestService.getSystemIngestPipelineCache(); + final IndexMetadata indexMetadata = spy( + IndexMetadata.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")) + .putMapping("{}") + .numberOfShards(1) + .numberOfReplicas(0) + .putAlias(AliasMetadata.builder("alias").writeIndex(true).build()) + .build() + ); + final Index index = new Index("idx", "uuid"); + when(indexMetadata.getIndex()).thenReturn(index); + Metadata metadata = Metadata.builder().indices(Map.of("idx", indexMetadata)).build(); + + // The first call creates the pipeline and caches it + IndexRequest indexRequest = new IndexRequest("idx"); + boolean hasPipeline = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); + // verify + verifySystemPipelineResolvedSuccessfully("[idx/uuid]", hasPipeline, indexRequest, cache); + + // The second call uses the cache directly + IndexRequest indexRequest2 = new IndexRequest("idx"); + boolean hasPipeline2 = ingestService.resolvePipelines(indexRequest2, indexRequest2, metadata); + assertTrue(hasPipeline2); + assertTrue(indexRequest2.isPipelineResolved()); + assertEquals("[idx/uuid]", indexRequest2.getSystemIngestPipeline()); + verify(cache, times(2)).getSystemIngestPipeline(eq("[idx/uuid]")); + verifyNoMoreInteractions(cache); + } + + public void testResolvePipelines_whenExistingIndexAndSystemPipelineDisabled_thenNoSystemPipeline() throws Exception { + final IngestService ingestService = createIngestServiceWithProcessors(); + ingestService.getClusterService() + .getClusterSettings() + .applySettings(Settings.builder().put(IngestService.SYSTEM_INGEST_PIPELINE_ENABLED.getKey(), false).build()); + + final IndexMetadata indexMetadata = spy( + IndexMetadata.builder("idx") + .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")) + .putMapping("{}") + .numberOfShards(1) + .numberOfReplicas(0) + .putAlias(AliasMetadata.builder("alias").writeIndex(true).build()) + .build() + ); + final Index index = new Index("idx", "uuid"); + when(indexMetadata.getIndex()).thenReturn(index); + Metadata metadata = Metadata.builder().indices(Map.of("idx", indexMetadata)).build(); + + IndexRequest indexRequest = new IndexRequest("idx"); + boolean hasPipeline = ingestService.resolvePipelines(indexRequest, indexRequest, metadata); + // verify + assertTrue(hasPipeline); + assertTrue(indexRequest.isPipelineResolved()); + assertEquals(NOOP_PIPELINE_NAME, indexRequest.getSystemIngestPipeline()); + }
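The cache keys asserted across these tests follow one convention: an existing index resolves to its Index#toString() form, an index that only matches templates resolves to a template-scoped key, and a disabled cluster setting short-circuits to the no-op pipeline. A compact sketch of that decision with simplified inputs (the actual logic sits inside IngestService.resolvePipelines):

    private static String resolveSystemPipelineId(IndexMetadata indexMetadata, String indexName, boolean systemPipelineEnabled) {
        if (systemPipelineEnabled == false) {
            return "_none"; // NOOP_PIPELINE_NAME: system pipelines are skipped entirely
        }
        if (indexMetadata != null) {
            return indexMetadata.getIndex().toString(); // e.g. "[idx/uuid]"
        }
        return "[" + indexName + "/template]"; // index not created yet; derive from matching templates
    }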
+ + public void testResolvePipelines_whenUseTemplateV2() throws Exception { + // mock + when(mockSystemProcessorFactory.create(any(), any(), any(), any())).thenReturn(mockSystemProcessor); + when(mockSystemProcessorFactory.isSystemGenerated()).thenReturn(true); + when(mockSystemProcessor.isSystemGenerated()).thenReturn(true); + + ClusterState state = ClusterState.EMPTY_STATE; + final MetadataIndexTemplateService metadataIndexTemplateService = getInstanceFromNode(MetadataIndexTemplateService.class); + ComposableIndexTemplate v2Template = new ComposableIndexTemplate(Arrays.asList("idx*"), null, null, null, null, null, null); + state = metadataIndexTemplateService.addIndexTemplateV2(state, false, "v2-template", v2Template); + final IngestService ingestService = createIngestServiceWithProcessors(); + ingestService.applyClusterState(new ClusterChangedEvent("", state, state)); + final SystemIngestPipelineCache cache = ingestService.getSystemIngestPipelineCache(); + final IndexRequest indexRequest = new IndexRequest("idx"); + + // invoke + boolean hasPipeline = ingestService.resolvePipelines(indexRequest, indexRequest, state.metadata()); + + // verify + verifySystemPipelineResolvedSuccessfully("[idx/template]", hasPipeline, indexRequest, cache); + } + + public void testResolvePipelines_whenUseTemplateV1() throws Exception { + // mock + when(mockSystemProcessorFactory.create(any(), any(), any(), any())).thenReturn(mockSystemProcessor); + when(mockSystemProcessorFactory.isSystemGenerated()).thenReturn(true); + when(mockSystemProcessor.isSystemGenerated()).thenReturn(true); + + IndexTemplateMetadata v1Template = IndexTemplateMetadata.builder("v1-template").patterns(Arrays.asList("fo*", "baz")).build(); + ClusterState state = ClusterState.builder(ClusterState.EMPTY_STATE) + .metadata(Metadata.builder(Metadata.EMPTY_METADATA).put(v1Template).build()) + .build(); + final IngestService ingestService = createIngestServiceWithProcessors(); + ingestService.applyClusterState(new ClusterChangedEvent("", state, state)); + final SystemIngestPipelineCache cache = ingestService.getSystemIngestPipelineCache(); + final IndexRequest indexRequest = new IndexRequest("idx"); + + // invoke + boolean hasPipeline = ingestService.resolvePipelines(indexRequest, indexRequest, state.metadata()); + + // verify + verifySystemPipelineResolvedSuccessfully("[idx/template]", hasPipeline, indexRequest, cache); + } + + private void verifySystemPipelineResolvedSuccessfully( + @NonNull final String id, + final boolean hasPipeline, + @NonNull final IndexRequest indexRequest, + @NonNull final SystemIngestPipelineCache cache + ) { + assertTrue(hasPipeline); + assertTrue(indexRequest.isPipelineResolved()); + assertEquals(id, indexRequest.getSystemIngestPipeline()); + verify(cache, times(1)).getSystemIngestPipeline(eq(id)); + verify(cache, times(1)).cachePipeline(eq(id), any(), eq(Integer.MAX_VALUE)); + } + + @SuppressWarnings("unchecked") + public void testTargetIndexChange() { + // prepare test data + final Map<String, Processor.Factory> processors = new HashMap<>(); + + // mock a default pipeline that changes the target index of the first request + Processor defaultProcessor = mock(Processor.class); + doAnswer(invocationOnMock -> { + List<IngestDocumentWrapper> documents = (List<IngestDocumentWrapper>) invocationOnMock.getArguments()[0]; + documents.get(0).getIngestDocument().setFieldValue("_index", "new_index"); + Consumer<List<IngestDocumentWrapper>> handler = (Consumer<List<IngestDocumentWrapper>>) invocationOnMock.getArguments()[1]; + handler.accept(documents); + return null; + }).when(defaultProcessor).batchExecute(any(), any()); + processors.put( + "default", + (factories, tag, description, config) -> new CompoundProcessor(false, List.of(defaultProcessor), List.of()) + ); + + // mock a final pipeline that does nothing + Processor dummyFinalProcessor = mock(Processor.class); + doAnswer(invocationOnMock -> { + IngestDocument document = (IngestDocument) invocationOnMock.getArguments()[0]; + BiConsumer<IngestDocument, Exception> handler = (BiConsumer<IngestDocument, Exception>) invocationOnMock.getArguments()[1]; + handler.accept(document, null); + return null; + }).when(dummyFinalProcessor).execute(any(), any()); + processors.put( + "dummy", + (factories, tag, description, config) -> new CompoundProcessor(false, List.of(dummyFinalProcessor), List.of()) + ); + + // create ingest service and cluster state + IngestService ingestService = createIngestServiceWithProcessors(processors); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + + // add pipeline + clusterState = createPipeline("pipeline", new BytesArray("{\"processors\": [{\"default\" : {}}]}"), ingestService, clusterState); + createPipeline("final_pipeline", new BytesArray("{\"processors\": [{\"dummy\" : {}}]}"), ingestService, clusterState); + + // prepare request
+ BulkRequest bulkRequest = new BulkRequest(); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1") + .source(emptyMap()) + .setPipeline("pipeline") + .setFinalPipeline("final_pipeline") + .setSystemIngestPipeline("_none"); + bulkRequest.add(indexRequest1); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2") + .source(emptyMap()) + .setPipeline("pipeline") + .setFinalPipeline("final_pipeline") + .setSystemIngestPipeline("_none"); + bulkRequest.add(indexRequest2); + + // prepare handler + final Map<Integer, Exception> failureHandler = new HashMap<>(); + final Map<Thread, Exception> completionHandler = new HashMap<>(); + final List<Integer> dropHandler = new ArrayList<>(); + + // call + ingestService.executeBulkRequest( + 2, + bulkRequest.requests(), + failureHandler::put, + completionHandler::put, + dropHandler::add, + Names.WRITE + ); + + // verify the default pipeline processes both requests while the final pipeline only processes the second + // one, because the target index of the first doc was changed. + verify(defaultProcessor, times(1)).batchExecute(any(), any()); + verify(dummyFinalProcessor, times(1)).execute(any(), any()); + + // verify the pipeline info of request 1 is reset since its target index changed + assertFalse(indexRequest1.isPipelineResolved()); + assertNull(indexRequest1.getFinalPipeline()); + assertEquals(NOOP_PIPELINE_NAME, indexRequest1.getPipeline()); + + assertTrue(failureHandler.isEmpty()); + assertTrue(dropHandler.isEmpty()); + assertEquals(1, completionHandler.size()); + }
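The assertions above describe a redirect rule: once a processor rewrites _index, the request's resolved pipeline state is cleared so that the new target's pipelines are resolved on the next pass and the old target's FINAL pipeline never runs. A rough sketch of that reset, assuming a hypothetical helper (the setters used below do exist on IndexRequest):

    private static void resetPipelinesOnRedirect(IndexRequest request, String originalIndex) {
        if (originalIndex.equals(request.index()) == false) {
            request.isPipelineResolved(false); // force re-resolution against the new index
            request.setPipeline(IngestService.NOOP_PIPELINE_NAME);
            request.setFinalPipeline(null);
        }
    }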
+ .setPipeline("pipeline") + .setFinalPipeline("final_pipeline") + .setSystemIngestPipeline("index_pipeline"); + bulkRequest.add(indexRequest1); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2") + .source(emptyMap()) + .setPipeline("pipeline") + .setFinalPipeline("final_pipeline") + .setSystemIngestPipeline("index_pipeline"); + bulkRequest.add(indexRequest2); + + // prepare handler + final Map failureHandler = new HashMap<>(); + final Map completionHandler = new HashMap<>(); + final List dropHandler = new ArrayList<>(); + + // call + ingestService.executeBulkRequest( + 2, + bulkRequest.requests(), + failureHandler::put, + completionHandler::put, + dropHandler::add, + Names.WRITE + ); + + // verify + verify(defaultProcessor, times(1)).batchExecute(any(), any()); + verify(dummyProcessor, times(1)).batchExecute(any(), any()); + verify(dummySystemProcessor, times(1)).batchExecute(any(), any()); + assertTrue(failureHandler.isEmpty()); + assertTrue(dropHandler.isEmpty()); + assertEquals(1, completionHandler.size()); + } + + private Map createDummyMockSystemProcessors(Processor dummySystemProcessor, boolean isBatch) + throws Exception { + final Map systemProcessors = new HashMap<>(); + final Processor.Factory systemProcessorFactory = mock(Processor.Factory.class); + when(systemProcessorFactory.isSystemGenerated()).thenReturn(true); + when(systemProcessorFactory.create(any(), any(), any(), any())).thenReturn(dummySystemProcessor); + when(dummySystemProcessor.isSystemGenerated()).thenReturn(true); + if (isBatch) { + doAnswer(invocationOnMock -> { + List documents = (List) invocationOnMock.getArguments()[0]; + Consumer> handler = (Consumer>) invocationOnMock.getArguments()[1]; + handler.accept(documents); + return null; + }).when(dummySystemProcessor).batchExecute(any(), any()); + } else { + doAnswer(invocationOnMock -> { + IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; + BiConsumer handler = (BiConsumer) invocationOnMock.getArguments()[1]; + handler.accept(ingestDocument, null); + return null; + }).when(dummySystemProcessor).execute(any(), any()); + } + + systemProcessors.put("dummy", systemProcessorFactory); + return systemProcessors; + } + + public void testExecuteBulkRequestSingleRequestWithSystemPipeline() throws Exception { + // mock a system pipeline do nothing + final Processor dummySystemProcessor = mock(Processor.class); + final Map systemProcessors = createDummyMockSystemProcessors(dummySystemProcessor, false); + + // add index metadata for the new index + IngestService ingestService = createIngestServiceWithProcessors(Map.of(), systemProcessors); + IndexMetadata indexMetadata = spy( + IndexMetadata.builder("_index") + .settings(settings(Version.CURRENT)) + .putMapping("{}") + .numberOfShards(1) + .numberOfReplicas(0) + .putAlias(AliasMetadata.builder("alias").writeIndex(true).build()) + .build() + ); + when(indexMetadata.getIndex()).thenReturn(new Index("_index", "uuid")); + Metadata metadata = Metadata.builder().indices(Map.of("_index", indexMetadata)).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + ingestService.applyClusterState(new ClusterChangedEvent("_name", clusterState, clusterState)); + + // prepare request + BulkRequest bulkRequest = new BulkRequest(); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1") + .source(emptyMap()) + .setPipeline("_none") + .setFinalPipeline("_none") + .setSystemIngestPipeline("[_index/uuid]"); + 
+ + public void testExecuteBulkRequestSingleRequestWithSystemPipeline() throws Exception { + // mock a system pipeline that does nothing + final Processor dummySystemProcessor = mock(Processor.class); + final Map<String, Processor.Factory> systemProcessors = createDummyMockSystemProcessors(dummySystemProcessor, false); + + // add index metadata for the new index + IngestService ingestService = createIngestServiceWithProcessors(Map.of(), systemProcessors); + IndexMetadata indexMetadata = spy( + IndexMetadata.builder("_index") + .settings(settings(Version.CURRENT)) + .putMapping("{}") + .numberOfShards(1) + .numberOfReplicas(0) + .putAlias(AliasMetadata.builder("alias").writeIndex(true).build()) + .build() + ); + when(indexMetadata.getIndex()).thenReturn(new Index("_index", "uuid")); + Metadata metadata = Metadata.builder().indices(Map.of("_index", indexMetadata)).build(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + ingestService.applyClusterState(new ClusterChangedEvent("_name", clusterState, clusterState)); + + // prepare request + BulkRequest bulkRequest = new BulkRequest(); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1") + .source(emptyMap()) + .setPipeline("_none") + .setFinalPipeline("_none") + .setSystemIngestPipeline("[_index/uuid]"); + bulkRequest.add(indexRequest1); + + // prepare handler + final Map<Integer, Exception> failureHandler = new HashMap<>(); + final Map<Thread, Exception> completionHandler = new HashMap<>(); + final List<Integer> dropHandler = new ArrayList<>(); + + // call + ingestService.executeBulkRequest( + 1, + bulkRequest.requests(), + failureHandler::put, + completionHandler::put, + dropHandler::add, + Names.WRITE + ); + + // verify + verify(dummySystemProcessor, times(1)).execute(any(), any()); + assertTrue(failureHandler.isEmpty()); + assertTrue(dropHandler.isEmpty()); + assertEquals(1, completionHandler.size()); + } + + public void testIngestServiceCreation_whenInvalidSystemProcessor_thenFail() { + Map<String, Processor.Factory> processors = new HashMap<>(); + processors.put("set", (factories, tag, description, config) -> { + String field = (String) config.remove("field"); + String value = (String) config.remove("value"); + return new FakeProcessor("set", tag, description, (ingestDocument) -> ingestDocument.setFieldValue(field, value)); + }); + + final Exception exception = assertThrows(RuntimeException.class, () -> createIngestServiceWithProcessors(processors, processors)); + + assertEquals("[set] is not a system generated processor factory.", exception.getMessage()); + } }
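PipelineTests below pins down what Pipeline.createSystemIngestPipeline must produce: an id suffixed with _system_generated_ingest_pipeline and one processor per contributing factory. A sketch consistent with those assertions (the loop body is an assumption, not the shipped implementation):

    static Pipeline createSystemIngestPipelineSketch(String index, Map<String, Processor.Factory> factories, Map<String, Object> config) throws Exception {
        List<Processor> processors = new ArrayList<>();
        for (Processor.Factory factory : factories.values()) {
            // each system factory inspects the index configuration and may emit a processor
            processors.add(factory.create(Collections.emptyMap(), null, null, config));
        }
        return new Pipeline(index + "_system_generated_ingest_pipeline", null, null, new CompoundProcessor(processors.toArray(new Processor[0])));
    }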
diff --git a/server/src/test/java/org/opensearch/ingest/PipelineTests.java b/server/src/test/java/org/opensearch/ingest/PipelineTests.java new file mode 100644 index 0000000000000..07ee9ae69b1da --- /dev/null +++ b/server/src/test/java/org/opensearch/ingest/PipelineTests.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest; + +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Collections; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class PipelineTests extends OpenSearchTestCase { + + public void testCreatePipelineWithEmptyConfig() { + final Pipeline pipeline = Pipeline.createSystemIngestPipeline("test-index", Collections.emptyMap(), Collections.emptyMap()); + + assertNotNull(pipeline); + assertEquals("test-index_system_generated_ingest_pipeline", pipeline.getId()); + assertTrue(pipeline.getProcessors().isEmpty()); + } + + public void testCreatePipelineWithOneProcessor() throws Exception { + final Processor processor = mock(Processor.class); + final Processor.Factory factory = mock(Processor.Factory.class); + final Map<String, Object> config = Map.of("key", "value"); + when(processor.isSystemGenerated()).thenReturn(true); + when(factory.isSystemGenerated()).thenReturn(true); + when(factory.create(any(), any(), any(), any())).thenReturn(processor); + + final Pipeline pipeline = Pipeline.createSystemIngestPipeline("my-index", Map.of("factory", factory), config); + + assertNotNull(pipeline); + assertEquals("my-index_system_generated_ingest_pipeline", pipeline.getId()); + assertEquals(1, pipeline.getProcessors().size()); + assertSame(processor, pipeline.getProcessors().get(0)); + + verify(factory, times(1)).create(any(), any(), any(), any()); + } + + public void testCreatePipelineWithFactoryException() throws Exception { + final Map<String, Object> config = Map.of("key", "value"); + final Processor.Factory faultyFactory = mock(Processor.Factory.class); + when(faultyFactory.create(any(), any(), any(), any())).thenThrow(new RuntimeException("Factory failed")); + + final RuntimeException e = assertThrows( + RuntimeException.class, + () -> Pipeline.createSystemIngestPipeline("my-index", Map.of("factory", faultyFactory), config) + ); + assertTrue(e.getMessage().contains("Factory failed")); + } +} diff --git a/server/src/test/java/org/opensearch/ingest/SystemIngestPipelineCacheTests.java b/server/src/test/java/org/opensearch/ingest/SystemIngestPipelineCacheTests.java new file mode 100644 index 0000000000000..9ffb671c1a783 --- /dev/null +++ b/server/src/test/java/org/opensearch/ingest/SystemIngestPipelineCacheTests.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ingest; + +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import org.mockito.Mockito; + +import static org.mockito.Mockito.mock; + +public class SystemIngestPipelineCacheTests extends OpenSearchTestCase { + private SystemIngestPipelineCache cache; + private Pipeline dummyPipeline; + + @Before + public void setup() { + cache = new SystemIngestPipelineCache(); + dummyPipeline = new Pipeline("id", "description", null, new CompoundProcessor()); + } + + public void testCachePipelineAndRetrieve() { + String index = "test_index"; + cache.cachePipeline(index, dummyPipeline, 10); + + Pipeline retrieved = cache.getSystemIngestPipeline(index); + assertNotNull(retrieved); + assertEquals(dummyPipeline, retrieved); + } + + public void testCacheExceedMaxProcessorNumberThrowsException() { + String index = "test_index"; + CompoundProcessor largeProcessor = mock(CompoundProcessor.class); + // new ArrayList<>(11) would only set the initial capacity and leave the list empty, so stub a list that really holds 11 processors to exceed the limit of 10 + Mockito.when(largeProcessor.getProcessors()).thenReturn(java.util.Collections.nCopies(11, mock(Processor.class))); + Pipeline largePipeline = new Pipeline("id", "description", null, largeProcessor); + + IllegalArgumentException e = assertThrows( + IllegalArgumentException.class, + () -> cache.cachePipeline(index, largePipeline, 10) + ); + assertTrue(e.getMessage().contains("Too many system ingest processors")); + } + + public void testInvalidateCache() { + String index = "test_index"; + cache.cachePipeline(index, dummyPipeline, 10); + cache.invalidateCacheForIndex(index); + + assertNull(cache.getSystemIngestPipeline(index)); + } + + public void testEvictionWhenCacheIsFull() { + for (int i = 0; i < 101; i++) { + cache.cachePipeline("index_" + i, dummyPipeline, 10); + } + + // The cache should not exceed 100 entries + assertEquals(100, cache.size()); + } +}
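testEvictionWhenCacheIsFull only fixes the capacity at 100 entries; the eviction policy itself is an implementation detail of SystemIngestPipelineCache. Assuming a least-recently-used policy, a LinkedHashMap in access order gives the same size-bounded shape:

    private static <K, V> Map<K, V> newBoundedCache(int maxEntries) {
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries; // evict the least-recently-used entry once full
            }
        };
    }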
diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 35655bf8d2037..63fba6726716a 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -213,6 +213,7 @@ import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.ingest.IngestService; +import org.opensearch.ingest.SystemIngestPipelineCache; import org.opensearch.monitor.StatusInfo; import org.opensearch.node.ResponseCollectorService; import org.opensearch.node.remotestore.RemoteStoreNodeService; @@ -2276,7 +2277,9 @@ public void onFailure(final Exception e) { new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), Collections.emptyList(), client, - indicesService + indicesService, + namedXContentRegistry, + new SystemIngestPipelineCache() ), transportShardBulkAction, client,
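Taken together, the wiring in these hunks is what a plugin author interacts with: implement the getSystemIngestProcessors extension point mocked throughout the tests, and the IngestService will generate, cache, and run the resulting pipeline per index. A hypothetical registration, with illustrative names:

    public class MySystemIngestPlugin extends Plugin implements IngestPlugin {
        @Override
        public Map<String, Processor.Factory> getSystemIngestProcessors(Processor.Parameters parameters) {
            // keyed by processor type; the factory decides from the index configuration
            // whether a system generated processor is needed at all
            return Map.of("my_system_processor", new MySystemProcessorFactory());
        }
    }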