org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
Kafka Streams application error
Problem
The Kafka Streams application fails and shuts down with the error "org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG".
ERROR stream-client [fav-color-stream-0c5393be-d0d0-4832-9bf8-2e51c24d7651] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. (org.apache.kafka.streams.KafkaStreams:529)
org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1449)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:89)
at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:166)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:205)
at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:97)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:231)
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:457)
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:880)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:762)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
... 1 more
Root Cause
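The default serdes for the key and value are not set. The stack trace shows the exception being raised while a key-value state store is initialized (MeteredKeyValueStore.initStoreSerde): no key serde was specified for that store, so Kafka Streams falls back to the default key serde, which is not configured.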
Solution
Solution 1 - Properties File
If a properties file is used, add the following properties to it. The example uses the String serde; substitute the serde classes that match your key and value types.
default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
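Before starting the application, it can help to confirm that the loaded file actually supplies both settings. The following is a minimal sketch that uses only java.util.Properties; the class name SerdeConfigCheck, the method missingSerdeKeys, and the inline sample file contents are hypothetical, introduced here for illustration, and are not part of Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class SerdeConfigCheck {
    // Property names taken from the settings shown above.
    static final String KEY_SERDE = "default.key.serde";
    static final String VALUE_SERDE = "default.value.serde";

    // Returns the default-serde property names that are absent or blank.
    static List<String> missingSerdeKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String name : new String[] {KEY_SERDE, VALUE_SERDE}) {
            String value = props.getProperty(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a real properties file (assumption: only the key serde is set).
        String fileContents =
            "default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde\n";
        Properties props = new Properties();
        props.load(new StringReader(fileContents));
        // prints "Missing: [default.value.serde]"
        System.out.println("Missing: " + missingSerdeKeys(props));
    }
}
```

In a real application the Properties object would be loaded from the actual file, and a non-empty result could be logged or turned into a startup failure with a clearer message than the ConfigException above.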
Solution 2 - Java
If the properties are configured in the Java program, add the following entries to the Properties object (streamProps in this example).
streamProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());