Skip to content

Commit 8fc3f92

Browse files
committed
improve serialization for resource usage info
Signed-off-by: Chenyang Ji <[email protected]>
1 parent f105e4e commit 8fc3f92

File tree

2 files changed

+50
-16
lines changed

2 files changed

+50
-16
lines changed

libs/core/src/main/java/org/opensearch/core/tasks/resourcetracker/TaskResourceInfo.java

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,16 @@
1010

1111
import org.opensearch.common.annotation.PublicApi;
1212
import org.opensearch.core.ParseField;
13-
import org.opensearch.core.common.Strings;
1413
import org.opensearch.core.common.io.stream.StreamInput;
1514
import org.opensearch.core.common.io.stream.StreamOutput;
1615
import org.opensearch.core.common.io.stream.Writeable;
1716
import org.opensearch.core.xcontent.ConstructingObjectParser;
18-
import org.opensearch.core.xcontent.MediaTypeRegistry;
1917
import org.opensearch.core.xcontent.ToXContentObject;
2018
import org.opensearch.core.xcontent.XContentBuilder;
2119

20+
import java.io.ByteArrayOutputStream;
2221
import java.io.IOException;
22+
import java.util.Base64;
2323
import java.util.Objects;
2424

2525
import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg;
@@ -202,7 +202,46 @@ public void writeTo(StreamOutput out) throws IOException {
202202

203203
@Override
204204
public String toString() {
205-
return Strings.toString(MediaTypeRegistry.JSON, this);
205+
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
206+
try (StreamOutput streamOutput = new StreamOutput() {
207+
@Override
208+
public void writeByte(byte b) {
209+
byteArrayOutputStream.write(b);
210+
}
211+
212+
@Override
213+
public void writeBytes(byte[] b, int offset, int length) {
214+
byteArrayOutputStream.write(b, offset, length);
215+
}
216+
217+
/**
218+
* Forces any buffered output to be written.
219+
*/
220+
@Override
221+
public void flush() throws IOException {
222+
byteArrayOutputStream.flush();
223+
}
224+
225+
/**
226+
* Closes this stream to further operations.
227+
*/
228+
@Override
229+
public void close() throws IOException {
230+
byteArrayOutputStream.close();
231+
}
232+
233+
@Override
234+
public void reset() {
235+
byteArrayOutputStream.reset();
236+
}
237+
}) {
238+
// Serialize the object to the custom StreamOutput
239+
this.writeTo(streamOutput);
240+
// Convert the byte array to Base64 string
241+
return Base64.getEncoder().encodeToString(byteArrayOutputStream.toByteArray());
242+
} catch (IOException e) {
243+
return "";
244+
}
206245
}
207246

208247
@Override

server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,21 @@
2323
import org.opensearch.common.util.concurrent.ConcurrentCollections;
2424
import org.opensearch.common.util.concurrent.ConcurrentMapLong;
2525
import org.opensearch.common.util.concurrent.ThreadContext;
26-
import org.opensearch.common.xcontent.XContentHelper;
27-
import org.opensearch.core.common.bytes.BytesArray;
26+
import org.opensearch.core.common.io.stream.StreamInput;
2827
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
2928
import org.opensearch.core.tasks.resourcetracker.ResourceStatsType;
3029
import org.opensearch.core.tasks.resourcetracker.ResourceUsageInfo;
3130
import org.opensearch.core.tasks.resourcetracker.ResourceUsageMetric;
3231
import org.opensearch.core.tasks.resourcetracker.TaskResourceInfo;
3332
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
3433
import org.opensearch.core.tasks.resourcetracker.ThreadResourceInfo;
35-
import org.opensearch.core.xcontent.DeprecationHandler;
36-
import org.opensearch.core.xcontent.MediaTypeRegistry;
37-
import org.opensearch.core.xcontent.NamedXContentRegistry;
38-
import org.opensearch.core.xcontent.XContentParser;
3934
import org.opensearch.threadpool.RunnableTaskExecutionListener;
4035
import org.opensearch.threadpool.ThreadPool;
4136

4237
import java.io.IOException;
4338
import java.lang.management.ManagementFactory;
4439
import java.util.ArrayList;
40+
import java.util.Base64;
4541
import java.util.Collections;
4642
import java.util.List;
4743
import java.util.Map;
@@ -346,13 +342,12 @@ public TaskResourceInfo getTaskResourceUsageFromThreadContext() {
346342
String usage = taskResourceUsages.get(0);
347343
try {
348344
if (usage != null && !usage.isEmpty()) {
349-
XContentParser parser = XContentHelper.createParser(
350-
NamedXContentRegistry.EMPTY,
351-
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
352-
new BytesArray(usage),
353-
MediaTypeRegistry.JSON
354-
);
355-
return TaskResourceInfo.PARSER.apply(parser, null);
345+
// Get the serialized data as a byte array
346+
byte[] serializedData = Base64.getDecoder().decode(usage);
347+
// Deserialize from byte array
348+
try (StreamInput streamInput = StreamInput.wrap(serializedData)) {
349+
return TaskResourceInfo.readFromStream(streamInput);
350+
}
356351
}
357352
} catch (IOException e) {
358353
logger.debug("fail to parse phase resource usages: ", e);

0 commit comments

Comments
 (0)