Skip to content

Commit 7133033

Browse files
committed
fix ut
Signed-off-by: Yupeng Fu <[email protected]>
1 parent 969684e commit 7133033

File tree

1 file changed

+2
-1
lines changed

1 file changed

+2
-1
lines changed

plugins/ingestion-kinesis/src/main/java/org/opensearch/plugin/kinesis/KinesisShardConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,8 @@ private synchronized List<ReadResult<SequenceNumber, KinesisMessage>> fetch(
234234

235235
for (Record record : records) {
236236
SequenceNumber sequenceNumber1 = new SequenceNumber(record.sequenceNumber());
237-
KinesisMessage message = new KinesisMessage(record.data().asByteArray(), record.approximateArrivalTimestamp().toEpochMilli());
237+
Long timestamp = record.approximateArrivalTimestamp() != null ? record.approximateArrivalTimestamp().toEpochMilli() : null;
238+
KinesisMessage message = new KinesisMessage(record.data().asByteArray(), timestamp);
238239
results.add(new ReadResult<>(sequenceNumber1, message));
239240
}
240241

0 commit comments

Comments
 (0)