Skip to content

Commit 26d5cf6

Browse files
committed
Adds preset contentRegistry for IngestProcessors (opensearch-project#3281)
* add preset xContentRegistry to ingestProcessors for custom parametized local models Curently local models that use the parameters map within the payload to create a request can not create objects to be used for local model prediction. This requires a opensearch core change because it needs the contentRegistry,however given there is not much dependency on the registry (currently) we can give it the preset registry given in the MachineLearningPlugin class vai the getNamedXContent() class Signed-off-by: Brian Flores <[email protected]> * Adds UT for proving models depend on xContentRegistry for prediction Signed-off-by: Brian Flores <[email protected]> * apply spotless Signed-off-by: Brian Flores <[email protected]> * Adds IT for Asymmetric Embedding scenario with MLInferenceIngestProcessor We needed to make sure that a IT existed so that the preset content registry on the processor could work with parametized local models. By providing an IT that uses the asymetric embedding model its proven that the content registry is needed to create the embeddings. In this specific test case I used a ingest pipeline to convert passage embeddings, by simulating the pipeline to save test time. Signed-off-by: Brian Flores <[email protected]> --------- Signed-off-by: Brian Flores <[email protected]> (cherry picked from commit df1b1ef)
1 parent 92824bb commit 26d5cf6

File tree

4 files changed

+182
-2
lines changed

4 files changed

+182
-2
lines changed

plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,10 +1063,12 @@ public void loadExtensions(ExtensionLoader loader) {
10631063
@Override
10641064
public Map<String, org.opensearch.ingest.Processor.Factory> getProcessors(org.opensearch.ingest.Processor.Parameters parameters) {
10651065
Map<String, org.opensearch.ingest.Processor.Factory> processors = new HashMap<>();
1066+
NamedXContentRegistry contentRegistry = new NamedXContentRegistry(getNamedXContent());
1067+
10661068
processors
10671069
.put(
10681070
MLInferenceIngestProcessor.TYPE,
1069-
new MLInferenceIngestProcessor.Factory(parameters.scriptService, parameters.client, xContentRegistry)
1071+
new MLInferenceIngestProcessor.Factory(parameters.scriptService, parameters.client, contentRegistry)
10701072
);
10711073
return Collections.unmodifiableMap(processors);
10721074
}

plugin/src/test/java/org/opensearch/ml/processor/MLInferenceIngestProcessorTests.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.opensearch.core.action.ActionListener;
3232
import org.opensearch.core.xcontent.NamedXContentRegistry;
3333
import org.opensearch.ingest.IngestDocument;
34+
import org.opensearch.ml.common.FunctionName;
3435
import org.opensearch.ml.common.dataset.remote.RemoteInferenceInputDataSet;
3536
import org.opensearch.ml.common.input.MLInput;
3637
import org.opensearch.ml.common.output.model.MLResultDataType;
@@ -138,6 +139,60 @@ public void testExecute_Exception() throws Exception {
138139

139140
}
140141

142+
/**
143+
* Models that use the parameters field need to have a valid NamedXContentRegistry object to create valid MLInputs. For example
144+
* <pre>
145+
* PUT /_plugins/_ml/_predict/text_embedding/model_id
146+
* {
147+
* "parameters": {
148+
* "content_type" : "query"
149+
* },
150+
* "text_docs" : ["what day is it today?"],
151+
* "target_response" : ["sentence_embedding"]
152+
* }
153+
* </pre>
154+
* These types of models like Local Asymmetric embedding models use the parameters field.
155+
* And as such we need to test that having the contentRegistry throws an exception as it can not
156+
* properly create a valid MLInput to perform prediction
157+
*
158+
* @implNote If you check the stack trace of the test you will see it tells you that it's a direct consequence of xContentRegistry being null
159+
*/
160+
public void testExecute_xContentRegistryNullWithLocalModel_throwsException() throws Exception {
161+
// Set the registry to null and reset after exiting the test
162+
xContentRegistry = null;
163+
164+
String localModelInput =
165+
"{ \"text_docs\": [\"What day is it today?\"],\"target_response\": [\"sentence_embedding\"], \"parameters\": { \"contentType\" : \"query\"} }";
166+
167+
MLInferenceIngestProcessor processor = createMLInferenceProcessor(
168+
"local_model_id",
169+
null,
170+
null,
171+
null,
172+
false,
173+
FunctionName.TEXT_EMBEDDING.toString(),
174+
false,
175+
false,
176+
false,
177+
localModelInput
178+
);
179+
try {
180+
String npeMessage =
181+
"Cannot invoke \"org.opensearch.ml.common.input.MLInput.setAlgorithm(org.opensearch.ml.common.FunctionName)\" because \"mlInput\" is null";
182+
183+
processor.execute(ingestDocument, handler);
184+
verify(handler)
185+
.accept(
186+
isNull(),
187+
argThat(exception -> exception instanceof NullPointerException && exception.getMessage().equals(npeMessage))
188+
);
189+
} catch (Exception e) {
190+
assertEquals("this catch block should not get executed.", e.getMessage());
191+
}
192+
// reset to mocked object
193+
xContentRegistry = mock(NamedXContentRegistry.class);
194+
}
195+
141196
/**
142197
* test nested object document with array of Map<String,String>
143198
*/

plugin/src/test/java/org/opensearch/ml/rest/RestMLInferenceIngestProcessorIT.java

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.ml.rest;
77

88
import static org.opensearch.ml.common.MLTask.MODEL_ID_FIELD;
9+
import static org.opensearch.ml.utils.TestData.SENTENCE_TRANSFORMER_MODEL_HASH_VALUE;
910
import static org.opensearch.ml.utils.TestData.SENTENCE_TRANSFORMER_MODEL_URL;
1011
import static org.opensearch.ml.utils.TestHelper.makeRequest;
1112

@@ -25,6 +26,8 @@
2526
import org.opensearch.ml.common.transport.register.MLRegisterModelInput;
2627
import org.opensearch.ml.utils.TestHelper;
2728

29+
import com.google.common.collect.ImmutableList;
30+
import com.jayway.jsonpath.DocumentContext;
2831
import com.jayway.jsonpath.JsonPath;
2932

3033
public class RestMLInferenceIngestProcessorIT extends MLCommonsRestTestCase {
@@ -431,6 +434,110 @@ public void testMLInferenceProcessorLocalModelObjectField() throws Exception {
431434
Assert.assertEquals(0.49191704, (Double) embedding2.get(0), 0.005);
432435
}
433436

437+
public void testMLInferenceIngestProcessor_simulatesIngestPipelineSuccessfully_withAsymmetricEmbeddingModelUsingPassageContentType()
438+
throws Exception {
439+
String taskId = registerModel(TestHelper.toJsonString(registerAsymmetricEmbeddingModelInput()));
440+
waitForTask(taskId, MLTaskState.COMPLETED);
441+
getTask(client(), taskId, response -> {
442+
assertNotNull(response.get(MODEL_ID_FIELD));
443+
this.localModelId = (String) response.get(MODEL_ID_FIELD);
444+
try {
445+
String deployTaskID = deployModel(this.localModelId);
446+
waitForTask(deployTaskID, MLTaskState.COMPLETED);
447+
448+
getModel(client(), this.localModelId, model -> { assertEquals("DEPLOYED", model.get("model_state")); });
449+
} catch (IOException | InterruptedException e) {
450+
throw new RuntimeException(e);
451+
}
452+
});
453+
454+
String asymmetricPipelineName = "asymmetric_embedding_pipeline";
455+
String createPipelineRequestBody = "{\n"
456+
+ " \"description\": \"ingest PASSAGE text and generate a embedding using an asymmetric model\",\n"
457+
+ " \"processors\": [\n"
458+
+ " {\n"
459+
+ " \"ml_inference\": {\n"
460+
+ "\n"
461+
+ " \"model_input\": \"{\\\"text_docs\\\":[\\\"${input_map.text_docs}\\\"],\\\"target_response\\\":[\\\"sentence_embedding\\\"],\\\"parameters\\\":{\\\"content_type\\\":\\\"passage\\\"}}\",\n"
462+
+ " \"function_name\": \"text_embedding\",\n"
463+
+ " \"model_id\": \""
464+
+ this.localModelId
465+
+ "\",\n"
466+
+ " \"input_map\": [\n"
467+
+ " {\n"
468+
+ " \"text_docs\": \"description\"\n"
469+
+ " }\n"
470+
+ " ],\n"
471+
+ " \"output_map\": [\n"
472+
+ " {\n"
473+
+ "\n"
474+
+ " "
475+
+ " \"fact_embedding\": \"$.inference_results[0].output[0].data\"\n"
476+
+ " }\n"
477+
+ " ]\n"
478+
+ " }\n"
479+
+ " }\n"
480+
+ " ]\n"
481+
+ "}";
482+
483+
createPipelineProcessor(createPipelineRequestBody, asymmetricPipelineName);
484+
String sampleDocuments = "{\n"
485+
+ " \"docs\": [\n"
486+
+ " {\n"
487+
+ " \"_index\": \"my-index\",\n"
488+
+ " \"_id\": \"1\",\n"
489+
+ " \"_source\": {\n"
490+
+ " \"title\": \"Central Park\",\n"
491+
+ " \"description\": \"A large public park in the heart of New York City, offering a wide range of recreational activities.\"\n"
492+
+ " }\n"
493+
+ " },\n"
494+
+ " {\n"
495+
+ " \"_index\": \"my-index\",\n"
496+
+ " \"_id\": \"2\",\n"
497+
+ " \"_source\": {\n"
498+
+ " \"title\": \"Empire State Building\",\n"
499+
+ " \"description\": \"An iconic skyscraper in New York City offering breathtaking views from its observation deck.\"\n"
500+
+ " }\n"
501+
+ " }\n"
502+
+ " ]\n"
503+
+ "}\n";
504+
505+
Map simulateResponseDocuments = simulateIngestPipeline(asymmetricPipelineName, sampleDocuments);
506+
507+
DocumentContext documents = JsonPath.parse(simulateResponseDocuments);
508+
509+
List centralParkFactEmbedding = documents.read("docs.[0].*._source.fact_embedding.*");
510+
assertEquals(768, centralParkFactEmbedding.size());
511+
Assert.assertEquals(0.5137818, (Double) centralParkFactEmbedding.get(0), 0.005);
512+
513+
List empireStateBuildingFactEmbedding = documents.read("docs.[1].*._source.fact_embedding.*");
514+
assertEquals(768, empireStateBuildingFactEmbedding.size());
515+
Assert.assertEquals(0.4493039, (Double) empireStateBuildingFactEmbedding.get(0), 0.005);
516+
}
517+
518+
private MLRegisterModelInput registerAsymmetricEmbeddingModelInput() {
519+
MLModelConfig modelConfig = TextEmbeddingModelConfig
520+
.builder()
521+
.modelType("bert")
522+
.frameworkType(TextEmbeddingModelConfig.FrameworkType.SENTENCE_TRANSFORMERS)
523+
.embeddingDimension(768)
524+
.queryPrefix("query >>")
525+
.passagePrefix("passage >> ")
526+
.build();
527+
528+
return MLRegisterModelInput
529+
.builder()
530+
.modelName("test_model_name")
531+
.version("1.0.0")
532+
.functionName(FunctionName.TEXT_EMBEDDING)
533+
.modelFormat(MLModelFormat.TORCH_SCRIPT)
534+
.modelConfig(modelConfig)
535+
.url(SENTENCE_TRANSFORMER_MODEL_URL)
536+
.deployModel(false)
537+
.hashValue(SENTENCE_TRANSFORMER_MODEL_HASH_VALUE)
538+
.build();
539+
}
540+
434541
// TODO: add tests for other local model types such as sparse/cross encoders
435542
public void testMLInferenceProcessorLocalModelNestedField() throws Exception {
436543

@@ -550,6 +657,21 @@ protected void createPipelineProcessor(String requestBody, final String pipeline
550657

551658
}
552659

660+
protected Map simulateIngestPipeline(String pipelineName, String sampleDocuments) throws IOException {
661+
Response ingestionResponse = TestHelper
662+
.makeRequest(
663+
client(),
664+
"POST",
665+
"/_ingest/pipeline/" + pipelineName + "/_simulate",
666+
null,
667+
sampleDocuments,
668+
null
669+
);
670+
assertEquals(200, ingestionResponse.getStatusLine().getStatusCode());
671+
672+
return parseResponseToMap(ingestionResponse);
673+
}
674+
553675
protected void createIndex(String indexName, String requestBody) throws Exception {
554676
Response response = makeRequest(client(), "PUT", indexName, null, requestBody, null);
555677
assertEquals(200, response.getStatusLine().getStatusCode());
@@ -585,7 +707,7 @@ protected MLRegisterModelInput registerModelInput() throws IOException, Interrup
585707
.modelConfig(modelConfig)
586708
.url(SENTENCE_TRANSFORMER_MODEL_URL)
587709
.deployModel(false)
588-
.hashValue("e13b74006290a9d0f58c1376f9629d4ebc05a0f9385f40db837452b167ae9021")
710+
.hashValue(SENTENCE_TRANSFORMER_MODEL_HASH_VALUE)
589711
.build();
590712
}
591713

plugin/src/test/java/org/opensearch/ml/utils/TestData.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class TestData {
3030
"https://github.com/opensearch-project/ml-commons/blob/2.x/ml-algorithms/src/test/resources/org/opensearch/ml/engine/algorithms/text_embedding/all-MiniLM-L6-v2_torchscript_huggingface.zip?raw=true";
3131
public static final String SENTENCE_TRANSFORMER_MODEL_URL =
3232
"https://github.com/opensearch-project/ml-commons/blob/2.x/ml-algorithms/src/test/resources/org/opensearch/ml/engine/algorithms/text_embedding/traced_small_model.zip?raw=true";
33+
public static final String SENTENCE_TRANSFORMER_MODEL_HASH_VALUE = "e13b74006290a9d0f58c1376f9629d4ebc05a0f9385f40db837452b167ae9021";
3334
public static final String TIME_FIELD = "timestamp";
3435
public static final String HUGGINGFACE_TRANSFORMER_MODEL_HASH_VALUE =
3536
"e13b74006290a9d0f58c1376f9629d4ebc05a0f9385f40db837452b167ae9021";

0 commit comments

Comments
 (0)