Recently I was trying to study Kafka, but I couldn't find a single resource that would give me a quick introduction and some hands-on experience with it and Clojure. So, I'm making my own here! Don't expect a deep dive – this is just a quick-and-dirty introduction to the concepts, followed by some code examples in Clojure.
Kafka is a messaging system similar to RabbitMQ and SQS. Its big selling points are that it's faster than both and that it works very well in cluster mode. Installing Kafka locally is quite complicated, so you'll probably want to use the docker-compose.yaml file below:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
Save it and start everything with docker-compose up -d. This file will create a Kafka broker (a single messaging node) and a Zookeeper instance (which coordinates multiple Kafka nodes, decides which node is the leader, runs the election of a new leader when the current one goes down, among other things). You will connect to port 9092, and listen for and send messages from there.
Instead of using queues like RabbitMQ or SQS, Kafka uses topics. Each topic has a number of partitions, and you can increase that number (but never decrease it). Partitions serve multiple purposes: the first is to distribute messages (or groups of messages) across different Kafka nodes; the other is to control how messages are split between different consumers listening on the same topic, in the same consumer group.
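If you want to create a topic yourself with a specific number of partitions (instead of relying on the broker auto-creating it with a single one), jackdaw.admin can do that. This is just an illustrative sketch – the rest of this post assumes "greetings" starts with a single partition, and the counts here are arbitrary:

; A sketch, not part of the original walkthrough: creating a topic with an
; explicit partition count via jackdaw.admin.
(require '[jackdaw.admin :as admin])

(let [client (admin/->AdminClient {"bootstrap.servers" "localhost:9092"})]
  (admin/create-topics! client
                        [{:topic-name "greetings"
                          :partition-count 3
                          :replication-factor 1   ; we only have one broker
                          :topic-config {}}]))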
The rules are quite simple: if you have a system and want to receive messages from a topic, you "subscribe" to that topic. If you have, for example, two instances of the same system listening to the same topic, but the topic has only one partition, only one instance will consume the messages; the other will be idle. Similarly, if you have two instances of the same system listening to a single topic with 3 partitions, one of the instances will listen to two partitions and the other to only one.
To show how it works in practice in Clojure, I'm using the Jackdaw library. This is what the code looks like for a subscriber to a single topic (in this case, the topic is named "greetings"):
(ns kafka-test.core
  (:require [jackdaw.client :as jack-client]
            [jackdaw.client.log :as log])
  (:gen-class))

(def consumer-config
  ; Where we will connect...
  {"bootstrap.servers" "localhost:9092"
   ; The "consumer group" that we're listening....
   "group.id" "kafka-test.consumer-group-1"
   ; How do we deserialize keys?
   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   ; How do we deserialize values?
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})

(defn consume! []
  (with-open [consumer (-> (jack-client/consumer consumer-config)
                           (jack-client/subscribe [{:topic-name "greetings"}]))]
    (doseq [{:keys [key value partition timestamp offset]} (log/log consumer 100)]
      (println "key: " key)
      (println "value: " value)
      (println "partition: " partition)
      (println "timestamp: " timestamp)
      (println "offset: " offset))))

(defn -main [& args]
  (consume!))
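To run this with lein run, the only dependency you need is Jackdaw. A minimal project.clj could look something like this (the versions are just examples, any recent Jackdaw release should do):

; A minimal project.clj sketch, not from the original post; versions are
; only examples.
(defproject kafka-test "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.10.1"]
                 [fundingcircle/jackdaw "0.7.0"]]
  :main kafka-test.core)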
If you save it and run it with lein run, you'll see that it connects to Kafka and, if you send some messages to that topic, it'll consume them. The key will be nil, and the partition will always be 0. That's normal (for now).
You can send messages to a specific partition of a specific topic, or you can just send a message and let Kafka do round-robin – the first message goes to the first partition, the second to the second, and so on until you exhaust the number of partitions, at which point the next message goes back to the first partition again. Or, you can send a "key" with the message, and Kafka will run an algorithm (which is also customizable) over that key to choose which partition the message will go to. This lets you be sure that different messages from the same context (say, a user ID or a specific system ID) will always go to the same partition of that topic.
To send messages, you can use the following code:
(def producer-config
  {"bootstrap.servers" "localhost:9092"
   ; Wait for all in-sync replicas to acknowledge each message
   "acks" "all"
   ; An arbitrary identifier for this producer
   "client.id" "foo"
   "key.serializer" "org.apache.kafka.common.serialization.StringSerializer"
   "value.serializer" "org.apache.kafka.common.serialization.StringSerializer"})

(with-open [producer (jack-client/producer producer-config)]
  (jack-client/produce! producer {:topic-name "greetings"} "Hello")
  (jack-client/produce! producer {:topic-name "greetings"} "Good morning")
  (jack-client/produce! producer {:topic-name "greetings"} "Good afternoon")
  (jack-client/produce! producer {:topic-name "greetings"} "Good night"))
Notice that I open a producer connection first, and only then send the messages. Opening a connection is slow, so in a real-world application you'll probably want to open one and keep it open to avoid that delay. Again, this producer will just send messages to a single partition (because, until now, our Kafka topic only has one!), so let's change that too:
; We need a new require
(require '[jackdaw.admin :as admin])

; We need the Java API to increase partitions:
(let [admin (admin/->AdminClient producer-config)]
  (.createPartitions admin
                     {"greetings" (org.apache.kafka.clients.admin.NewPartitions/increaseTo 3)}))
Now, if you connect a consumer and send the same messages to that topic, you'll see something like this:
key: nil
value: Good afternoon
partition: 2
timestamp: 1576935685036
offset: 11
key: nil
value: Hello
partition: 0
timestamp: 1576935685036
offset: 17
key: nil
value: Good night
partition: 0
timestamp: 1576935685037
offset: 18
key: nil
value: Good morning
partition: 1
timestamp: 1576935685036
offset: 14
This might look strange: after all, why is it consuming the messages out of order? Why "partition 2", then 0, then 1? The reason is that Kafka only guarantees ordering of messages within the same partition, and Jackdaw makes no promises about which partition will be consumed first. Also, if you look at the messages, you'll see that Hello went to partition 0, Good morning to 1, Good afternoon to 2, and then Good night back to 0 – exactly how round-robin should work.
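By contrast, if you produce with a key (as described earlier), messages sharing a key always land on the same partition, so their relative order is preserved. A quick sketch reusing the same producer-config; the keys "user-42" and "user-99" are just made up for illustration:

; A sketch, not in the original post: producing with keys. The two
; "user-42" messages go to the same partition and keep their order.
(with-open [producer (jack-client/producer producer-config)]
  (jack-client/produce! producer {:topic-name "greetings"} "user-42" "Hello")
  (jack-client/produce! producer {:topic-name "greetings"} "user-42" "Good night")
  (jack-client/produce! producer {:topic-name "greetings"} "user-99" "Good morning"))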
You can also connect another consumer to the same topic. For brevity, I'll change the consumer code to the following:
(defn consume! []
  (with-open [consumer (-> (jack-client/consumer consumer-config)
                           (jack-client/subscribe [{:topic-name "greetings"}]))]
    (doseq [{:keys [value partition]} (log/log consumer 100)]
      (println "Partition: " partition " of message " value))))
And I'll run lein run in two different terminals:
# First terminal:
Partition: 1 of message Good morning
Partition: 0 of message Hello
Partition: 0 of message Good night

# Second terminal:
Partition: 2 of message Good afternoon
And if I connect a third terminal:
# First terminal:
Partition: 1 of message Good afternoon

# Second terminal:
Partition: 2 of message Hello
Partition: 2 of message Good night

# Third terminal:
Partition: 0 of message Good morning
As you can see, Kafka automatically re-balanced my consumers in place, and now each one is consuming a single partition. Now, if you have a topic that you want to be consumed by two different systems, you can put each system in a different "consumer group". Every message will then be delivered to every consumer group, with the same partition rules applying inside each group. To test this, without stopping any of the current consumers, you can change the consumer-config to something like this:
(def consumer-config
  {"bootstrap.servers" "localhost:9092"
   "group.id" "kafka-test.consumer-group-2" ; was -1 here
   "key.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"
   "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"})
And then, on yet another terminal, run lein run. In my test, this was the result:
# First terminal:
Partition: 1 of message Good afternoon

# Second terminal:
Partition: 2 of message Hello
Partition: 2 of message Good night

# Third terminal:
Partition: 0 of message Good morning

# Fourth terminal:
Partition: 2 of message Hello
Partition: 2 of message Good night
Partition: 0 of message Good morning
Partition: 1 of message Good afternoon
This is kind of neat! Now, to run Kafka properly you'll also want to control the lifecycle of each message (for example: should we mark a message as consumed as soon as we start processing it? Should we wait until we're sure everything went fine? What do we do in case of errors? Can we re-process messages?), but that is way beyond the scope of this post!