Skip to content

Remove deprecated batch_size parameter from _bulk #17801

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Deprecated

### Removed
- Remove deprecated `batch_size` parameter from `_bulk` ([#14283](https://github.com/opensearch-project/OpenSearch/issues/14283))

### Fixed
- Fix bytes parameter on `_cat/recovery` ([#17598](https://github.com/opensearch-project/OpenSearch/pull/17598))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,6 @@ public void testSimulate() throws Exception {
}

public void testBulkWithIngestFailures() throws Exception {
runBulkTestWithRandomDocs(false);
}

public void testBulkWithIngestFailuresWithBatchSize() throws Exception {
runBulkTestWithRandomDocs(true);
}

private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Exception {
createIndex("index");

BytesReference source = BytesReference.bytes(
Expand All @@ -188,9 +180,6 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except

int numRequests = scaledRandomIntBetween(32, 128);
BulkRequest bulkRequest = new BulkRequest();
if (shouldSetBatchSize) {
bulkRequest.batchSize(scaledRandomIntBetween(2, numRequests));
}
for (int i = 0; i < numRequests; i++) {
IndexRequest indexRequest = new IndexRequest("index").id(Integer.toString(i)).setPipeline("_id");
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", i % 2 == 0);
Expand Down Expand Up @@ -244,7 +233,6 @@ public void testBulkWithIngestFailuresAndDropBatch() throws Exception {
client().admin().cluster().putPipeline(putPipelineRequest).get();

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.batchSize(3);
bulkRequest.add(
new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@
private String globalRouting;
private String globalIndex;
private Boolean globalRequireAlias;
private int batchSize = Integer.MAX_VALUE;

private long sizeInBytes = 0;

Expand All @@ -109,8 +108,8 @@
requests.addAll(in.readList(i -> DocWriteRequest.readDocumentRequest(null, i)));
refreshPolicy = RefreshPolicy.readFrom(in);
timeout = in.readTimeValue();
if (in.getVersion().onOrAfter(Version.V_2_14_0)) {
batchSize = in.readInt();
if (in.getVersion().onOrAfter(Version.V_2_14_0) && in.getVersion().before(Version.V_3_0_0)) {
in.readInt(); // formerly batch_size

Check warning on line 112 in server/src/main/java/org/opensearch/action/bulk/BulkRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/bulk/BulkRequest.java#L112

Added line #L112 was not covered by tests
}
}

Expand Down Expand Up @@ -351,27 +350,6 @@
return this;
}

/**
* Set batch size
* @param size batch size from input
* @return {@link BulkRequest}
*/
public BulkRequest batchSize(int size) {
if (size < 1) {
throw new IllegalArgumentException("batch_size must be greater than 0");
}
this.batchSize = size;
return this;
}

/**
* Get batch size
* @return batch size
*/
public int batchSize() {
return this.batchSize;
}

/**
* Note for internal callers (NOT high level rest client),
* the global parameter setting is ignored when used with:
Expand Down Expand Up @@ -479,8 +457,8 @@
out.writeCollection(requests, DocWriteRequest::writeDocumentRequest);
refreshPolicy.writeTo(out);
out.writeTimeValue(timeout);
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeInt(batchSize);
if (out.getVersion().onOrAfter(Version.V_2_14_0) && out.getVersion().before(Version.V_3_0_0)) {
out.writeInt(Integer.MAX_VALUE); // formerly batch_size

Check warning on line 461 in server/src/main/java/org/opensearch/action/bulk/BulkRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/bulk/BulkRequest.java#L461

Added line #L461 was not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,8 +963,7 @@ public boolean isForceExecution() {
}
},
bulkRequestModifier::markItemAsDropped,
executorName,
original
executorName
);
}

Expand Down
11 changes: 4 additions & 7 deletions server/src/main/java/org/opensearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.OpenSearchParseException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.TransportBulkAction;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.ingest.DeletePipelineRequest;
Expand Down Expand Up @@ -567,8 +566,7 @@ public void executeBulkRequest(
BiConsumer<Integer, Exception> onFailure,
BiConsumer<Thread, Exception> onCompletion,
IntConsumer onDropped,
String executorName,
BulkRequest originalBulkRequest
String executorName
) {
threadPool.executor(executorName).execute(new AbstractRunnable() {

Expand All @@ -579,7 +577,7 @@ public void onFailure(Exception e) {

@Override
protected void doRun() {
runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest);
runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped);
}
});
}
Expand All @@ -589,8 +587,7 @@ private void runBulkRequestInBatch(
Iterable<DocWriteRequest<?>> actionRequests,
BiConsumer<Integer, Exception> onFailure,
BiConsumer<Thread, Exception> onCompletion,
IntConsumer onDropped,
BulkRequest originalBulkRequest
IntConsumer onDropped
) {

final Thread originalThread = Thread.currentThread();
Expand Down Expand Up @@ -635,7 +632,7 @@ private void runBulkRequestInBatch(
i++;
}

int batchSize = Math.min(numberOfActionRequests, originalBulkRequest.batchSize());
int batchSize = numberOfActionRequests;
List<List<IndexRequestWrapper>> batches = prepareBatches(batchSize, indexRequestWrappers);
logger.debug("batchSize: {}, batches: {}", batchSize, batches.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.Settings;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
Expand Down Expand Up @@ -67,8 +66,6 @@
public class RestBulkAction extends BaseRestHandler {

private final boolean allowExplicitIndex;
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestBulkAction.class);
static final String BATCH_SIZE_DEPRECATED_MESSAGE = "The batch size option in bulk API is deprecated and will be removed in 3.0.";

public RestBulkAction(Settings settings) {
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
Expand Down Expand Up @@ -100,10 +97,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
if (request.hasParam("batch_size")) {
deprecationLogger.deprecate("batch_size_deprecation", BATCH_SIZE_DEPRECATED_MESSAGE);
}
bulkRequest.batchSize(request.paramAsInt("batch_size", Integer.MAX_VALUE));
bulkRequest.add(
request.requiredContent(),
defaultIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,7 @@ public void testIngestLocal() throws Exception {
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE),
eq(bulkRequest)
eq(Names.WRITE)
);
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
Expand Down Expand Up @@ -384,8 +383,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE),
any()
eq(Names.WRITE)
);
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
Expand Down Expand Up @@ -431,8 +429,7 @@ public void testIngestSystemLocal() throws Exception {
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.SYSTEM_WRITE),
eq(bulkRequest)
eq(Names.SYSTEM_WRITE)
);
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
Expand Down Expand Up @@ -463,7 +460,7 @@ public void testIngestForward() throws Exception {
action.execute(null, bulkRequest, listener);

// should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any(), any());
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
Expand Down Expand Up @@ -503,7 +500,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
singleItemBulkWriteAction.execute(null, indexRequest, listener);

// should not have executed ingest locally
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any(), any());
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
// but instead should have sent to a remote node with the transport service
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
Expand Down Expand Up @@ -589,8 +586,7 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE),
eq(bulkRequest)
eq(Names.WRITE)
);
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
Expand Down Expand Up @@ -633,8 +629,7 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE),
any()
eq(Names.WRITE)
);
completionHandler.getValue().accept(null, exception);
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
Expand Down Expand Up @@ -721,8 +716,7 @@ public void testFindDefaultPipelineFromTemplateMatch() {
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE),
any()
eq(Names.WRITE)
);
}

Expand Down Expand Up @@ -761,8 +755,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE),
any()
eq(Names.WRITE)
);
}

Expand All @@ -787,8 +780,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
failureHandler.capture(),
completionHandler.capture(),
any(),
eq(Names.WRITE),
any()
eq(Names.WRITE)
);
assertEquals(indexRequest.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(null, exception);
Expand Down
Loading
Loading