Skip to content

Commit c8f0b6d

Browse files
authored
fix concurrent modification issue in thread context (#14084)
Signed-off-by: Chenyang Ji <[email protected]>
1 parent 42d6af6 commit c8f0b6d

File tree

3 files changed

+34
-17
lines changed

3 files changed

+34
-17
lines changed

server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -480,28 +480,37 @@ public <T> T getTransient(String key) {
480480
* @param value the header value
481481
*/
482482
public void addResponseHeader(final String key, final String value) {
483-
addResponseHeader(key, value, v -> v);
483+
updateResponseHeader(key, value, v -> v, false);
484484
}
485485

486486
/**
487-
* Remove the {@code value} for the specified {@code key}.
487+
* Update the {@code value} for the specified {@code key}
488488
*
489489
* @param key the header name
490+
* @param value the header value
490491
*/
491-
public void removeResponseHeader(final String key) {
492-
threadLocal.get().responseHeaders.remove(key);
492+
public void updateResponseHeader(final String key, final String value) {
493+
updateResponseHeader(key, value, v -> v, true);
493494
}
494495

495496
/**
496-
* Add the {@code value} for the specified {@code key} with the specified {@code uniqueValue} used for de-duplication. Any duplicate
497+
* Update the {@code value} for the specified {@code key} with the specified {@code uniqueValue} used for de-duplication. Any duplicate
497498
* {@code value} after applying {@code uniqueValue} is ignored.
498499
*
499500
* @param key the header name
500501
* @param value the header value
501502
* @param uniqueValue the function that produces de-duplication values
502-
*/
503-
public void addResponseHeader(final String key, final String value, final Function<String, String> uniqueValue) {
504-
threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize));
503+
* @param replaceExistingKey whether to replace the existing header if it already exists
504+
*/
505+
public void updateResponseHeader(
506+
final String key,
507+
final String value,
508+
final Function<String, String> uniqueValue,
509+
final boolean replaceExistingKey
510+
) {
511+
threadLocal.set(
512+
threadLocal.get().putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize, replaceExistingKey)
513+
);
505514
}
506515

507516
/**
@@ -726,7 +735,8 @@ private ThreadContextStruct putResponse(
726735
final String value,
727736
final Function<String, String> uniqueValue,
728737
final int maxWarningHeaderCount,
729-
final long maxWarningHeaderSize
738+
final long maxWarningHeaderSize,
739+
final boolean replaceExistingKey
730740
) {
731741
assert value != null;
732742
long newWarningHeaderSize = warningHeadersSize;
@@ -768,8 +778,13 @@ private ThreadContextStruct putResponse(
768778
if (existingValues.contains(uniqueValue.apply(value))) {
769779
return this;
770780
}
771-
// preserve insertion order
772-
final Set<String> newValues = Stream.concat(existingValues.stream(), Stream.of(value)).collect(LINKED_HASH_SET_COLLECTOR);
781+
Set<String> newValues;
782+
if (replaceExistingKey) {
783+
newValues = Stream.of(value).collect(LINKED_HASH_SET_COLLECTOR);
784+
} else {
785+
// preserve insertion order
786+
newValues = Stream.concat(existingValues.stream(), Stream.of(value)).collect(LINKED_HASH_SET_COLLECTOR);
787+
}
773788
newResponseHeaders = new HashMap<>(responseHeaders);
774789
newResponseHeaders.put(key, Collections.unmodifiableSet(newValues));
775790
} else {

server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -322,10 +322,7 @@ public void writeTaskResourceUsage(SearchShardTask task, String nodeId) {
322322
)
323323
.build();
324324
// Remove the existing TASK_RESOURCE_USAGE header since it would have come from an earlier phase in the same request.
325-
synchronized (this) {
326-
threadPool.getThreadContext().removeResponseHeader(TASK_RESOURCE_USAGE);
327-
threadPool.getThreadContext().addResponseHeader(TASK_RESOURCE_USAGE, taskResourceInfo.toString());
328-
}
325+
threadPool.getThreadContext().updateResponseHeader(TASK_RESOURCE_USAGE, taskResourceInfo.toString());
329326
} catch (Exception e) {
330327
logger.debug("Error during writing task resource usage: ", e);
331328
}

server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,11 +344,16 @@ public void testResponseHeaders() {
344344
}
345345

346346
final String value = HeaderWarning.formatWarning("qux");
347-
threadContext.addResponseHeader("baz", value, s -> HeaderWarning.extractWarningValueFromWarningHeader(s, false));
347+
threadContext.updateResponseHeader("baz", value, s -> HeaderWarning.extractWarningValueFromWarningHeader(s, false), false);
348348
// pretend that another thread created the same response at a different time
349349
if (randomBoolean()) {
350350
final String duplicateValue = HeaderWarning.formatWarning("qux");
351-
threadContext.addResponseHeader("baz", duplicateValue, s -> HeaderWarning.extractWarningValueFromWarningHeader(s, false));
351+
threadContext.updateResponseHeader(
352+
"baz",
353+
duplicateValue,
354+
s -> HeaderWarning.extractWarningValueFromWarningHeader(s, false),
355+
false
356+
);
352357
}
353358

354359
threadContext.addResponseHeader("Warning", "One is the loneliest number");

0 commit comments

Comments
 (0)