Apache Kafka is an open-source distributed event streaming platform for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Introduction
This article aims to provide basic hands-on instructions on how to interact with Kafka in console.
Installation
Extract Kafka (this document is based on Kafka version 2.13-3.6.1).
tar -xzf kafka_2.13-3.6.1.tgz cd kafka_2.13-3.6.1
Start Kafka
There are two ways to start a Kafka.
Start Kafka with ZooKeeper
To start a Kafka cluster using ZooKeeper, open a Terminal and follow the commands below.
export KAFKA_HOME=<Directory of your Kafka distribution>
$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka with Kraft
To start a Kafka cluster using KRaft, open a Terminal and follow the commands below.
Format the storage directory:
export KAFKA_HOME=<Directory of your Kafka distribution> export KAFKA_CLUSTER_ID=`$KAFKA_HOME/bin/kafka-storage.sh random-uuid` $KAFKA_HOME/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c $KAFKA_HOME/config/kraft/server.properties
Start Kafka using KRaft.
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/kraft/server.properties
To terminate, input
CTRL+C
.
Kafka CLI
This section shows the common Kafka CLI commands for Kafra operations.
Get the cluster metadata
Open another Terminal. Set environment variables.
export KAFKA_HOME=<Directory of your Kafka distribution> export BOOTSTRAP_SERVER=localhost:9092
List cluster ID. kafka-metadata-quorum.sh is used to describe the runtime state of the cluster metadata partition, including cluster ID.
$KAFKA_HOME/bin/kafka-metadata-quorum.sh --bootstrap-server $BOOTSTRAP_SERVER describe --status
Here below lists an example of the output:
ClusterId: STyWMJVBQQyaSt7WEgs5nQ LeaderId: 1 LeaderEpoch: 1 HighWatermark: 739 MaxFollowerLag: 0 MaxFollowerLagTimeMs: 0 CurrentVoters: [1] CurrentObservers: []
List all topics
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVER --list
Create a Topic
export TOPIC_NAME=test-topic
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVER --replication-factor 1 --partitions 1 --topic $TOPIC_NAME
Start Kafka Console Producer
The following command starts a Kafka producer, using comma as the separator between the key and value. Note that when a topic
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $BOOTSTRAP_SERVER \
--topic $TOPIC_NAME \
--property parse.key=true \
--property key.separator=,
A >
prompt should be shown in the next line. Input a pair of key and value text separated by a comma, e.g. Hello,World
.
To terminate, input CTRL+C
.
Start Kafka Console Consumer
Open another Terminal. Set environment variables.
export KAFKA_HOME=<Directory of your Kafka distribution> export BOOTSTRAP_SERVER=localhost:9092 export TOPIC_NAME=test-topic
Start a Kafka Console Consumer.
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER \ --topic $TOPIC_NAME \ --from-beginning \ --property print.key=true \ --property print.value=true
The console should show the messages in the topic from the beginning.
To terminate, input
CTRL+C
.
Kafka Messages with header
Apache Kafka 0.11 introduced the concept of record headers.
Start Kafka Console Producer
By default, the header delimiter, which separates the headers and key/value pair, is "\t" and the key separator, which separate the key and the value, is also "\t".
The following Kafka Producer Console commands start the producer with the header delimiter and the key separator changed to semicolon and comma respectively.
export KAFKA_HOME=<Directory of your Kafka distribution>
export BOOTSTRAP_SERVER=localhost:9092
export TOPIC_NAME=test-headers-topic
$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVER \
--topic $TOPIC_NAME \
--property parse.headers=true \
--property parse.key=true \
--property key.separator=, \
--property headers.delimiter=\;
Start Kafka Console Consumer
Open another Terminal. Start a Kafka Console Consumer, which consumes the using following commands.
export KAFKA_HOME=<Directory of your Kafka distribution>
export BOOTSTRAP_SERVER=localhost:9092
export TOPIC_NAME=test-headers-topic
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER \
--topic $TOPIC_NAME \
--from-beginning \
--property print.headers=true \
--property print.key=true \
--property print.value=true
Publish test messages with headers
Send the following test message from the Kafka Console Producer.
h0-k1:h0-v1,h0-k2:h0-v2;key 0,value 0
h1-k1:h1-v1,h1-k2:h1-v2;key 1,value 1
h2-k1:h2-v1,h2-k2:h2-v2;key 2,value 2
Receive test messages with headers
Test messages with headers should be received from the Kafka Console Consumer.