
[BUG] Ingest Service doesn't handle target index change case correctly #17819

Closed
@bzhangam

Description

Describe the bug

An ingest pipeline can be used to conditionally change the target index of an index request. Once we run into that case we try to re-resolve the pipelines for the request based on the new target index, but there are a couple of issues:

  1. We simply assume all the requests in the batch should be re-routed to the same new target index, which is not true.
  2. We execute the final pipeline of the new target index immediately, but later, after we switch back to the write thread, we try to execute the final pipeline again if it exists.
  3. We never execute the default pipeline of the new target index.
  4. For bulk update requests we are not able to change the target index, because we don't use the ingest pipeline to modify the original docWriteRequest.

Related component

Ingest Service

To Reproduce

  1. Create a couple of ingest pipelines:

This will be the default pipeline of the log-index. It conditionally routes the request to the metric-index based on the type field, and it also counts how many times it has been executed.

PUT /_ingest/pipeline/conditional-target-index
{
  "processors": [
    {
      "set": {
        "field": "_index",
        "value": "metric-index",
        "if": "ctx.type == 'metric'"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "ctx.log_index_default_pipeline_executions = ctx.log_index_default_pipeline_executions != null ? ctx.log_index_default_pipeline_executions + 1 : 1;"
      }
    }
  ]
}

This will be the final pipeline of the log-index. It automatically adds a field to the doc, and it also counts how many times it has been executed.

PUT _ingest/pipeline/auto-field-for-log
{
  "processors": [
    {
      "set": {
        "field": "auto_field_for_log",
        "value": "auto field added by final pipeline for log doc."
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "ctx.log_index_final_pipeline_executions = ctx.log_index_final_pipeline_executions != null ? ctx.log_index_final_pipeline_executions + 1 : 1;"
      }
    }
  ]
}

This will be the default pipeline of the metric-index.

PUT _ingest/pipeline/metric-default-pipeline
{
  "processors": [
    {
      "set": {
        "field": "auto_field_from_metric_default_pipeline",
        "value": "auto field added by default pipeline for metric index."
      }
    }
  ]
}

This will be the final pipeline of the metric-index. It automatically adds a field to the doc, and it also counts how many times it has been executed.

PUT /_ingest/pipeline/auto-field-for-metric
{
  "processors": [
    {
      "set": {
        "field": "auto_field_for_metric",
        "value": "auto field added by final pipeline for metric doc."
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "ctx.metric_index_final_pipeline_executions = ctx.metric_index_final_pipeline_executions != null ? ctx.metric_index_final_pipeline_executions + 1 : 1;"
      }
    }
  ]
}
  2. Create indices with ingest pipelines

Create the log-index with conditional-target-index as the default pipeline and auto-field-for-log as the final pipeline.

PUT log-index
{
  "mappings": {
    "properties": {
      "type": { "type": "text" },
      "value": { "type": "text" }
    }
  },
  "settings": {
    "index.default_pipeline": "conditional-target-index",
    "index.final_pipeline": "auto-field-for-log"
  }
}

Create the metric-index with metric-default-pipeline as the default pipeline and auto-field-for-metric as the final pipeline.

PUT metric-index
{
  "mappings": {
    "properties": {
      "type": { "type": "text" },
      "value": { "type": "text" }
    }
  },
  "settings": {
    "index.default_pipeline": "metric-default-pipeline",
    "index.final_pipeline": "auto-field-for-metric"
  }
}

  3. Index docs

Bulk index multiple docs.

PUT _bulk
{ "index": { "_index": "log-index", "_id": "1" } }
{ "type": "log", "value":"This is a log doc." }
{ "index": { "_index": "log-index", "_id": "2" } }
{ "type": "log", "value":"This is a log doc."  }
{ "index": { "_index": "log-index", "_id": "3" } }
{ "type": "metric", "value":"This is a metric doc."  }

GET /metric-index/_search
{
  "query": {
    "match_all": {}
  }
}


"hits": [
  {
    "_index": "metric-index",
    "_id": "3",
    "_score": 1,
    "_source": {
      "log_index_default_pipeline_executions": 1,
      "auto_field_for_log": "auto field added by final pipeline for log doc.",
      "log_index_final_pipeline_executions": 1,
      "type": "metric",
      "value": "This is a metric doc."
    }
  }
]

In this request the metric doc is routed to the metric-index, but neither the default pipeline nor the final pipeline of the metric-index is executed; the final pipeline of the log-index is incorrectly applied to it instead. This happens because in this code we simply assume the target index of the first request in the batch is the same for all following requests. In this case the first doc is a log doc, so we conclude there is no target index change.
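The first-doc assumption can be modeled with a small Python sketch. This is a simplified illustration, not the actual IngestService code; the function names are hypothetical:

```python
def resolve_batch_buggy(docs, original_index):
    """Buggy re-resolution: the first doc's index decides for the whole batch."""
    if docs[0]["_index"] == original_index:
        # First doc was not re-routed, so we assume nothing in the batch was.
        return [original_index for _ in docs]
    # First doc was re-routed, so we assume everything was.
    return [docs[0]["_index"] for _ in docs]

def resolve_batch_per_request(docs, original_index):
    """Correct behavior: check each doc's (possibly updated) target index."""
    return [doc["_index"] for doc in docs]

# Mirrors the repro above: docs 1-2 are log docs, doc 3 was re-routed
# to metric-index by the conditional set processor.
batch = [{"_index": "log-index"}, {"_index": "log-index"}, {"_index": "metric-index"}]
print(resolve_batch_buggy(batch, "log-index"))        # doc 3 stays on log-index pipelines
print(resolve_batch_per_request(batch, "log-index"))  # doc 3 gets metric-index pipelines
```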

PUT _bulk
{ "index": { "_index": "log-index", "_id": "4" } }
{ "type": "metric", "value":"This is a log doc." }
{ "index": { "_index": "log-index", "_id": "5" } }
{ "type": "log", "value":"This is a log doc."  }
{ "index": { "_index": "log-index", "_id": "6" } }
{ "type": "metric", "value":"This is a metric doc."  }

GET /metric-index/_search
{
  "query": {
    "match_all": {}
  }
}

"hits": [
  ...,
  {
    "_index": "metric-index",
    "_id": "4",
    "_score": 1,
    "_source": {
      "log_index_default_pipeline_executions": 1,
      "auto_field_for_metric": "auto field added by final pipeline for metric doc.",
      "type": "metric",
      "value": "This is a log doc.",
      "metric_index_final_pipeline_executions": 2
    }
  },
  {
    "_index": "metric-index",
    "_id": "6",
    "_score": 1,
    "_source": {
      "log_index_default_pipeline_executions": 1,
      "auto_field_for_metric": "auto field added by final pipeline for metric doc.",
      "type": "metric",
      "value": "This is a metric doc.",
      "metric_index_final_pipeline_executions": 2
    }
  }
]


GET /log-index/_search
{
  "query": {
    "match_all": {}
  }
}

"hits": [
  ...,
  {
    "_index": "log-index",
    "_id": "5",
    "_score": 1,
    "_source": {
      "log_index_default_pipeline_executions": 1,
      "auto_field_for_metric": "auto field added by final pipeline for metric doc.",
      "auto_field_for_log": "auto field added by final pipeline for log doc.",
      "log_index_final_pipeline_executions": 1,
      "type": "log",
      "value": "This is a log doc.",
      "metric_index_final_pipeline_executions": 1
    }
  }
]

In this example we correctly apply the final pipeline of the metric-index to the metric docs but, due to bug 1, incorrectly apply it to the log doc as well.

Besides, we also execute the final pipeline twice, because we don't reset the pipeline to _none in the request, so when we switch back to the write thread we try to execute it again.

And if you look at the metric doc you will find the default pipeline is not executed. This happens because when we try to re-resolve the request its pipeline is already set to _none, and we mistake that for a user-provided parameter, which is not right (code).
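The _none confusion can be sketched in a few lines of Python. This is a simplified model of the resolution check; "_none" is the real no-op sentinel, the rest is illustrative:

```python
NOOP_PIPELINE = "_none"  # sentinel meaning "do not run any pipeline"

def resolve_default_pipeline(request_pipeline, index_default_pipeline):
    """Simplified resolution: a pipeline already set on the request wins
    over the target index's default pipeline."""
    if request_pipeline is not None:
        # After the first resolution pass the request carries "_none",
        # which is indistinguishable here from a user-supplied parameter.
        return request_pipeline
    return index_default_pipeline

# First resolution against the original target index: its default pipeline applies.
assert resolve_default_pipeline(None, "conditional-target-index") == "conditional-target-index"

# Re-resolution after re-routing to metric-index: the leftover "_none"
# masks metric-default-pipeline, so it never runs (bug 3).
assert resolve_default_pipeline(NOOP_PIPELINE, "metric-default-pipeline") == NOOP_PIPELINE
```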

PUT _bulk
{ "update": { "_index": "log-index", "_id": "7" } }
{ "doc": { "type": "metric", "value":"This is a metric doc." }, "doc_as_upsert": true }
{ "update": { "_index": "log-index", "_id": "8" } }
{ "doc": { "type": "log", "value":"This is a log doc." }, "doc_as_upsert": true }
{ "update": { "_index": "log-index", "_id": "9" } }
{ "doc": { "type": "metric", "value":"This is a metric doc." }, "doc_as_upsert": true }

GET /log-index/_search
{
  "query": {
    "match_all": {}
  }
}

"hits": [
  ...,
  {
    "_index": "log-index",
    "_id": "7",
    "_score": 1,
    "_source": {
      "log_index_default_pipeline_executions": 1,
      "auto_field_for_metric": "auto field added by final pipeline for metric doc.",
      "type": "metric",
      "value": "This is a metric doc.",
      "metric_index_final_pipeline_executions": 2
    }
  },
  {
    "_index": "log-index",
    "_id": "8",
    "_score": 1,
    "_source": {
      "log_index_default_pipeline_executions": 1,
      "auto_field_for_metric": "auto field added by final pipeline for metric doc.",
      "auto_field_for_log": "auto field added by final pipeline for log doc.",
      "log_index_final_pipeline_executions": 1,
      "type": "log",
      "value": "This is a log doc.",
      "metric_index_final_pipeline_executions": 1
    }
  },
  {
    "_index": "log-index",
    "_id": "9",
    "_score": 1,
    "_source": {
      "log_index_default_pipeline_executions": 1,
      "auto_field_for_metric": "auto field added by final pipeline for metric doc.",
      "type": "metric",
      "value": "This is a metric doc.",
      "metric_index_final_pipeline_executions": 2
    }
  }
]

The metric docs are not routed to the metric-index because for bulk update requests we do not modify the original request (code).
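This can be illustrated with a rough Python model (the structures are hypothetical, not the actual code): ingest runs against an index request derived from the update's doc, so a changed _index on that copy never reaches the original update request.

```python
def run_pipeline_on_update(update_request):
    """Simplified model of bug 4: ingest runs on a COPY derived from the
    update's doc, so re-routing the copy does not change the original."""
    derived = {"_index": update_request["_index"], "doc": dict(update_request["doc"])}
    if derived["doc"].get("type") == "metric":
        derived["_index"] = "metric-index"  # the conditional set processor fires
    # The original docWriteRequest is returned untouched.
    return update_request, derived

update_req = {"_index": "log-index", "doc": {"type": "metric", "value": "This is a metric doc."}}
original, derived = run_pipeline_on_update(update_req)
print(original["_index"])  # still log-index, despite the pipeline re-route
print(derived["_index"])   # metric-index, but this copy's target index is discarded
```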

Expected Behavior

For bugs 1 and 2 we should simply mark the request as pipeline-not-resolved and rely on TransportBulkAction to re-process it. In this way we can handle requests with different new target indices properly and avoid duplicated execution of the final pipeline. And to address bug 3 we also need to reset the pipeline of the request to null; otherwise it stays _none and is treated like an ingest pipeline defined as a request parameter, based on this code.
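The proposed handling can be sketched as a simplified Python model; the field names mirror, but are not, the real IndexRequest accessors:

```python
def hand_back_for_reresolution(request, new_index):
    """Proposed fix for bugs 1-3: instead of re-resolving and executing
    pipelines inline, mark the request unresolved and let
    TransportBulkAction re-process it against the new target index."""
    request["index"] = new_index
    # No inline final-pipeline run, so it cannot be duplicated later (bug 2).
    request["pipeline_resolved"] = False
    # Reset to None rather than "_none" so the new index's default pipeline
    # is not mistaken for a user-provided request parameter (bug 3).
    request["pipeline"] = None
    request["final_pipeline"] = None
    return request

req = {"index": "log-index", "pipeline": "_none",
       "final_pipeline": "auto-field-for-log", "pipeline_resolved": True}
req = hand_back_for_reresolution(req, "metric-index")
print(req)  # unresolved request pointing at metric-index with no stale pipelines
```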

For bug 4 we can either call it out as a limitation or figure out a way to modify the target index of the original docWriteRequest.

Additional Details

Plugins
default config

Screenshots
N/A

Host/Environment (please complete the following information):

  • OS: macOS
  • Version: 15.3.1

Additional context
N/A

Labels

Indexing, bug
