Currently MessagingMessageListenerAdapter#sendResponse(Object result, String topic, @Nullable Object source) method is not setting key before sending and causes CorruptRecordException as stated here.
Workaround is to write a ProducerInterceptor and add it to producer config like;
extended.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, List.of(KeySetterProducerInterceptor.class));
Planning to support this?