Skip to content

[RFC] ML Inference Processors  #2173

Open
@mingshl

Description

@mingshl

Problem statement

Currently, there are different implementations of search processors/ingest processors that use a machine learning model, e.g, TextEmbeddingProcessor for text-embedding models , GenerativeQAResponseProcessor for large language models, PersonalizeRankingResponseProcessor for reranking models housing in AWS personalized Service. Looking forward, when each type of machine learning models has a separate type of processor, the number of processors will grow to be enormous. It will be in-convenient for users when configuring different processors. However, ML Commons plugin supports connecting to a foundation model hosted on an external platform and uploading your own pre-trained model to the OpenSearch cluster, users can utilize the model_id from ml-commons plugins to apply in search/ingest process. We can simplified the multiple implementations of search processors/ingest processors that use a machine learning model.

Motivation:

To Improve the ease of using machine learning models to process ingest/search requests, we are introducing a Machine Learning Inference Processor to OpenSearch-ml-common to uses a machine learning model to read from the data and add the prediction outcome to the data that is being ingested through the ingest pipeline, or return the prediction outcomes along with the search response that is returned through the search pipeline.

Scope:

  • Developers can use models connected/uploaded in ml-commons to generate inference and add to ingest document/search response
  • Developers can use the ML Inference processors to handle input mappings from fields in ingest documents/search hits to model input, and handle output mapping from model inference output to new fields in ingest documents/search hits.
  • Developers can apply different Inference configurations(e.g, embedding_type, knn_size) to ML Inference processors to fit different use cases.
  • Developers can use the ML Inference processors to call a model id to conduct multiple inferences processes for different set of input and output. e.g, creating embedding for two fields.

Out of Scope:

-ML inference Processor is focusing on model inferences and does not handle data transformations. Developers would consider data formatting methods before using ML Inference Processors if the documents and search hit does not match the format of model input. For example, adding the preprocess methods to model connectors (example), utilizing data transforming processing (e.g Split Processors, JsonProcessor ).

Proposed Design:

Create ML Inference processors(ingest side), ML Inference search requests processors, ML Inference search response processors that share the same parameters and extend from same interface that handles getModelInferenceResult.

ML Inference Processors parameters:

parameters Required Default Description
model_id yes - (String) The ID for the model
function_name Optional for externally hosted models, Required for local models remote (String) The function name of the ML model configured in the processor. For local models, valid values are sparse_encoding, sparse_tokenize, text_embedding, and text_similarity. For externally hosted models, valid value is remote.
model_input Optional for externally hosted models, Required for local models (String) A template that defines the input field format expected by the model. Each local model type might use a different set of inputs. For externally hosted models, default is "{ "parameters": ${ml_inference.parameters} }
input_map Optional for externally hosted models, Required for local models (List of Map) maps the fields from documents to model input, if no input mapping specified, default to use all fields from documents as model input
output_map Optional for externally hosted models, Required for local models (List of Map) maps the fields from model out to ingest documents, if no output mapping specified, will return all model outputs in a ‘inference_result' field
inference_parameters no The default settings defined in the model (Object) flexible configurations needed for different model predictions can be added in model_config. For example response_filter.
full_response_path Optional for externally hosted models, Required for local models true for local models and false for externally hosted models (Boolean) Set this parameter to true if the model_output_field contains a full JSON path to the field instead of the field name. The model output will then be fully parsed to get the value of the field.
override no false (Boolean) Relevant if an ingested document already contains a field with the name specified in <new_document_field>. If override is false, then the input field is skipped. If true, then the existing field value is overridden by the new model output.
ignore_missing no false (Boolean) If true and any of the input fields defined in input_map are missing then those missing fields are quietly ignored, otherwise a missing field causes a failure.
description no - Description of the processor. Useful for describing the purpose of the processor or its configuration.
ignore_failure no false Ignore failures for the processor.
tag no - Identifier for the processor. Useful for debugging and metrics.

Sample Process:

using the following example for a text embedding remote model wupL7Y0Bm1mYgYg_PasK that is connected in ml-common,

curl -XPUT localhost:9200/_ingest/pipeline/test-ingest -d '{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "wupL7Y0Bm1mYgYg_PasK",
        "input_map": [
          {
            "dairy": "input"
          }
        ],
        
        "output_map": [
          {
            "response": "dairy_embedding"
          }
        ]
      }
    }
  ]
}' -H "Content-Type:Application/json"     
                                                                                                                                                                                                                                      
curl -XPUT localhost:9200/daily_index -d ' {
  "settings": {
    "index": {
      "default_pipeline": "test-ingest"
    }
  },
  "mappings": {
    "dynamic": false,
    "properties": {
      "id": {
        "type": "integer"
      },
      "dairy": {
        "type": "text"
      },
      "weather": {
        "type": "text", 
        "fields": {
          "standard": { "type": "text" },  
          "raw": { "type": "keyword" }     
        }
      }
    }
  }
}' -H "Content-Type:Application/json"        
curl -XPUT localhost:9200/daily_index/_doc/1 -d '{
  "id": 1,
  "dairy": ["happy"],
  "weather": "rainy"
  }' -H "Content-Type:Application/json"  


curl -XGET localhost:9200/daily_index/_doc/1 

                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          {
  {
  "_index": "daily_index",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "dairy": ["happy"],
    "weather": "rainy",
    "dairy_embedding": [
      -0.052491702,
      0.041711915,
      0.08673346,
      0.0020010993,
      -0.0081961695,
      -0.10907775,
      0.10094219,
      -0.07203556,
      0.037287816
    ]
  }
}

Added after gathering feedbacks for different use cases,

0. using multiple rounds of predictions

Sometimes, a model only accept one model input fields, and we would like to predicts on multiple fields, we need to run the model multiple times. The inference processors can run one model with multiple inference.

curl -XPUT localhost:9200/_ingest/pipeline/test-ingest -d '{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "S7Uk_I0Bgdza-v2klZ72",
        "input_map": [
          {
            "dairy": "input"
          },
          { 
          "weather": "input"
          }
        ],
        
        "output_map": [
          {
            "response": "dairy_embedding"
          },
                    {
            "response": "weather_embedding"
          }
          
        ]
      }
    }
  ]
}' -H "Content-Type:Application/json"                                                                                                                                                                                                                                           

in this setting, it will run the model twice and mapping the output accordingly to two document fields.

the sample response would be

   curl -XGET localhost:9200/daily_index/_doc/1
   
   {
  "_index": "daily_index",
  "_id": "1",
  "_version": 2,
  "_seq_no": 1,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "dairy_embedding": [
      [
        -0.083478354,
        0.05323705,
        -0.005245433
      ]
    ],
    "weather_embedding": [
      [
        0.017304314,
        -0.021530833,
        0.050184276,
        0.08962978
      ],
      [
        -0.049097132,
        -0.032323096,
        0.014746797,
        -0.06299502,
        0.05543841
      ]
    ],
    "weather": [
      "rainy",
      "cloudy"
    ],
    "id": 1,
    "dairy": [
      "happy"
    ]
  }
} 

Handling object type model input:

for example, I want to use a language classification model , this model's predict function is expecting an object, in the format of {"input": ["text"]}, this can be a good complicated case, it's object with a map of list.

model.predict(
{
  "inputs": [
    "opensearch introduce ml inference processor"
  ]
}
)

1. Using the one field as ml input

then in the ml connector, if the use case is using the one field from the document to identify we can define the connectors and in the response body

##connector_1
POST /_plugins/_ml/connectors/_create
{
 "name": "Sagemaker language identification model connector",
 "description": "Connector for classification model",
 "version": 1,
 "protocol": "aws_sigv4",
 "parameters": {
 "region": "us-east-1",
 "service_name": "sagemaker"
 },
 "credential": {
 "access_key": "your_access_key",
 "secret_key": "your_secret_key",
 "session_token": "your_session_token"},
 "actions": [
 {
 "action_type": "predict",
 "method": "POST",
 "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/your_url/invocations",
 "headers": { 
 "content-type": "application/json" 
 },
 "request_body": "{\"inputs\":[\"${parameters.inputs}\"]}" } ]
}

in this request body, it helps formatting the parameters.input field into the desire model input format. In using this model to predict in ml common, we don't need to worry about the format of an object of a map with list, instead, we use parameters.input as we defined in the connectors.

POST /_plugins/_ml/models/3NDCHI4Bwy4GdbSIgXcY/_predict
{
  "parameters": {
    "inputs": "Say this is a test"
  }
}

##returning 
{
  "inference_results": [
    {
      "output": [
        {
          "name": "response",
          "dataAsMap": {
            "response": [
              {
                "label": "en",
                "score": 0.9411176443099976
              }
            ]
          }
        }
      ],
      "status_code": 200
    }
  ]
}

Let's use the inference processors during ingestions for reviews field:

PUT /_ingest/pipeline/test-ingest-language-one
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "you_model_id",
        "input_map": [
          {
            "inputs": "reviews"
          }
        ],
        
        "output_map": [
          {
            "reviews_language_classification": "response"
          }
        ]
      }
    }
  ]
} 

## binding the ingest pipeline with the index: 

PUT /product_review_index_1
 {
  "settings": {
    "index": {
      "default_pipeline": "test-ingest-language-one"
    }
  },
  "mappings": {
    "dynamic": true,
    "properties": {
      "id": {
        "type": "integer"
      },
      "reviews": {
        "type": "text"
      },
      "products": {
        "type": "text", 
        "fields": {
          "standard": { "type": "text" },  
          "raw": { "type": "keyword" }     
        }
      }
    }
  }
}

##during ingestion, it auto triggers to the ingest pipeline with ml_inference processors
PUT /product_review_index_1/_doc/1
{
  "id": 1,
  "reviews": "happy purchase, love it!",
  "products": "opensearch hoodies"
  } 

Now the documents getting ingest already has the model output field named reviews_language_classification

{
  "_index": "product_review_index_1",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, love it!",
    "reviews_language_classification": [
      {
        "score": 0.9937509894371033,
        "label": "en"
      }
    ],
    "id": 1,
    "products": "opensearch hoodies"
  }
}

Since I turned on dynamic mapping, to use the language label field, we can search using doc path reviews_language_classification.label in search queries .

GET _search
{
  "query": {
    "match": {
      "reviews_language_classification.label": "en"
    }
  }
}

2. Using the multiple fields as ml input

Another use case, @zhichao-aws also mentioned that many models now accepts multiple model input fields, nowadays, text_embedding models and classification models accepts multiple model inputs, we just need to config the connectors properly to meet this multiple input fields requirements. Similarly in the response body, we config two input fields.

##connector_2
POST /_plugins/_ml/connectors/_create
{
 "name": "Sagemaker language identification model connector",
 ...same as above .. 
 "request_body": "{\"inputs\":[\"${parameters.inputs1}\",\"${parameters.inputs2}\"]}" } ]
}

In this case, it's looking for two input fields to the document, and format properly, the ml_inference processor will handle the mappings for reviews -> input1, products -> input2.

PUT /_ingest/pipeline/test-ingest-language-two
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "19BlHI4Bwy4GdbSI43dk",
        "input_map": [
          {
            "inputs1": "reviews",
            "inputs2": "products"
          }
        ],
        
        "output_map": [
          {
            "reviews_products_language_classification": "response"
          }
        ]
      }
    }
  ]
}

###returnning 
{
  "_index": "product_review_index_1",
  "_id": "1",
  "_version": 1,
  "_seq_no": 0,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, love it!",
    "reviews_products_language_classification": [
      {
        "score": 0.9937509894371033,
        "label": "en"
      },
   {
        "score": 0.9063221654224541,
        "label": "en"
      }
    ],
    "id": 1,
    "products": "opensearch hoodies"
  }
}

3. Formatting with other processors.

in the connectors, it's supported writing post_process_function, and also we can use other processors before ml_inference_processor to handle model input field format or after to handle model output format.

I am continuing the for the second step, and would like parse the model output field reviews_products_language_classification into two fields. this is to seperate an array and append to new fields, we can use a script processor and remove processor to bundle with it. In the future, we can also add a new type of processor maybe called "seperate_append" processor then it would be easier to use.

let's modify the ingest pipeline for the same index

PUT /_ingest/pipeline/test-ingest-language-two
{
  "description": "test ml model ingest processor",
  "processors": [
    {
      "ml_inference": {
        "model_id": "79CAH44Bwy4GdbSIoXeq",
        "input_map": [
          {
            "inputs1": "reviews",
            "inputs2": "products"
          }
        ],
        
        "output_map": [
          {
            "reviews_products_language_classification": "response"
          }
        ]
      }
    },
    {
     "script": {
          "source": """
          def headers = ["reviews_language", "products_language"];
          for (int i = 0; i < ctx.reviews_products_language_classification.length; i++) {
            ctx[headers[i]] = ctx.reviews_products_language_classification[i];
          }
        """
     }
    },
    {
      "remove": {
        "field": "reviews_products_language_classification"  
      }
    }
  ]
} 

then when ingesting the same document, it returns

{
  "_index": "product_index_3",
  "_id": "1",
  "_version": 2,
  "_seq_no": 3,
  "_primary_term": 1,
  "found": true,
  "_source": {
    "reviews": "happy purchase, highly recommended",
    "products_language": {
      "score": 0.6440341472625732,
      "label": "en"
    },
    "reviews_language": {
      "score": 0.9933781027793884,
      "label": "en"
    },
    "id": 1,
    "products": "opensearch hoodies"
  }
}

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

Status

In Progress

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions