Recently I was trying to study Kafka, but I couldn’t find a single resource that gave me both a quick introduction and hands-on experience with it in Clojure. So, I’m making my own here! Don’t expect a deep dive: this is a quick-and-dirty introduction to the concepts, followed by some code examples in Clojure.

Kafka is a messaging system similar to RabbitMQ and SQS. The big difference is that it’s generally faster than both, and it works very well in cluster mode. Installing Kafka locally is quite complicated, so you’ll probably want to use the docker-compose.yaml file below:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

This file creates a Kafka broker (a single node of the messaging system) and adds Zookeeper, which coordinates the different Kafka nodes: among other things, it decides which node is the leader and takes part in electing a new one when the leader goes down. You will connect to port 9092, and send and receive messages from there.

Instead of queues like RabbitMQ or SQS, Kafka uses topics. Each topic has a number of partitions, and you can increase the number of them (but never decrease it). Partitions serve multiple purposes: the first is to distribute the messages across different Kafka nodes; the other is to control how messages are split between different consumers listening to the same topic in the same consumer group.

The rules are quite simple: if you have a system and want to receive messages from a topic, you “subscribe” to the topic. If you have, for example, two instances of the same system listening to the same topic with only one partition, only one instance will consume the messages: the other will be idle. Similarly, if you have two instances of the same system listening to a single topic with 3 partitions, one of the instances will listen to two partitions and the other to only one.
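To make those two cases concrete, here is a tiny pure-Clojure sketch that spreads partitions over the instances of one consumer group. It does not talk to Kafka at all: assign-partitions and the instance names are hypothetical, and the real assignment is done by Kafka’s group coordinator and the configured assignor, which may hand out different partition numbers. Only the counts per instance are the point.

```clojure
(defn assign-partitions
  "Spread partition numbers 0..n-1 over the consumers, round-robin.
  Hypothetical helper for illustration only; Kafka's real assignor may
  choose different partitions for each consumer."
  [consumers partition-count]
  (let [empty-assignment (zipmap consumers (repeat []))]
    (reduce (fn [assignment [partition consumer]]
              (update assignment consumer conj partition))
            empty-assignment
            (map vector (range partition-count) (cycle consumers)))))

;; Two instances, one partition: the second instance stays idle.
(assign-partitions ["instance-a" "instance-b"] 1)
;; => {"instance-a" [0], "instance-b" []}

;; Two instances, three partitions: one gets two partitions, the other one.
(assign-partitions ["instance-a" "instance-b"] 3)
;; => {"instance-a" [0 2], "instance-b" [1]}
```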

To show how it works in practice in Clojure, I’m using the Jackdaw library. This is what the code looks like with a subscriber to a single topic (in this case, the topic is named “greetings”):

(ns kafka-test.core
  (:require [jackdaw.client :as jack-client]
            [jackdaw.client.log :as log])
  (:gen-class))

(def consumer-config
  ;; Where we will connect...
  {"bootstrap.servers" "localhost:9092"
   ;; The "consumer group" that this consumer belongs to...
   "group.id" "kafka-test.consumer-group-1"
   ;; How do we deserialize keys?
   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   ;; How do we deserialize values?
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})

(defn consume! []
  (with-open [consumer (-> (jack-client/consumer consumer-config)
                           (jack-client/subscribe [{:topic-name "greetings"}]))]
    (doseq [{:keys [key value partition timestamp offset]} (log/log consumer 100)]
      (println "key: " key)
      (println "value: " value)
      (println "partition: " partition)
      (println "timestamp: " timestamp)
      (println "offset: " offset))))

(defn -main [& args]
  (consume!))

If you save it and run it with lein run, you’ll see that it connects to Kafka and, if you send some messages to that topic, consumes them. The key will be nil, and the partition will always be 0. That’s normal (for now).

You can send messages to specific partitions of a specific topic, or you can just send a message and let Kafka do round-robin: the first message goes to the first partition, the second to the second, and so on until you run out of partitions, and then the next message goes to the first partition again. (Newer Kafka clients may instead stick to one partition per batch of keyless messages.) Or you can send a “key” with the message, and Kafka will hash the key (this algorithm is also customizable) to choose which partition the message goes to. This lets you be sure that different messages from the same context (let’s say a user ID or a specific system ID) always go to the same partition of that topic.
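Here is a rough sketch of those two strategies in plain Clojure, with no Kafka involved. Both function names are made up for this post, and key->partition uses Clojure’s own hash as a stand-in: Kafka’s default partitioner hashes the serialized key bytes with murmur2, so the real partition numbers will be different. What carries over is the idea: no key means cycling through the partitions, and the same key always lands on the same partition.

```clojure
(defn round-robin-partitioner
  "Returns a function that hands out partitions 0, 1, ..., n-1, 0, 1, ...
  on each call. Hypothetical helper mimicking the keyless case."
  [partition-count]
  (let [counter (atom -1)]
    (fn [] (mod (swap! counter inc) partition-count))))

(defn key->partition
  "Picks a partition from a key. Uses Clojure's `hash` as a stand-in for
  Kafka's murmur2-based hashing, so actual partition numbers will differ."
  [partition-count k]
  (mod (hash k) partition-count))

;; Keyless messages cycle through the three partitions.
(def next-partition (round-robin-partitioner 3))
(repeatedly 4 next-partition)
;; => (0 1 2 0)

;; The same key is always mapped to the same partition.
(= (key->partition 3 "user-42") (key->partition 3 "user-42"))
;; => true
```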

To send messages, you can use the following code:

(def producer-config
  {"bootstrap.servers" "localhost:9092"
   "acks" "all"
   "client.id" "foo"
   "key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
   "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"})

(with-open [producer (jack-client/producer producer-config)]
  (jack-client/produce! producer {:topic-name "greetings"} "Hello")
  (jack-client/produce! producer {:topic-name "greetings"} "Good morning")
  (jack-client/produce! producer {:topic-name "greetings"} "Good afternoon")
  (jack-client/produce! producer {:topic-name "greetings"} "Good night"))

Please notice that I create the producer once, and then send all the messages with it. This avoids paying the connection delay for every message, so in a real-world application you probably want to create a producer and keep it open. Again, this producer will just send messages to a single partition (because, so far, our Kafka topic only has one!), so let’s change this too:

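;; We need a new require
(require '[jackdaw.admin :as admin])

;; We need the Java API to increase partitions.
;; with-open closes the admin client when we're done, and .all/.get blocks
;; until the broker has applied the change (or throws if it failed).
(with-open [admin (admin/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  (-> admin
      (.createPartitions
       {"greetings" (org.apache.kafka.clients.admin.NewPartitions/increaseTo 3)})
      .all
      .get))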

Now, if you connect a consumer and send the same messages to that topic again, you’ll see something like this:

key:  nil
value:  Good afternoon
partition:  2
timestamp:  1576935685036
offset:  11
key:  nil
value:  Hello
partition:  0
timestamp:  1576935685036
offset:  17
key:  nil
value:  Good night
partition:  0
timestamp:  1576935685037
offset:  18
key:  nil
value:  Good morning
partition:  1
timestamp:  1576935685036
offset:  14

This could look strange: after all, why is it consuming the messages out of order? Why partition 2, then 0, then 1? The reason is that Kafka only guarantees the ordering of messages within the same partition. Jackdaw, on the other hand, makes no assumptions about which partition will be consumed first. Also, if you look at the messages, you’ll see that Hello went to partition 0, Good morning to 1, Good afternoon to 2, and then Good night back to 0, which is exactly how round-robin should work.
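If you want to convince yourself of the per-partition guarantee, you can check it on the output above. This is just a small sketch in plain Clojure: received is the four records from my run typed in by hand, and ordered-within-partitions? is a helper name I made up for this post.

```clojure
;; The four records from the consumer output above, in arrival order.
(def received
  [{:value "Good afternoon" :partition 2 :offset 11}
   {:value "Hello"          :partition 0 :offset 17}
   {:value "Good night"     :partition 0 :offset 18}
   {:value "Good morning"   :partition 1 :offset 14}])

(defn ordered-within-partitions?
  "True when, for every partition, the offsets arrive in strictly
  increasing order. Says nothing about ordering across partitions."
  [records]
  (every? (fn [[_partition recs]]
            (apply < (map :offset recs)))
          (group-by :partition records)))

(ordered-within-partitions? received)
;; => true
```

The arrival order across partitions is scrambled, but inside partition 0 the offsets still go 17 and then 18, which is the only ordering Kafka promises.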

You can also connect another consumer to the topic. For the sake of brevity, I’ll change the consumer code to the following:

(defn consume! []
  (with-open [consumer (-> (jack-client/consumer consumer-config)
                           (jack-client/subscribe [{:topic-name "greetings"}]))]
    (doseq [{:keys [value partition]} (log/log consumer 100)]
      (println "Partition: " partition " of message " value))))

And I’ll run lein run on two different terminals:

# First terminal:
Partition:  1  of message  Good morning
Partition:  0  of message  Hello
Partition:  0  of message  Good night

# Second terminal:
Partition:  2  of message  Good afternoon

And if I connect a third terminal:

# First terminal:
Partition:  1  of message  Good afternoon

# Second terminal:
Partition:  2  of message  Hello
Partition:  2  of message  Good night

# Third terminal:
Partition:  0  of message  Good morning

As you can see, Kafka automatically rebalanced my consumers, and now each one is consuming a single partition. Now, if you have a topic and you want it to be consumed by two different systems, you can put each system in a different “consumer group”. Every message is then delivered to every consumer group, and within each group the same partition rules apply. To test this, without stopping any of the current consumers, you can change the consumer-config to something like this:

(def consumer-config
  {"bootstrap.servers" "localhost:9092"
   "group.id"  "kafka-test.consumer-group-2" ; was -1 here
   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})

And then, on yet another terminal, run lein run. In my test, this was the result:

# First terminal:
Partition:  1  of message  Good afternoon

# Second terminal:
Partition:  2  of message  Hello
Partition:  2  of message  Good night

# Third terminal:
Partition:  0  of message  Good morning

# Fourth terminal:
Partition:  2  of message  Hello
Partition:  2  of message  Good night
Partition:  0  of message  Good morning
Partition:  1  of message  Good afternoon
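The result above can be modeled in a few lines of plain Clojure. This is only a sketch of the delivery rule, not of how Kafka implements it: deliver, the terminal names, and the hand-typed assignment map are all made up for illustration, and the partition-to-terminal mapping is simply copied from my run.

```clojure
;; The four messages from the last run, with the partition each one landed on.
(def records
  [{:value "Hello"          :partition 2}
   {:value "Good morning"   :partition 0}
   {:value "Good afternoon" :partition 1}
   {:value "Good night"     :partition 2}])

;; group id -> consumer -> partitions it owns (copied from the run above).
(def groups
  {"kafka-test.consumer-group-1" {"terminal-1" [1]
                                  "terminal-2" [2]
                                  "terminal-3" [0]}
   "kafka-test.consumer-group-2" {"terminal-4" [0 1 2]}})

(defn deliver
  "Returns group id -> consumer -> values received. Every group sees every
  record; inside a group, a record goes only to the partition's owner."
  [groups records]
  (into {}
        (for [[group-id assignment] groups]
          [group-id
           (into {}
                 (for [[consumer partitions] assignment]
                   [consumer (->> records
                                  (filter #(contains? (set partitions) (:partition %)))
                                  (mapv :value))]))])))

(get-in (deliver groups records) ["kafka-test.consumer-group-1" "terminal-2"])
;; => ["Hello" "Good night"]

(count (get-in (deliver groups records) ["kafka-test.consumer-group-2" "terminal-4"]))
;; => 4
```

The fourth terminal is alone in its group, so it owns all three partitions and sees all four messages, while the first group still splits them three ways. The order in which a consumer sees messages from different partitions is not modeled here.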

This is kind of neat! Now, the ideal way of running Kafka is to control how the lifecycle of each message works (for example, should we mark messages as consumed as soon as we start to consume them? Should we wait until we’re sure everything is fine? What do we do in case of errors? Can we re-process messages?), but this is way beyond the scope of this post!

