Description
Describe the bug
In IngestService we use a while loop to execute each ingest pipeline, and we assume pipeline execution is synchronous, so that in the callback of innerBatchExecute we can decide whether to drain the iterator.
But we also support asynchronous pipeline execution. If the default pipeline does some async work, the callback of its innerBatchExecute runs on another thread while the current thread moves on and executes the final pipeline. Because of this concurrency, the callback of the default pipeline's innerBatchExecute also tries to execute the final pipeline. The counter then gets corrupted: two threads each finish the pipeline execution and decrement the counter by the size of the results, so the second decrement drives the counter negative and we hit a fatal error.
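To make the race concrete, here is a minimal, self-contained Java sketch of the pattern described above (run with -ea). It is not the actual IngestService code: the class, the two hard-coded pipelines, and the counter are simplified stand-ins, but it shows how a while loop that assumes synchronous completion, combined with a callback that also drains the iterator, ends up decrementing the same counter twice.

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PipelineCounterRace {

    // One document in flight; decremented once all pipelines for it are done.
    static final AtomicInteger counter = new AtomicInteger(1);

    // Mimics the while loop that assumes each pipeline completes synchronously.
    static void executePipelines(Iterator<String> pipelines) {
        while (pipelines.hasNext()) {
            String pipeline = pipelines.next();
            execute(pipeline, () -> {
                if (pipelines.hasNext()) {
                    executePipelines(pipelines); // the callback thread drains the iterator too
                } else {
                    onAllPipelinesDone();        // ends up running on BOTH threads
                }
            });
            // The calling thread falls through and keeps looping even though the
            // async default pipeline has not finished yet.
        }
    }

    static void execute(String pipeline, Runnable whenDone) {
        if ("default".equals(pipeline)) {
            new Thread(whenDone).start(); // async processor, e.g. ML inference on another thread
        } else {
            whenDone.run();               // synchronous processor
        }
    }

    static void onAllPipelinesDone() {
        int remaining = counter.decrementAndGet();
        // The second decrement drives the counter negative, mirroring the AssertionError
        // in the stack traces below.
        assert remaining >= 0 : "counter went negative: " + remaining;
    }

    public static void main(String[] args) throws Exception {
        executePipelines(List.of("default", "final").iterator());
        Thread.sleep(500); // give the async callback time to run and trip the assertion
    }
}

Changing the while to an if, so that only the callback decides whether to continue, removes the duplicate completion; that is the change suggested under Expected behavior below.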
Related component
Indexing
To Reproduce
- Create two pipelines; the default pipeline should have an async processor (a sample final pipeline is shown after the default one below).
Follow this doc to register and deploy an ML model, then create the default pipeline:
PUT _ingest/pipeline/nlp-ingest-pipeline
{
  "description": "An NLP ingest pipeline",
  "processors": [
    {
      "text_embedding": {
        "model_id": <model_id>,
        "field_map": {
          "passage": "passage_embedding"
        }
      }
    }
  ]
}
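The index below also references a final pipeline, so create one as well. Any synchronous pipeline works; for example, a hypothetical one with a single set processor:

PUT _ingest/pipeline/final-pipeline
{
  "description": "A simple synchronous final pipeline",
  "processors": [
    {
      "set": {
        "field": "ingested",
        "value": "true"
      }
    }
  ]
}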
- Create an index with both the default ingest pipeline and the final ingest pipeline
PUT my-nlp-index
{
  "settings": {
    "index.knn": true,
    "default_pipeline": "nlp-ingest-pipeline",
    "index.final_pipeline": "final-pipeline"
  },
  "mappings": {
    "properties": {
      "passage_embedding": {
        "type": "knn_vector",
        "dimension": 768,
        "space_type": "l2"
      },
      "passage": {
        "type": "text"
      }
    }
  }
}
- Index a document:
PUT my-nlp-index/_doc/1
{
  "passage": "passage"
}
The response:
{
  "_index": "my-nlp-index",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "_seq_no": 0,
  "_primary_term": 1
}
We get a success response, but the node goes down with:
fatal error in thread [opensearch[integTest-0][opensearch_ml_predict][T#1]], exiting
» java.lang.AssertionError
» at org.opensearch.ingest.IngestService.lambda$executePipelines$7(IngestService.java:932)
» at org.opensearch.ingest.IngestService.lambda$innerExecute$11(IngestService.java:1040)
» at org.opensearch.ingest.IngestDocument.lambda$executePipeline$0(IngestDocument.java:814)
» at org.opensearch.ingest.Pipeline.lambda$execute$0(Pipeline.java:140)
» at org.opensearch.ingest.CompoundProcessor.innerExecute(CompoundProcessor.java:260)
» at org.opensearch.ingest.CompoundProcessor.lambda$innerExecute$4(CompoundProcessor.java:287)
» at org.opensearch.neuralsearch.processor.InferenceProcessor.lambda$generateAndSetInference$12(InferenceProcessor.java:792)
» at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
» at org.opensearch.neuralsearch.ml.MLCommonsClientAccessor.lambda$retryableInferenceSentencesWithVectorResult$4(MLCommonsClientAccessor.java:159)
» at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
» at org.opensearch.ml.client.MachineLearningNodeClient.lambda$getMlPredictionTaskResponseActionListener$5(MachineLearningNodeClient.java:378)
» at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
» at org.opensearch.ml.client.MachineLearningNodeClient.lambda$wrapActionListener$6(MachineLearningNodeClient.java:394)
» at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
» at org.opensearch.action.support.TransportAction$1.onResponse(TransportAction.java:115)
» at org.opensearch.action.support.TransportAction$1.onResponse(TransportAction.java:109)
» at org.opensearch.core.action.ActionListener$6.onResponse(ActionListener.java:301)
» at org.opensearch.core.action.ActionListener$5.onResponse(ActionListener.java:268)
» at org.opensearch.core.action.ActionListener$5.onResponse(ActionListener.java:268)
» at org.opensearch.ml.task.MLPredictTaskRunner.runPredict(MLPredictTaskRunner.java:476)
» at org.opensearch.ml.task.MLPredictTaskRunner.predict(MLPredictTaskRunner.java:381)
» at org.opensearch.ml.task.MLPredictTaskRunner.lambda$executePredictionByInputDataType$10(MLPredictTaskRunner.java:325)
» at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:916)
» at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
» at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
» at java.base/java.lang.Thread.run(Thread.java:1583)
Bulk indexing hits the same issue.
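A hypothetical bulk request against the same index, for example:

POST my-nlp-index/_bulk
{ "index": { "_id": "2" } }
{ "passage": "another passage" }

also brings the node down with: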
fatal error in thread [opensearch[integTest-0][opensearch_ml_predict][T#1]], exiting
» java.lang.AssertionError
» at org.opensearch.ingest.IngestService.lambda$executePipelinesInBatchRequests$3(IngestService.java:824)
» at org.opensearch.ingest.IngestService.lambda$innerBatchExecute$13(IngestService.java:1100)
» at org.opensearch.ingest.Pipeline.lambda$batchExecute$2(Pipeline.java:226)
» at org.opensearch.ingest.CompoundProcessor.innerBatchExecute(CompoundProcessor.java:174)
» at org.opensearch.ingest.CompoundProcessor.lambda$innerBatchExecute$3(CompoundProcessor.java:235)
» at org.opensearch.ingest.AbstractBatchingProcessor.lambda$batchExecute$0(AbstractBatchingProcessor.java:69)
» at org.opensearch.neuralsearch.processor.InferenceProcessor.lambda$doSubBatchExecute$3(InferenceProcessor.java:226)
» at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
» at org.opensearch.neuralsearch.ml.MLCommonsClientAccessor.lambda$retryableInferenceSentencesWithVectorResult$4(MLCommonsClientAccessor.java:159)
» at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
» at org.opensearch.ml.client.MachineLearningNodeClient.lambda$getMlPredictionTaskResponseActionListener$5(MachineLearningNodeClient.java:378)
» at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
» at org.opensearch.ml.client.MachineLearningNodeClient.lambda$wrapActionListener$6(MachineLearningNodeClient.java:394)
» at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
» at org.opensearch.action.support.TransportAction$1.onResponse(TransportAction.java:115)
» at org.opensearch.action.support.TransportAction$1.onResponse(TransportAction.java:109)
» at org.opensearch.core.action.ActionListener$6.onResponse(ActionListener.java:301)
» at org.opensearch.core.action.ActionListener$5.onResponse(ActionListener.java:268)
» at org.opensearch.core.action.ActionListener$5.onResponse(ActionListener.java:268)
» at org.opensearch.ml.task.MLPredictTaskRunner.runPredict(MLPredictTaskRunner.java:476)
» at org.opensearch.ml.task.MLPredictTaskRunner.predict(MLPredictTaskRunner.java:381)
» at org.opensearch.ml.task.MLPredictTaskRunner.lambda$executePredictionByInputDataType$10(MLPredictTaskRunner.java:325)
» at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:916)
» at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
» at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
» at java.base/java.lang.Thread.run(Thread.java:1583)
Expected behavior
The node should not go down.
We should replace the while loop with an if check, since we actually rely on the callback function to decide whether to continue executing the next pipeline.
Additional Details
Plugins
Default plugins + Neural Search Plugin
Screenshots
N/A
Host/Environment (please complete the following information):
- OS: macOS
- Version: 15.3.2
Additional context
N/A