Kafka Basics

·

3 min read

Apache Kafka is an open-source distributed event streaming platform for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Introduction

This article aims to provide basic hands-on instructions on how to interact with Kafka in console.

Installation

  1. Download the binary distribution of Apache Kafka.

  2. Extract Kafka (this document is based on Kafka version 2.13-3.6.1).

     tar -xzf kafka_2.13-3.6.1.tgz
     cd kafka_2.13-3.6.1
    

Start Kafka

There are two ways to start a Kafka.

Start Kafka with ZooKeeper

To start a Kafka cluster using ZooKeeper, open a Terminal and follow the commands below.

export KAFKA_HOME=<Directory of your Kafka distribution>
$KAFKA_HOME/bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka with Kraft

To start a Kafka cluster using KRaft, open a Terminal and follow the commands below.

  1. Format the storage directory:

     export KAFKA_HOME=<Directory of your Kafka distribution>
     export KAFKA_CLUSTER_ID=`$KAFKA_HOME/bin/kafka-storage.sh random-uuid`
     $KAFKA_HOME/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c $KAFKA_HOME/config/kraft/server.properties
    
  2. Start Kafka using KRaft.

     $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/kraft/server.properties
    
  3. To terminate, input CTRL+C.

Kafka CLI

This section shows the common Kafka CLI commands for Kafra operations.

Get the cluster metadata

  1. Open another Terminal. Set environment variables.

     export KAFKA_HOME=<Directory of your Kafka distribution>
     export BOOTSTRAP_SERVER=localhost:9092
    
  2. List cluster ID. kafka-metadata-quorum.sh is used to describe the runtime state of the cluster metadata partition, including cluster ID.

     $KAFKA_HOME/bin/kafka-metadata-quorum.sh --bootstrap-server $BOOTSTRAP_SERVER describe --status
    

    Here below lists an example of the output:

     ClusterId:              STyWMJVBQQyaSt7WEgs5nQ
     LeaderId:               1
     LeaderEpoch:            1
     HighWatermark:          739
     MaxFollowerLag:         0
     MaxFollowerLagTimeMs:   0
     CurrentVoters:          [1]
     CurrentObservers:       []
    

List all topics

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVER --list

Create a Topic

export TOPIC_NAME=test-topic
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server $BOOTSTRAP_SERVER --replication-factor 1 --partitions 1 --topic $TOPIC_NAME

Start Kafka Console Producer

The following command starts a Kafka producer, using comma as the separator between the key and value. Note that when a topic

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $BOOTSTRAP_SERVER \
    --topic $TOPIC_NAME \
    --property parse.key=true \
    --property key.separator=,

A > prompt should be shown in the next line. Input a pair of key and value text separated by a comma, e.g. Hello,World.

To terminate, input CTRL+C.

Start Kafka Console Consumer

  1. Open another Terminal. Set environment variables.

     export KAFKA_HOME=<Directory of your Kafka distribution>
     export BOOTSTRAP_SERVER=localhost:9092
     export TOPIC_NAME=test-topic
    
  2. Start a Kafka Console Consumer.

     $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER \
         --topic $TOPIC_NAME \
         --from-beginning \
         --property print.key=true \
         --property print.value=true
    
  3. The console should show the messages in the topic from the beginning.

  4. To terminate, input CTRL+C.

Kafka Messages with header

Apache Kafka 0.11 introduced the concept of record headers.

Start Kafka Console Producer

By default, the header delimiter, which separates the headers and key/value pair, is "\t" and the key separator, which separate the key and the value, is also "\t".

The following Kafka Producer Console commands start the producer with the header delimiter and the key separator changed to semicolon and comma respectively.

export KAFKA_HOME=<Directory of your Kafka distribution>
export BOOTSTRAP_SERVER=localhost:9092
export TOPIC_NAME=test-headers-topic

$KAFKA_HOME/bin/kafka-console-producer.sh --bootstrap-server $BOOTSTRAP_SERVER \
--topic $TOPIC_NAME \
--property parse.headers=true \
--property parse.key=true \
--property key.separator=, \
--property headers.delimiter=\;

Start Kafka Console Consumer

Open another Terminal. Start a Kafka Console Consumer, which consumes the using following commands.

export KAFKA_HOME=<Directory of your Kafka distribution>
export BOOTSTRAP_SERVER=localhost:9092
export TOPIC_NAME=test-headers-topic

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $BOOTSTRAP_SERVER \
--topic $TOPIC_NAME \
--from-beginning \
--property print.headers=true \
--property print.key=true \
--property print.value=true

Publish test messages with headers

Send the following test message from the Kafka Console Producer.

h0-k1:h0-v1,h0-k2:h0-v2;key 0,value 0
h1-k1:h1-v1,h1-k2:h1-v2;key 1,value 1
h2-k1:h2-v1,h2-k2:h2-v2;key 2,value 2

Receive test messages with headers

Test messages with headers should be received from the Kafka Console Consumer.