Existing internal topic has invalid partitions

Kafka Streams application run-time error

Problem

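A Kafka Streams application fails to start with the error "Existing internal topic ${changelog-topic-name} has invalid partitions". In the log below, the existing changelog topic has 3 partitions, but the application expects 1.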

ERROR stream-thread [main] Existing internal topic fav-color-stream-Counts-changelog has invalid partitions: expected: 1; actual: 3. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing. (org.apache.kafka.streams.processor.internals.InternalTopicManager:582)
ERROR stream-client [fav-color-stream-5b30e9fc-7641-4366-b60d-313d636b89d7] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.  (org.apache.kafka.streams.KafkaStreams:529)
org.apache.kafka.streams.errors.StreamsException: Existing internal topic fav-color-stream-Counts-changelog has invalid partitions: expected: 1; actual: 3. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:583)
    at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:399)
    at org.apache.kafka.streams.processor.internals.ChangelogTopics.setup(ChangelogTopics.java:97)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:594)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:406)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:640)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:694)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:112)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:598)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:561)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1196)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1171)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:980)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:933)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:751)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:604)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576)
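When triaging this error, it can help to pull the affected topic name and the two partition counts out of the log line. The following is a minimal sketch, assuming the message format shown in the log above; `parse_invalid_partitions` is a hypothetical helper name, not part of Kafka.

```shell
# Hypothetical helper (not part of Kafka): extract the topic name and the
# expected/actual partition counts from an "invalid partitions" error line.
# Assumes the message format shown in the log above.
parse_invalid_partitions() {
  printf '%s\n' "$1" | sed -n -E \
    's/.*Existing internal topic ([^ ]+) has invalid partitions: expected: ([0-9]+); actual: ([0-9]+)\..*/\1 \2 \3/p'
}

line='ERROR stream-thread [main] Existing internal topic fav-color-stream-Counts-changelog has invalid partitions: expected: 1; actual: 3. Use the reset tool to clean up invalid topics before processing.'

# Prints: fav-color-stream-Counts-changelog 1 3
parse_invalid_partitions "$line"
```

The output is the topic name followed by the expected and actual partition counts, which makes it easy to confirm which internal topic is out of sync before resetting.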

Solution

After stopping all application instances, reset the application, which cleans up its internal topics, by running the following command:

kafka-streams-application-reset.sh --application-id ${application-id} \
                                   --input-topics ${topic-name}

Replace ${application-id} and ${topic-name} with the application ID and the input topic name, respectively.

For example:

kafka-streams-application-reset.sh --application-id fav-color-stream \
                                   --input-topics fav-color-input
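Before running the reset for real, it may be worth previewing it. The sketch below only prints a reset command with the tool's `--dry-run` option added, so the command can be reviewed before it is executed. The broker address `localhost:9092` and the helper name `build_reset_cmd` are assumptions for illustration; adjust them for your environment.

```shell
# Sketch: print (do not run) a dry-run reset command for review.
# BOOTSTRAP is an assumed broker address; override it for your cluster.
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# Hypothetical helper: $1 = application ID, $2 = input topic name.
build_reset_cmd() {
  printf 'kafka-streams-application-reset.sh --bootstrap-server %s --application-id %s --input-topics %s --dry-run\n' \
    "$BOOTSTRAP" "$1" "$2"
}

build_reset_cmd fav-color-stream fav-color-input
```

With `--dry-run`, the tool reports the actions it would take without performing them. Once the output looks right, run the same command without `--dry-run`.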