org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG

Photo by Kelly Sikkema on Unsplash

org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG

Kafka Streams application error

Ada Cheng's photo
Ada Cheng
·May 12, 2022·

1 min read

Table of contents

  • Problem
  • Solution

Problem

Kafka Streams application fails and stops with error of "org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG".

ERROR stream-client [fav-color-stream-0c5393be-d0d0-4832-9bf8-2e51c24d7651] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.  (org.apache.kafka.streams.KafkaStreams:529) 
org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Caused by: org.apache.kafka.common.config.ConfigException: Please specify a key serde or set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
    at org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1449)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:89)
    at org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:166)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:205)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:97)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:231)
    at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:457)
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:880)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:762)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    ... 1 more

Solution

Solution 1 - Properties File

If properties file is used, add the following properties to the properties files.

default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde

Solution 2 - Java

If properties are configured in the Java program, add the following properties to the Properties object.

streamProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
Share this