Expected Behavior
Acknowledging an index in a batch should ideally take constant time, and at worst linear time.
Current Behavior
Acknowledging an index in a batch takes quadratic time: N(N+1)/2 ~ N^2 steps.
Context
Batch consumers operate on a LinkedList of records. If the consumer uses MANUAL_IMMEDIATE ack mode and the listener invokes acknowledgement.acknowledge(index) with a relatively large index (e.g. when processing batches of 100k records), performance takes a hit because of the linear-time lookup records.get(i) inside a loop here.
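To illustrate why the lookup pattern matters, here is a minimal sketch with hypothetical helpers (it does not reproduce the container's actual loop). Calling get(i) for every i up to index on a LinkedList costs O(index^2) overall, while a single iterator pass over the same list visits each node once, so it stays linear regardless of the List implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class AckWalkSketch {

    // Quadratic overall on a LinkedList: each get(i) walks node by node
    // from the nearer end of the list before returning the element.
    static <R> List<R> upToIndexWithGet(List<R> records, int index) {
        List<R> acked = new ArrayList<>();
        for (int i = 0; i <= index; i++) {
            acked.add(records.get(i));
        }
        return acked;
    }

    // Linear for any List: one iterator pass visits each element exactly once.
    static <R> List<R> upToIndexWithIterator(List<R> records, int index) {
        List<R> acked = new ArrayList<>();
        Iterator<R> it = records.iterator();
        for (int i = 0; i <= index && it.hasNext(); i++) {
            acked.add(it.next());
        }
        return acked;
    }

    public static void main(String[] args) {
        List<Integer> records = new LinkedList<>();
        for (int i = 0; i < 10; i++) {
            records.add(i);
        }
        System.out.println(upToIndexWithIterator(records, 4)); // [0, 1, 2, 3, 4]
    }
}
```

Both helpers return the same elements; only the traversal cost differs.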
Maybe the most robust solution would be to offer clients the ability to customise how the record list is created, e.g.:
ConsumerFactory#setRecordListFactory(Function<ConsumerRecords<K, V>, List<ConsumerRecord<K, V>>> factory);
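A minimal sketch of how such a hook might be wired. ContainerSketch and its setRecordListFactory setter are hypothetical, not existing Spring Kafka API. To keep the sketch self-contained without kafka-clients, the type parameter R stands in for ConsumerRecord<K, V> and Iterable<R> stands in for ConsumerRecords<K, V>, which is Iterable. The default factory mirrors the current LinkedList copy, and a client swaps in a random-access list.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.RandomAccess;
import java.util.function.Function;

public class RecordListFactorySketch {

    // Hypothetical stand-in for the listener container (not real Spring Kafka API).
    static final class ContainerSketch<R> {

        // Default mirrors today's behaviour: copy the polled records into a LinkedList.
        private Function<Iterable<R>, List<R>> recordListFactory = records -> {
            List<R> list = new LinkedList<>();
            records.forEach(list::add);
            return list;
        };

        // Hypothetical setter corresponding to the proposed setRecordListFactory(...).
        void setRecordListFactory(Function<Iterable<R>, List<R>> factory) {
            this.recordListFactory = factory;
        }

        // Where the container would previously have called its private createRecordList.
        List<R> createRecordList(Iterable<R> records) {
            return recordListFactory.apply(records);
        }
    }

    public static void main(String[] args) {
        ContainerSketch<String> container = new ContainerSketch<>();
        // Swap in a mutable, random-access list so get(i) is O(1).
        container.setRecordListFactory(records -> {
            List<String> list = new ArrayList<>();
            records.forEach(list::add);
            return list;
        });
        List<String> list = container.createRecordList(List.of("a", "b", "c"));
        System.out.println(list instanceof RandomAccess); // true
    }
}
```

Keeping the LinkedList as the default means existing users see no behaviour change unless they opt in.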
This way we could pass in other implementations to replace createRecordList, e.g.:
// Copies the polled records into an array-backed list with O(1) get(i)
// (needs java.lang.reflect.Array and java.util.List imports)
@SuppressWarnings("unchecked")
private List<ConsumerRecord<K, V>> createRecordList(final ConsumerRecords<K, V> records) {
    ConsumerRecord<K, V>[] recordsArray =
            (ConsumerRecord<K, V>[]) Array.newInstance(ConsumerRecord.class, records.count());
    int index = 0;
    for (ConsumerRecord<K, V> record : records) {
        recordsArray[index++] = record;
    }
    // List.of returns an immutable list
    return List.of(recordsArray);
}
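I also tried replacing the existing createRecordList directly with the implementation above; however, it caused one test to fail (SubBatchPerPartitionTests > withFilter()) because List.of produces an immutable list. Arrays.asList would not help on its own, since it returns a fixed-size list (not a java.util.ArrayList) that also rejects removals. Copying into a new ArrayList would give a mutable list, but then filtering might be less efficient, since removing an element from an ArrayList shifts all later elements (I presume, but haven't verified).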
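A minimal sketch of the mutable alternative, assuming (not verified against the container code) that filtering removes entries through List.removeIf. ArrayList overrides removeIf to compact its backing array in a single pass, so bulk filtering stays linear; the per-element shifting cost applies to repeated remove(...) calls. Iterable<R> again stands in for ConsumerRecords<K, V>, and the helper names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class MutableRecordListSketch {

    // Pre-sizes an ArrayList to the record count: O(1) get(i), and still mutable.
    static <R> List<R> createRecordList(Iterable<R> records, int count) {
        List<R> list = new ArrayList<>(count);
        for (R record : records) {
            list.add(record);
        }
        return list;
    }

    // ArrayList.removeIf removes all matches in one pass over the backing array,
    // rather than shifting the tail once per removed element.
    static <R> List<R> filter(List<R> records, Predicate<? super R> reject) {
        records.removeIf(reject);
        return records;
    }

    public static void main(String[] args) {
        List<Integer> records = createRecordList(List.of(1, 2, 3, 4, 5, 6), 6);
        System.out.println(filter(records, r -> r % 2 == 0)); // [1, 3, 5]
    }
}
```

If the filter instead removes records one at a time via an iterator, each removal would shift the remaining elements, which is the slowdown presumed above.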