Photo by Towfiqu barbhuiya on Unsplash
Existing internal topic has invalid partitions
Kafka Streams application run-time error
Problem
Kafka Streams application fails to start with error of "Existing internal topic ${changelog-topic-name} has invalid partitions"
ERROR stream-thread [main] Existing internal topic fav-color-stream-Counts-changelog has invalid partitions: expected: 1; actual: 3. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing. (org.apache.kafka.streams.processor.internals.InternalTopicManager:582)
ERROR stream-client [fav-color-stream-5b30e9fc-7641-4366-b60d-313d636b89d7] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. (org.apache.kafka.streams.KafkaStreams:529)
org.apache.kafka.streams.errors.StreamsException: Existing internal topic fav-color-stream-Counts-changelog has invalid partitions: expected: 1; actual: 3. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:583)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:399)
at org.apache.kafka.streams.processor.internals.ChangelogTopics.setup(ChangelogTopics.java:97)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:594)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:406)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:640)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:694)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:112)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:598)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:561)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1196)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1171)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:980)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:933)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
Solution
After stopping all application instances, reset the application by executing the following commands:
kafka-streams-application-reset.sh --application-id ${application-id} \
--input-topics ${topic-name}
Replace ${application-id} and ${topic-name} with application ID and topic name respectively.
For example,
kafka-streams-application-reset.sh --application-id fav-color-stream \
--input-topics fav-color-input