Skip to content

Commit 04595e1

Browse files
committed
disable push-API for indexing in ingestionEngine
Signed-off-by: Yupeng Fu <[email protected]>
1 parent a586a62 commit 04595e1

File tree

3 files changed

+31
-2
lines changed

3 files changed

+31
-2
lines changed

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.indices.pollingingest.StreamPoller;
3636

3737
import java.io.IOException;
38+
import java.io.UnsupportedEncodingException;
3839
import java.util.HashMap;
3940
import java.util.HashSet;
4041
import java.util.List;
@@ -145,6 +146,16 @@ protected Set<IngestionShardPointer> fetchPersistedOffsets(DirectoryReader direc
145146

146147
@Override
147148
public IndexResult index(Index index) throws IOException {
149+
throw new UnsupportedEncodingException("push-based indexing is not supported in ingestion engine, use streaming source instead");
150+
}
151+
152+
/**
153+
* Indexes the document into the engine. This is used internally by the stream poller only.
154+
* @param index the index request
155+
* @return the index result
156+
* @throws IOException if an error occurs
157+
*/
158+
public IndexResult indexInternal(Index index) throws IOException {
148159
assert Objects.equals(index.uid().field(), IdFieldMapper.NAME) : index.uid().field();
149160
ensureOpen();
150161
final IndexResult indexResult;
@@ -168,7 +179,7 @@ private void addDocs(final List<ParseContext.Document> docs, final IndexWriter i
168179

169180
@Override
170181
public DeleteResult delete(Delete delete) throws IOException {
171-
return null;
182+
throw new UnsupportedEncodingException("push-based deletion is not supported in ingestion engine, use streaming source instead");
172183
}
173184

174185
@Override

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ protected void process(Message message, IngestionShardPointer pointer) {
119119
Engine.Operation operation = getOperation(payload, pointer);
120120
switch (operation.operationType()) {
121121
case INDEX:
122-
engine.index((Engine.Index) operation);
122+
engine.indexInternal((Engine.Index) operation);
123123
break;
124124
case DELETE:
125125
engine.delete((Engine.Delete) operation);

server/src/test/java/org/opensearch/index/engine/IngestionEngineTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import java.util.concurrent.TimeUnit;
3636
import java.util.concurrent.atomic.AtomicLong;
3737

38+
import org.mockito.Mockito;
39+
3840
import static org.awaitility.Awaitility.await;
3941
import static org.mockito.ArgumentMatchers.any;
4042
import static org.mockito.Mockito.doThrow;
@@ -128,6 +130,22 @@ public void testRecovery() throws IOException {
128130
waitForResults(ingestionEngine, 4);
129131
}
130132

133+
public void testPushAPIFailures() throws IOException {
134+
try {
135+
ingestionEngine.index(Mockito.any());
136+
fail("Expected UnsupportedOperationException to be thrown");
137+
} catch (Exception e) {
138+
assertEquals("push-based indexing is not supported in ingestion engine, use streaming source instead", e.getMessage());
139+
}
140+
141+
try {
142+
ingestionEngine.delete(Mockito.any());
143+
fail("Expected UnsupportedOperationException to be thrown");
144+
} catch (Exception e) {
145+
assertEquals("push-based deletion is not supported in ingestion engine, use streaming source instead", e.getMessage());
146+
}
147+
}
148+
131149
public void testCreationFailure() throws IOException {
132150
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
133151
FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages);

0 commit comments

Comments
 (0)