Skip to content

Commit 6d2ac3e

Browse files
committed
Introduce index based ingest pipeline
Signed-off-by: Bo Zhang <[email protected]>
1 parent 86a2000 commit 6d2ac3e

23 files changed

+1707
-233
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2424
- Implement fixed interval refresh task scheduling ([#17777](https://github.com/opensearch-project/OpenSearch/pull/17777))
2525
- Add GRPC DocumentService and Bulk endpoint ([#17727](https://github.com/opensearch-project/OpenSearch/pull/17727))
2626
- Added scale to zero (`search_only` mode) support for OpenSearch reader writer separation ([#17299](https://github.com/opensearch-project/OpenSearch/pull/17299))
27+
- Introduce index based ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817))
2728

2829
### Changed
2930
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

server/src/main/java/org/opensearch/action/bulk/BulkRequest.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@
4646
import org.opensearch.action.support.replication.ReplicationRequest;
4747
import org.opensearch.action.update.UpdateRequest;
4848
import org.opensearch.common.Nullable;
49+
import org.opensearch.common.UUIDs;
4950
import org.opensearch.common.annotation.PublicApi;
5051
import org.opensearch.common.unit.TimeValue;
52+
import org.opensearch.common.util.FeatureFlags;
5153
import org.opensearch.core.common.Strings;
5254
import org.opensearch.core.common.bytes.BytesArray;
5355
import org.opensearch.core.common.bytes.BytesReference;
@@ -62,6 +64,7 @@
6264
import java.util.Collections;
6365
import java.util.HashSet;
6466
import java.util.List;
67+
import java.util.Locale;
6568
import java.util.Objects;
6669
import java.util.Set;
6770

@@ -101,7 +104,13 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
101104

102105
private long sizeInBytes = 0;
103106

104-
public BulkRequest() {}
107+
private String uuid;
108+
109+
public BulkRequest() {
110+
if (FeatureFlags.isEnabled(FeatureFlags.INDEX_BASED_INGEST_PIPELINE)) {
111+
uuid = UUIDs.randomBase64UUID().toLowerCase(Locale.ROOT);
112+
}
113+
}
105114

106115
public BulkRequest(StreamInput in) throws IOException {
107116
super(in);
@@ -112,6 +121,9 @@ public BulkRequest(StreamInput in) throws IOException {
112121
if (in.getVersion().onOrAfter(Version.V_2_14_0)) {
113122
batchSize = in.readInt();
114123
}
124+
if (FeatureFlags.isEnabled(FeatureFlags.INDEX_BASED_INGEST_PIPELINE)) {
125+
uuid = in.readOptionalString();
126+
}
115127
}
116128

117129
public BulkRequest(@Nullable String globalIndex) {
@@ -482,6 +494,9 @@ public void writeTo(StreamOutput out) throws IOException {
482494
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
483495
out.writeInt(batchSize);
484496
}
497+
if (FeatureFlags.isEnabled(FeatureFlags.INDEX_BASED_INGEST_PIPELINE)) {
498+
out.writeOptionalString(uuid);
499+
}
485500
}
486501

487502
@Override
@@ -515,4 +530,8 @@ public long ramBytesUsed() {
515530
public Set<String> getIndices() {
516531
return Collections.unmodifiableSet(indices);
517532
}
533+
534+
public String getUuid() {
535+
return uuid;
536+
}
518537
}

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.opensearch.common.inject.Inject;
7070
import org.opensearch.common.lease.Releasable;
7171
import org.opensearch.common.unit.TimeValue;
72+
import org.opensearch.common.util.FeatureFlags;
7273
import org.opensearch.common.util.concurrent.AtomicArray;
7374
import org.opensearch.core.Assertions;
7475
import org.opensearch.core.action.ActionListener;
@@ -250,8 +251,11 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
250251
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
251252
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
252253
if (indexRequest != null) {
254+
if (FeatureFlags.isEnabled(FeatureFlags.INDEX_BASED_INGEST_PIPELINE)) {
255+
indexRequest.setBulkUuid(bulkRequest.getUuid());
256+
}
253257
// Each index request needs to be evaluated, because this method also modifies the IndexRequest
254-
boolean indexRequestHasPipeline = IngestService.resolvePipelines(actionRequest, indexRequest, metadata);
258+
boolean indexRequestHasPipeline = ingestService.resolvePipelines(actionRequest, indexRequest, metadata);
255259
hasIndexRequestsWithPipelines |= indexRequestHasPipeline;
256260
}
257261

server/src/main/java/org/opensearch/action/index/IndexRequest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.opensearch.common.UUIDs;
5050
import org.opensearch.common.annotation.PublicApi;
5151
import org.opensearch.common.lucene.uid.Versions;
52+
import org.opensearch.common.util.FeatureFlags;
5253
import org.opensearch.common.xcontent.XContentHelper;
5354
import org.opensearch.common.xcontent.XContentType;
5455
import org.opensearch.core.common.Strings;
@@ -123,6 +124,11 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
123124

124125
private String pipeline;
125126
private String finalPipeline;
127+
private String indexBasedIngestPipeline;
128+
/**
129+
* The uuid of the bulk request containing this request.
130+
*/
131+
private String bulkUuid;
126132

127133
private boolean isPipelineResolved;
128134

@@ -158,6 +164,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
158164
versionType = VersionType.fromValue(in.readByte());
159165
pipeline = in.readOptionalString();
160166
finalPipeline = in.readOptionalString();
167+
// When enabling the FF we should add a min version check to ensure backward compatibility.
168+
if (FeatureFlags.isEnabled(FeatureFlags.INDEX_BASED_INGEST_PIPELINE)) {
169+
indexBasedIngestPipeline = in.readOptionalString();
170+
bulkUuid = in.readOptionalString();
171+
}
161172
isPipelineResolved = in.readBoolean();
162173
isRetry = in.readBoolean();
163174
autoGeneratedTimestamp = in.readLong();
@@ -314,6 +325,21 @@ public String getPipeline() {
314325
return this.pipeline;
315326
}
316327

328+
/**
329+
* Sets the index based ingest pipeline to be executed before indexing the document
330+
*/
331+
public IndexRequest setIndexBasedIngestPipeline(final String indexBasedIngestPipeline) {
332+
this.indexBasedIngestPipeline = indexBasedIngestPipeline;
333+
return this;
334+
}
335+
336+
/**
337+
* Returns the index based ingest pipeline to be executed before indexing the document
338+
*/
339+
public String getIndexBasedIngestPipeline() {
340+
return this.indexBasedIngestPipeline;
341+
}
342+
317343
/**
318344
* Sets the final ingest pipeline to be executed before indexing the document.
319345
*
@@ -668,6 +694,11 @@ private void writeBody(StreamOutput out) throws IOException {
668694
out.writeByte(versionType.getValue());
669695
out.writeOptionalString(pipeline);
670696
out.writeOptionalString(finalPipeline);
697+
// When enabling the FF we should add a min version check to ensure backward compatibility.
698+
if (FeatureFlags.isEnabled(FeatureFlags.INDEX_BASED_INGEST_PIPELINE)) {
699+
out.writeOptionalString(indexBasedIngestPipeline);
700+
out.writeOptionalString(bulkUuid);
701+
}
671702
out.writeBoolean(isPipelineResolved);
672703
out.writeBoolean(isRetry);
673704
out.writeLong(autoGeneratedTimestamp);
@@ -743,4 +774,13 @@ public IndexRequest setRequireAlias(boolean requireAlias) {
743774
this.requireAlias = requireAlias;
744775
return this;
745776
}
777+
778+
public IndexRequest setBulkUuid(final String bulkUuid) {
779+
this.bulkUuid = bulkUuid;
780+
return this;
781+
}
782+
783+
public String getBulkUuid() {
784+
return bulkUuid;
785+
}
746786
}

server/src/main/java/org/opensearch/common/util/FeatureFlags.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,13 @@ public class FeatureFlags {
127127
public static final String ARROW_STREAMS = "opensearch.experimental.feature.arrow.streams.enabled";
128128
public static final Setting<Boolean> ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, false, Property.NodeScope);
129129

130+
public static final String INDEX_BASED_INGEST_PIPELINE = "opensearch.experimental.feature.index_based_ingest_pipeline.enabled";
131+
public static final Setting<Boolean> INDEX_BASED_INGEST_PIPELINE_SETTING = Setting.boolSetting(
132+
INDEX_BASED_INGEST_PIPELINE,
133+
false,
134+
Property.NodeScope
135+
);
136+
130137
private static final List<Setting<Boolean>> ALL_FEATURE_FLAG_SETTINGS = List.of(
131138
REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
132139
EXTENSIONS_SETTING,
@@ -137,7 +144,8 @@ public class FeatureFlags {
137144
APPLICATION_BASED_CONFIGURATION_TEMPLATES_SETTING,
138145
READER_WRITER_SPLIT_EXPERIMENTAL_SETTING,
139146
TERM_VERSION_PRECOMMIT_ENABLE_SETTING,
140-
ARROW_STREAMS_SETTING
147+
ARROW_STREAMS_SETTING,
148+
INDEX_BASED_INGEST_PIPELINE_SETTING
141149
);
142150

143151
/**

server/src/main/java/org/opensearch/ingest/AbstractBatchingProcessor.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,51 @@ protected abstract AbstractBatchingProcessor newProcessor(
133133
Map<String, Object> config
134134
);
135135
}
136+
137+
/**
138+
* Factory class for creating {@link AbstractBatchingProcessor} instances based on index config.
139+
*
140+
* Since the processor config is generated based on the index config so the batch size info should also be defined
141+
* as part of it. And different processors can have their own logic to decide the batch size so let each
142+
* implementation of the newProcessor to handle it.
143+
*
144+
* @opensearch.internal
145+
*/
146+
public abstract static class IndexBasedFactory implements Processor.Factory {
147+
final String processorType;
148+
149+
protected IndexBasedFactory(String processorType) {
150+
this.processorType = processorType;
151+
}
152+
153+
/**
154+
* Creates a new processor instance based on index config.
155+
*
156+
* @param processorFactories The processor factories.
157+
* @param tag The processor tag.
158+
* @param description The processor description.
159+
* @param config The processor configuration.
160+
* @return The new AbstractBatchProcessor instance.
161+
* @throws Exception If the processor could not be created.
162+
*/
163+
@Override
164+
public AbstractBatchingProcessor create(
165+
Map<String, Processor.Factory> processorFactories,
166+
String tag,
167+
String description,
168+
Map<String, Object> config
169+
) throws Exception {
170+
return newProcessor(tag, description, config);
171+
}
172+
173+
/**
174+
* Returns a new processor instance based on the index config.
175+
*
176+
* @param tag tag of the processor
177+
* @param description description of the processor
178+
* @param config configuration of the processor
179+
* @return a new batch processor instance
180+
*/
181+
protected abstract AbstractBatchingProcessor newProcessor(String tag, String description, Map<String, Object> config);
182+
}
136183
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.ingest;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
import java.util.Map;
14+
import java.util.concurrent.ConcurrentHashMap;
15+
import java.util.concurrent.TimeUnit;
16+
17+
import reactor.util.annotation.NonNull;
18+
19+
/**
20+
* Cache for index based ingest pipeline
21+
*/
22+
public class IndexBasedIngestPipelineCache {
23+
private final Map<String, CacheEntry> cache;
24+
25+
private final int MAX_ENTRIES = 100;
26+
private final long EXPIRES_IN_MINUTES = 60;
27+
private final int MAX_PROCESSOR_NUMBER = 10;
28+
29+
public IndexBasedIngestPipelineCache() {
30+
this.cache = new ConcurrentHashMap<>();
31+
}
32+
33+
/**
34+
* Cache an index based ingest pipeline for an index.
35+
* @param index [index_name/index_uuid]
36+
* @param indexBasedIngestPipeline A pipeline created based on index configuration.
37+
*/
38+
public void cachePipeline(@NonNull final String index, @NonNull final Pipeline indexBasedIngestPipeline) {
39+
if (indexBasedIngestPipeline.getProcessors().size() > MAX_PROCESSOR_NUMBER) {
40+
throw new IllegalArgumentException("Too many index based ingest processors for index: " + index);
41+
}
42+
if (cache.size() >= MAX_ENTRIES) {
43+
evictOldestAndExpiredCacheEntry();
44+
}
45+
cache.put(index, new CacheEntry(indexBasedIngestPipeline));
46+
}
47+
48+
// Evict the oldest and expired cache entry based on time
49+
private void evictOldestAndExpiredCacheEntry() {
50+
String oldestIndex = null;
51+
long oldestTimestamp = Long.MAX_VALUE;
52+
final List<String> expiredIndices = new ArrayList<>();
53+
54+
for (Map.Entry<String, CacheEntry> entry : cache.entrySet()) {
55+
final CacheEntry cacheEntry = entry.getValue();
56+
if (cacheEntry.getLastAccessTimestamp() < oldestTimestamp) {
57+
oldestTimestamp = cacheEntry.getLastAccessTimestamp();
58+
oldestIndex = entry.getKey();
59+
}
60+
if (cacheEntry.isExpired()) {
61+
expiredIndices.add(entry.getKey());
62+
}
63+
}
64+
65+
if (oldestIndex != null) {
66+
// Remove the oldest entry from both the cache and access order
67+
cache.remove(oldestIndex);
68+
}
69+
70+
for (final String expiredIndex : expiredIndices) {
71+
cache.remove(expiredIndex);
72+
}
73+
}
74+
75+
/**
76+
* Get the cached index based ingest pipeline for an index.
77+
* @param index [index_name/index_uuid]
78+
* @return cached index based ingest pipeline
79+
*/
80+
public Pipeline getIndexBasedIngestPipeline(@NonNull final String index) {
81+
// Check if the cache contains a valid entry for the index
82+
final CacheEntry entry = cache.get(index);
83+
if (entry != null) {
84+
if (entry.isExpired()) {
85+
cache.remove(index);
86+
return null;
87+
} else {
88+
entry.setLastAccessTimestamp(System.currentTimeMillis());
89+
return entry.getIndexBasedIngestPipeline();
90+
}
91+
}
92+
return null;
93+
}
94+
95+
public int size() {
96+
return cache.size();
97+
}
98+
99+
/**
100+
* Invalidate the cache for an index.
101+
* @param index [index_name/index_uuid]
102+
*/
103+
public void invalidateCacheForIndex(@NonNull final String index) {
104+
cache.remove(index);
105+
}
106+
107+
private class CacheEntry {
108+
private final Pipeline indexBasedIngestPipeline;
109+
private final long createTimestamp;
110+
private long lastAccessTimestamp;
111+
112+
public CacheEntry(Pipeline indexBasedIngestPipeline) {
113+
this.indexBasedIngestPipeline = indexBasedIngestPipeline;
114+
this.createTimestamp = System.currentTimeMillis();
115+
this.lastAccessTimestamp = createTimestamp;
116+
}
117+
118+
public Pipeline getIndexBasedIngestPipeline() {
119+
return indexBasedIngestPipeline;
120+
}
121+
122+
public boolean isExpired() {
123+
return System.currentTimeMillis() - createTimestamp > TimeUnit.MINUTES.toMillis(EXPIRES_IN_MINUTES);
124+
}
125+
126+
public void setLastAccessTimestamp(final long lastAccessTimestamp) {
127+
this.lastAccessTimestamp = lastAccessTimestamp;
128+
}
129+
130+
public long getLastAccessTimestamp() {
131+
return this.lastAccessTimestamp;
132+
}
133+
}
134+
}

0 commit comments

Comments
 (0)