Description
When I consume a KTable, it is automatically materialized as a KeyValueStore.
fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    input.join(table) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key: $value") }
}
If I set topology.optimization to all, no changelog topic is created; the consumed topic is reused as the changelog topic instead.
Because of the Kafka Streams join semantics, I want to change the state store type to VersionedKeyValueStore.
If I materialize the state store manually like this, a changelog topic is created even though I set topology.optimization to all.
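For context, a minimal sketch of how this setting might be supplied as plain properties. Only the topology.optimization key and its value come from the description above; the application ID and bootstrap address are placeholders, and with the Spring Cloud Stream binder the same key would presumably go under the binder's configuration properties instead.

```kotlin
import java.util.Properties

// Hypothetical configuration: only "topology.optimization" = "all" is taken
// from the issue text; the other two entries are illustrative placeholders.
fun streamsProperties(): Properties = Properties().apply {
    put("application.id", "kafka-streams-test")   // placeholder
    put("bootstrap.servers", "localhost:9092")    // placeholder
    put("topology.optimization", "all")           // enable all topology optimizations
}

fun main() {
    // Print the value that would be handed to Kafka Streams.
    println(streamsProperties().getProperty("topology.optimization"))
}
```

In a plain Kafka Streams application (without the binder), these properties would typically also need to be passed to StreamsBuilder.build(props) for the optimization to be applied to the topology.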
fun process() = BiConsumer<KStream<String, String>, KTable<String, String>> { input, table ->
    // Declare the versioned store supplier before it is used below.
    val store = Stores.persistentVersionedKeyValueStore("kafka-streams-test-store", Duration.ofDays(1))
    // Re-materialize the table into the versioned store via an aggregation.
    val storedTable = table
        .toStream()
        .groupByKey()
        .aggregate(
            { byteArrayOf() },
            { _, value, _ -> value.toByteArray() },
            Materialized.`as`(store)
        )
    val stream = input.join(storedTable) { value1, value2 ->
        Pair(value1, value2)
    }.peek { key, value -> println("$key, ${value?.first}, ${value?.second?.decodeToString()}") }
}
Is there any way to change the state store type without creating a changelog topic, so that the optimization still applies?