Skip to content

Commit c2cb22a

Browse files
andrrossHarsh Kothari
authored andcommitted
Remove deprecated batch_size parameter from _bulk (opensearch-project#17801)
Signed-off-by: Harsh Kothari <[email protected]>
1 parent 9020c8f commit c2cb22a

File tree

8 files changed

+44
-227
lines changed

8 files changed

+44
-227
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6161
### Deprecated
6262

6363
### Removed
64+
- Remove deprecated `batch_size` parameter from `_bulk` ([#14283](https://github.com/opensearch-project/OpenSearch/issues/14283))
6465

6566
### Fixed
6667
- Fix bytes parameter on `_cat/recovery` ([#17598](https://github.com/opensearch-project/OpenSearch/pull/17598))

server/src/internalClusterTest/java/org/opensearch/ingest/IngestClientIT.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,6 @@ public void testSimulate() throws Exception {
162162
}
163163

164164
public void testBulkWithIngestFailures() throws Exception {
165-
runBulkTestWithRandomDocs(false);
166-
}
167-
168-
public void testBulkWithIngestFailuresWithBatchSize() throws Exception {
169-
runBulkTestWithRandomDocs(true);
170-
}
171-
172-
private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Exception {
173165
createIndex("index");
174166

175167
BytesReference source = BytesReference.bytes(
@@ -188,9 +180,6 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except
188180

189181
int numRequests = scaledRandomIntBetween(32, 128);
190182
BulkRequest bulkRequest = new BulkRequest();
191-
if (shouldSetBatchSize) {
192-
bulkRequest.batchSize(scaledRandomIntBetween(2, numRequests));
193-
}
194183
for (int i = 0; i < numRequests; i++) {
195184
IndexRequest indexRequest = new IndexRequest("index").id(Integer.toString(i)).setPipeline("_id");
196185
indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", i % 2 == 0);
@@ -244,7 +233,6 @@ public void testBulkWithIngestFailuresAndDropBatch() throws Exception {
244233
client().admin().cluster().putPipeline(putPipelineRequest).get();
245234

246235
BulkRequest bulkRequest = new BulkRequest();
247-
bulkRequest.batchSize(3);
248236
bulkRequest.add(
249237
new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true)
250238
);

server/src/main/java/org/opensearch/action/bulk/BulkRequest.java

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques
9797
private String globalRouting;
9898
private String globalIndex;
9999
private Boolean globalRequireAlias;
100-
private int batchSize = Integer.MAX_VALUE;
101100

102101
private long sizeInBytes = 0;
103102

@@ -109,8 +108,8 @@ public BulkRequest(StreamInput in) throws IOException {
109108
requests.addAll(in.readList(i -> DocWriteRequest.readDocumentRequest(null, i)));
110109
refreshPolicy = RefreshPolicy.readFrom(in);
111110
timeout = in.readTimeValue();
112-
if (in.getVersion().onOrAfter(Version.V_2_14_0)) {
113-
batchSize = in.readInt();
111+
if (in.getVersion().onOrAfter(Version.V_2_14_0) && in.getVersion().before(Version.V_3_0_0)) {
112+
in.readInt(); // formerly batch_size
114113
}
115114
}
116115

@@ -351,27 +350,6 @@ public final BulkRequest timeout(TimeValue timeout) {
351350
return this;
352351
}
353352

354-
/**
355-
* Set batch size
356-
* @param size batch size from input
357-
* @return {@link BulkRequest}
358-
*/
359-
public BulkRequest batchSize(int size) {
360-
if (size < 1) {
361-
throw new IllegalArgumentException("batch_size must be greater than 0");
362-
}
363-
this.batchSize = size;
364-
return this;
365-
}
366-
367-
/**
368-
* Get batch size
369-
* @return batch size
370-
*/
371-
public int batchSize() {
372-
return this.batchSize;
373-
}
374-
375353
/**
376354
* Note for internal callers (NOT high level rest client),
377355
* the global parameter setting is ignored when used with:
@@ -479,8 +457,8 @@ public void writeTo(StreamOutput out) throws IOException {
479457
out.writeCollection(requests, DocWriteRequest::writeDocumentRequest);
480458
refreshPolicy.writeTo(out);
481459
out.writeTimeValue(timeout);
482-
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
483-
out.writeInt(batchSize);
460+
if (out.getVersion().onOrAfter(Version.V_2_14_0) && out.getVersion().before(Version.V_3_0_0)) {
461+
out.writeInt(Integer.MAX_VALUE); // formerly batch_size
484462
}
485463
}
486464

server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -963,8 +963,7 @@ public boolean isForceExecution() {
963963
}
964964
},
965965
bulkRequestModifier::markItemAsDropped,
966-
executorName,
967-
original
966+
executorName
968967
);
969968
}
970969

server/src/main/java/org/opensearch/ingest/IngestService.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.opensearch.OpenSearchParseException;
4141
import org.opensearch.ResourceNotFoundException;
4242
import org.opensearch.action.DocWriteRequest;
43-
import org.opensearch.action.bulk.BulkRequest;
4443
import org.opensearch.action.bulk.TransportBulkAction;
4544
import org.opensearch.action.index.IndexRequest;
4645
import org.opensearch.action.ingest.DeletePipelineRequest;
@@ -567,8 +566,7 @@ public void executeBulkRequest(
567566
BiConsumer<Integer, Exception> onFailure,
568567
BiConsumer<Thread, Exception> onCompletion,
569568
IntConsumer onDropped,
570-
String executorName,
571-
BulkRequest originalBulkRequest
569+
String executorName
572570
) {
573571
threadPool.executor(executorName).execute(new AbstractRunnable() {
574572

@@ -579,7 +577,7 @@ public void onFailure(Exception e) {
579577

580578
@Override
581579
protected void doRun() {
582-
runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped, originalBulkRequest);
580+
runBulkRequestInBatch(numberOfActionRequests, actionRequests, onFailure, onCompletion, onDropped);
583581
}
584582
});
585583
}
@@ -589,8 +587,7 @@ private void runBulkRequestInBatch(
589587
Iterable<DocWriteRequest<?>> actionRequests,
590588
BiConsumer<Integer, Exception> onFailure,
591589
BiConsumer<Thread, Exception> onCompletion,
592-
IntConsumer onDropped,
593-
BulkRequest originalBulkRequest
590+
IntConsumer onDropped
594591
) {
595592

596593
final Thread originalThread = Thread.currentThread();
@@ -635,7 +632,7 @@ private void runBulkRequestInBatch(
635632
i++;
636633
}
637634

638-
int batchSize = Math.min(numberOfActionRequests, originalBulkRequest.batchSize());
635+
int batchSize = numberOfActionRequests;
639636
List<List<IndexRequestWrapper>> batches = prepareBatches(batchSize, indexRequestWrappers);
640637
logger.debug("batchSize: {}, batches: {}", batchSize, batches.size());
641638

server/src/main/java/org/opensearch/rest/action/document/RestBulkAction.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.opensearch.action.bulk.BulkRequest;
3737
import org.opensearch.action.bulk.BulkShardRequest;
3838
import org.opensearch.action.support.ActiveShardCount;
39-
import org.opensearch.common.logging.DeprecationLogger;
4039
import org.opensearch.common.settings.Settings;
4140
import org.opensearch.rest.BaseRestHandler;
4241
import org.opensearch.rest.RestRequest;
@@ -67,8 +66,6 @@
6766
public class RestBulkAction extends BaseRestHandler {
6867

6968
private final boolean allowExplicitIndex;
70-
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestBulkAction.class);
71-
static final String BATCH_SIZE_DEPRECATED_MESSAGE = "The batch size option in bulk API is deprecated and will be removed in 3.0.";
7269

7370
public RestBulkAction(Settings settings) {
7471
this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
@@ -100,10 +97,6 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
10097
Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
10198
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
10299
bulkRequest.setRefreshPolicy(request.param("refresh"));
103-
if (request.hasParam("batch_size")) {
104-
deprecationLogger.deprecate("batch_size_deprecation", BATCH_SIZE_DEPRECATED_MESSAGE);
105-
}
106-
bulkRequest.batchSize(request.paramAsInt("batch_size", Integer.MAX_VALUE));
107100
bulkRequest.add(
108101
request.requiredContent(),
109102
defaultIndex,

server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -346,8 +346,7 @@ public void testIngestLocal() throws Exception {
346346
failureHandler.capture(),
347347
completionHandler.capture(),
348348
any(),
349-
eq(Names.WRITE),
350-
eq(bulkRequest)
349+
eq(Names.WRITE)
351350
);
352351
completionHandler.getValue().accept(null, exception);
353352
assertTrue(failureCalled.get());
@@ -384,8 +383,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception {
384383
failureHandler.capture(),
385384
completionHandler.capture(),
386385
any(),
387-
eq(Names.WRITE),
388-
any()
386+
eq(Names.WRITE)
389387
);
390388
completionHandler.getValue().accept(null, exception);
391389
assertTrue(failureCalled.get());
@@ -431,8 +429,7 @@ public void testIngestSystemLocal() throws Exception {
431429
failureHandler.capture(),
432430
completionHandler.capture(),
433431
any(),
434-
eq(Names.SYSTEM_WRITE),
435-
eq(bulkRequest)
432+
eq(Names.SYSTEM_WRITE)
436433
);
437434
completionHandler.getValue().accept(null, exception);
438435
assertTrue(failureCalled.get());
@@ -463,7 +460,7 @@ public void testIngestForward() throws Exception {
463460
action.execute(null, bulkRequest, listener);
464461

465462
// should not have executed ingest locally
466-
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any(), any());
463+
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
467464
// but instead should have sent to a remote node with the transport service
468465
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
469466
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
@@ -503,7 +500,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception {
503500
singleItemBulkWriteAction.execute(null, indexRequest, listener);
504501

505502
// should not have executed ingest locally
506-
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any(), any());
503+
verify(ingestService, never()).executeBulkRequest(anyInt(), any(), any(), any(), any(), any());
507504
// but instead should have sent to a remote node with the transport service
508505
ArgumentCaptor<DiscoveryNode> node = ArgumentCaptor.forClass(DiscoveryNode.class);
509506
verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture());
@@ -589,8 +586,7 @@ private void validatePipelineWithBulkUpsert(@Nullable String indexRequestIndexNa
589586
failureHandler.capture(),
590587
completionHandler.capture(),
591588
any(),
592-
eq(Names.WRITE),
593-
eq(bulkRequest)
589+
eq(Names.WRITE)
594590
);
595591
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
596592
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
@@ -633,8 +629,7 @@ public void testDoExecuteCalledTwiceCorrectly() throws Exception {
633629
failureHandler.capture(),
634630
completionHandler.capture(),
635631
any(),
636-
eq(Names.WRITE),
637-
any()
632+
eq(Names.WRITE)
638633
);
639634
completionHandler.getValue().accept(null, exception);
640635
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
@@ -721,8 +716,7 @@ public void testFindDefaultPipelineFromTemplateMatch() {
721716
failureHandler.capture(),
722717
completionHandler.capture(),
723718
any(),
724-
eq(Names.WRITE),
725-
any()
719+
eq(Names.WRITE)
726720
);
727721
}
728722

@@ -761,8 +755,7 @@ public void testFindDefaultPipelineFromV2TemplateMatch() {
761755
failureHandler.capture(),
762756
completionHandler.capture(),
763757
any(),
764-
eq(Names.WRITE),
765-
any()
758+
eq(Names.WRITE)
766759
);
767760
}
768761

@@ -787,8 +780,7 @@ private void validateDefaultPipeline(IndexRequest indexRequest) {
787780
failureHandler.capture(),
788781
completionHandler.capture(),
789782
any(),
790-
eq(Names.WRITE),
791-
any()
783+
eq(Names.WRITE)
792784
);
793785
assertEquals(indexRequest.getPipeline(), "default_pipeline");
794786
completionHandler.getValue().accept(null, exception);

0 commit comments

Comments
 (0)