Skip to content

Commit 4c3230a

Browse files
[Pull-based ingestion] Support updates and deletes in ingestion flow (#17822)
* Support updates and deletes in ingestion flow

  Signed-off-by: Varun Bharadwaj <[email protected]>

* Move ID generation to common util

  Signed-off-by: Varun Bharadwaj <[email protected]>

---------

Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 396add1 commit 4c3230a

File tree

10 files changed

+237
-28
lines changed

10 files changed

+237
-28
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2828
- [Star Tree] [Search] Resolving numeric range aggregation with metric aggregation using star-tree ([#17273](https://github.com/opensearch-project/OpenSearch/pull/17273))
2929
- Added Search Only strict routing setting ([#17803](https://github.com/opensearch-project/OpenSearch/pull/17803))
3030
- Disable the index API for ingestion engine ([#17768](https://github.com/opensearch-project/OpenSearch/pull/17768))
31+
- Add update and delete support in pull-based ingestion ([#17822](https://github.com/opensearch-project/OpenSearch/pull/17822))
3132

3233
### Changed
3334
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import org.opensearch.action.search.SearchResponse;
1616
import org.opensearch.cluster.metadata.IndexMetadata;
1717
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.index.query.BoolQueryBuilder;
1819
import org.opensearch.index.query.RangeQueryBuilder;
20+
import org.opensearch.index.query.TermQueryBuilder;
1921
import org.opensearch.indices.pollingingest.PollingIngestStats;
2022
import org.opensearch.plugins.PluginInfo;
2123
import org.opensearch.test.OpenSearchIntegTestCase;
@@ -73,8 +75,8 @@ public void testKafkaIngestion() {
7375
}
7476

7577
public void testKafkaIngestion_RewindByTimeStamp() {
76-
produceData("1", "name1", "24", 1739459500000L);
77-
produceData("2", "name2", "20", 1739459800000L);
78+
produceData("1", "name1", "24", 1739459500000L, "index");
79+
produceData("2", "name2", "20", 1739459800000L, "index");
7880

7981
// create an index with ingestion source from kafka
8082
createIndex(
@@ -135,4 +137,67 @@ public void testCloseIndex() throws Exception {
135137
ensureGreen(indexName);
136138
client().admin().indices().close(Requests.closeIndexRequest(indexName)).get();
137139
}
140+
141+
public void testUpdateAndDelete() throws Exception {
142+
// Step 1: Produce message and wait for it to be searchable
143+
144+
produceData("1", "name", "25", defaultMessageTimestamp, "index");
145+
createIndexWithDefaultSettings(1, 0);
146+
ensureGreen(indexName);
147+
waitForState(() -> {
148+
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", "1"));
149+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
150+
assertThat(response.getHits().getTotalHits().value(), is(1L));
151+
return 25 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
152+
});
153+
154+
// Step 2: Update age field from 25 to 30 and validate
155+
156+
produceData("1", "name", "30", defaultMessageTimestamp, "index");
157+
waitForState(() -> {
158+
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", "1"));
159+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
160+
assertThat(response.getHits().getTotalHits().value(), is(1L));
161+
return 30 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
162+
});
163+
164+
// Step 3: Delete the document and validate
165+
produceData("1", "name", "30", defaultMessageTimestamp, "delete");
166+
waitForState(() -> {
167+
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", "1"));
168+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
169+
return response.getHits().getTotalHits().value() == 0;
170+
});
171+
}
172+
173+
public void testUpdateWithoutIDField() throws Exception {
174+
// Step 1: Produce message without ID
175+
String payload = "{\"_op_type\":\"index\",\"_source\":{\"name\":\"name\", \"age\": 25}}";
176+
produceData(payload);
177+
178+
createIndexWithDefaultSettings(1, 0);
179+
ensureGreen(indexName);
180+
181+
waitForState(() -> {
182+
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("age", "25"));
183+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
184+
assertThat(response.getHits().getTotalHits().value(), is(1L));
185+
return 25 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
186+
});
187+
188+
SearchResponse searchableDocsResponse = client().prepareSearch(indexName).setSize(10).setPreference("_only_local").get();
189+
assertThat(searchableDocsResponse.getHits().getTotalHits().value(), is(1L));
190+
assertEquals(25, searchableDocsResponse.getHits().getHits()[0].getSourceAsMap().get("age"));
191+
String id = searchableDocsResponse.getHits().getHits()[0].getId();
192+
193+
// Step 2: Produce an update message using retrieved ID and validate
194+
195+
produceData(id, "name", "30", defaultMessageTimestamp, "index");
196+
waitForState(() -> {
197+
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", id));
198+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
199+
assertThat(response.getHits().getTotalHits().value(), is(1L));
200+
return 30 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
201+
});
202+
}
138203
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,20 +93,25 @@ private void stopKafka() {
9393
}
9494

9595
protected void produceData(String id, String name, String age) {
96-
produceData(id, name, age, defaultMessageTimestamp);
96+
produceData(id, name, age, defaultMessageTimestamp, "index");
9797
}
9898

99-
protected void produceData(String id, String name, String age, long timestamp) {
99+
protected void produceData(String id, String name, String age, long timestamp, String opType) {
100100
String payload = String.format(
101101
Locale.ROOT,
102-
"{\"_id\":\"%s\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
102+
"{\"_id\":\"%s\", \"_op_type\":\"%s\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
103103
id,
104+
opType,
104105
name,
105106
age
106107
);
107108
producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
108109
}
109110

111+
protected void produceData(String payload) {
112+
producer.send(new ProducerRecord<>(topicName, null, defaultMessageTimestamp, "null", payload));
113+
}
114+
110115
protected void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
111116
assertBusy(() -> {
112117
for (String node : nodes) {

server/src/main/java/org/opensearch/action/index/IndexRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@
4646
import org.opensearch.cluster.metadata.MappingMetadata;
4747
import org.opensearch.cluster.metadata.Metadata;
4848
import org.opensearch.common.Nullable;
49-
import org.opensearch.common.UUIDs;
5049
import org.opensearch.common.annotation.PublicApi;
5150
import org.opensearch.common.lucene.uid.Versions;
51+
import org.opensearch.common.util.RequestUtils;
5252
import org.opensearch.common.xcontent.XContentHelper;
5353
import org.opensearch.common.xcontent.XContentType;
5454
import org.opensearch.core.common.Strings;
@@ -625,7 +625,7 @@ public void process(Version indexCreatedVersion, @Nullable MappingMetadata mappi
625625
assert ifSeqNo == UNASSIGNED_SEQ_NO;
626626
assert ifPrimaryTerm == UNASSIGNED_PRIMARY_TERM;
627627
autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
628-
id(UUIDs.base64UUID());
628+
id(RequestUtils.generateID());
629629
}
630630
}
631631

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.util;
10+
11+
import org.opensearch.common.UUIDs;
12+
13+
/**
14+
* Common utility methods for request handling.
15+
*
16+
* @opensearch.internal
17+
*/
18+
public final class RequestUtils {
19+
20+
private RequestUtils() {}
21+
22+
/**
23+
* Generates a new ID field for new documents.
24+
*/
25+
public static String generateID() {
26+
return UUIDs.base64UUID();
27+
}
28+
}

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import org.apache.lucene.index.DirectoryReader;
1212
import org.apache.lucene.index.IndexWriter;
13+
import org.apache.lucene.index.Term;
1314
import org.apache.lucene.search.IndexSearcher;
1415
import org.opensearch.ExceptionsHelper;
1516
import org.opensearch.action.admin.indices.streamingingestion.state.ShardIngestionState;
@@ -22,6 +23,8 @@
2223
import org.opensearch.index.mapper.DocumentMapperForType;
2324
import org.opensearch.index.mapper.IdFieldMapper;
2425
import org.opensearch.index.mapper.ParseContext;
26+
import org.opensearch.index.mapper.ParsedDocument;
27+
import org.opensearch.index.mapper.SeqNoFieldMapper;
2528
import org.opensearch.index.seqno.SequenceNumbers;
2629
import org.opensearch.index.translog.NoOpTranslogManager;
2730
import org.opensearch.index.translog.Translog;
@@ -43,6 +46,7 @@
4346
import java.util.Set;
4447
import java.util.function.BiFunction;
4548

49+
import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
4650
import static org.opensearch.index.translog.Translog.EMPTY_TRANSLOG_SNAPSHOT;
4751

4852
/**
@@ -163,8 +167,13 @@ public IndexResult indexInternal(Index index) throws IOException {
163167
}
164168

165169
private IndexResult indexIntoLucene(Index index) throws IOException {
166-
// todo: handle updates
167-
addDocs(index.docs(), indexWriter);
170+
if (index.getAutoGeneratedIdTimestamp() != UNSET_AUTO_GENERATED_TIMESTAMP) {
171+
assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: "
172+
+ index.getAutoGeneratedIdTimestamp();
173+
addDocs(index.docs(), indexWriter);
174+
} else {
175+
updateDocs(index.uid(), index.docs(), indexWriter);
176+
}
168177
return new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), true);
169178
}
170179

@@ -176,11 +185,36 @@ private void addDocs(final List<ParseContext.Document> docs, final IndexWriter i
176185
}
177186
}
178187

188+
private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
189+
if (docs.size() > 1) {
190+
indexWriter.softUpdateDocuments(uid, docs, softDeletesField);
191+
} else {
192+
indexWriter.softUpdateDocument(uid, docs.get(0), softDeletesField);
193+
}
194+
}
195+
179196
@Override
180197
public DeleteResult delete(Delete delete) throws IOException {
181198
throw new IngestionEngineException("push-based deletion is not supported in ingestion engine, use streaming source instead");
182199
}
183200

201+
/**
202+
* Processes delete operations. This is used internally by the stream poller only.
203+
*/
204+
public DeleteResult deleteInternal(Delete delete) throws IOException {
205+
assert Objects.equals(delete.uid().field(), IdFieldMapper.NAME) : delete.uid().field();
206+
ensureOpen();
207+
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.id());
208+
assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
209+
final ParseContext.Document doc = tombstone.docs().get(0);
210+
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null : "Delete tombstone document but _tombstone field is not set ["
211+
+ doc
212+
+ " ]";
213+
doc.add(softDeletesField);
214+
indexWriter.softUpdateDocument(delete.uid(), doc, softDeletesField);
215+
return new DeleteResult(1, delete.primaryTerm(), -1, true);
216+
}
217+
184218
@Override
185219
public NoOpResult noOp(NoOp noOp) throws IOException {
186220
ensureOpen();

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ public class InternalEngine extends Engine {
161161
protected final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
162162
protected final SoftDeletesPolicy softDeletesPolicy;
163163
protected final AtomicBoolean shouldPeriodicallyFlushAfterBigMerge = new AtomicBoolean(false);
164+
protected final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField();
164165

165166
@Nullable
166167
protected final String historyUUID;
@@ -197,7 +198,6 @@ public class InternalEngine extends Engine {
197198
private final CounterMetric numDocDeletes = new CounterMetric();
198199
private final CounterMetric numDocAppends = new CounterMetric();
199200
private final CounterMetric numDocUpdates = new CounterMetric();
200-
private final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField();
201201
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
202202

203203
private final CompletionStatsCache completionStatsCache;

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

Lines changed: 48 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
import org.opensearch.action.DocWriteRequest;
1616
import org.opensearch.common.lucene.uid.Versions;
1717
import org.opensearch.common.metrics.CounterMetric;
18+
import org.opensearch.common.util.RequestUtils;
1819
import org.opensearch.common.xcontent.XContentFactory;
1920
import org.opensearch.common.xcontent.XContentHelper;
21+
import org.opensearch.core.common.Strings;
2022
import org.opensearch.core.common.bytes.BytesArray;
2123
import org.opensearch.core.common.bytes.BytesReference;
2224
import org.opensearch.core.xcontent.MediaTypeRegistry;
@@ -38,6 +40,7 @@
3840
import java.util.concurrent.BlockingQueue;
3941
import java.util.concurrent.TimeUnit;
4042

43+
import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
4144
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
4245

4346
/**
@@ -122,7 +125,9 @@ protected void process(Message message, IngestionShardPointer pointer) {
122125
engine.indexInternal((Engine.Index) operation);
123126
break;
124127
case DELETE:
125-
engine.delete((Engine.Delete) operation);
128+
engine.deleteInternal((Engine.Delete) operation);
129+
break;
130+
case NO_OP:
126131
break;
127132
default:
128133
throw new IllegalArgumentException("Invalid operation: " + operation);
@@ -140,15 +145,23 @@ protected void process(Message message, IngestionShardPointer pointer) {
140145
* @return the engine operation
141146
*/
142147
protected Engine.Operation getOperation(byte[] payload, IngestionShardPointer pointer) throws IOException {
143-
BytesReference payloadBR = new BytesArray(payload);
144-
Map<String, Object> payloadMap = XContentHelper.convertToMap(payloadBR, false, MediaTypeRegistry.xContentType(payloadBR)).v2();
148+
Map<String, Object> payloadMap = getParsedPayloadMap(payload);
145149

146-
String id = (String) payloadMap.getOrDefault(ID, "null");
147150
if (payloadMap.containsKey(OP_TYPE) && !(payloadMap.get(OP_TYPE) instanceof String)) {
148151
// TODO: add metric
149152
logger.error("_op_type field is of type {} but not string, skipping the message", payloadMap.get(OP_TYPE).getClass());
150153
return null;
151154
}
155+
156+
String id = (String) payloadMap.get(ID);
157+
long autoGeneratedIdTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;
158+
if (Strings.isNullOrEmpty(id)) {
159+
// auto generate ID for the message
160+
id = RequestUtils.generateID();
161+
payloadMap.put(ID, id);
162+
autoGeneratedIdTimestamp = System.currentTimeMillis();
163+
}
164+
152165
String opTypeString = (String) payloadMap.getOrDefault(OP_TYPE, "index");
153166
DocWriteRequest.OpType opType = DocWriteRequest.OpType.fromString(opTypeString);
154167

@@ -177,33 +190,44 @@ protected Engine.Operation getOperation(byte[] payload, IngestionShardPointer po
177190
document.add(new StoredField(IngestionShardPointer.OFFSET_FIELD, pointer.asString()));
178191

179192
operation = new Engine.Index(
180-
new Term("_id", id),
193+
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
181194
doc,
182195
0,
183196
1,
184197
Versions.MATCH_ANY,
185198
VersionType.INTERNAL,
186199
Engine.Operation.Origin.PRIMARY,
187200
System.nanoTime(),
188-
System.currentTimeMillis(),
201+
autoGeneratedIdTimestamp,
189202
false,
190203
UNASSIGNED_SEQ_NO,
191204
0
192205
);
193206
break;
194207
case DELETE:
195-
operation = new Engine.Delete(
196-
id,
197-
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
198-
0,
199-
1,
200-
Versions.MATCH_ANY,
201-
VersionType.INTERNAL,
202-
Engine.Operation.Origin.PRIMARY,
203-
System.nanoTime(),
204-
UNASSIGNED_SEQ_NO,
205-
0
206-
);
208+
if (autoGeneratedIdTimestamp != UNSET_AUTO_GENERATED_TIMESTAMP) {
209+
logger.info("Delete operation without ID received, and will be dropped.");
210+
operation = new Engine.NoOp(
211+
0,
212+
1,
213+
Engine.Operation.Origin.PRIMARY,
214+
System.nanoTime(),
215+
"Delete operation is missing ID"
216+
);
217+
} else {
218+
operation = new Engine.Delete(
219+
id,
220+
new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
221+
0,
222+
1,
223+
Versions.MATCH_ANY,
224+
VersionType.INTERNAL,
225+
Engine.Operation.Origin.PRIMARY,
226+
System.nanoTime(),
227+
UNASSIGNED_SEQ_NO,
228+
0
229+
);
230+
}
207231
break;
208232
default:
209233
logger.error("Unsupported operation type {}", opType);
@@ -212,6 +236,12 @@ protected Engine.Operation getOperation(byte[] payload, IngestionShardPointer po
212236

213237
return operation;
214238
}
239+
240+
private Map<String, Object> getParsedPayloadMap(byte[] payload) {
241+
BytesReference payloadBR = new BytesArray(payload);
242+
Map<String, Object> payloadMap = XContentHelper.convertToMap(payloadBR, false, MediaTypeRegistry.xContentType(payloadBR)).v2();
243+
return payloadMap;
244+
}
215245
}
216246

217247
private static BytesReference convertToBytes(Object object) throws IOException {

0 commit comments

Comments (0)