
Kafka – Scaling Consumers Out In A Consumer Group

Overview:

This is the 3rd part in the Kafka series. If you have not read the previous articles, I would encourage you to read them first.

We have already seen messages being produced into a Kafka topic and processed by a consumer. As long as the producer and the consumer work at the same rate, or the producer produces fewer messages per unit time than the consumer can process, we are fine. But what if the consumer is slower than the producer?!! Let's say a producer produces a message every second, while the consumer takes 2 seconds to process one. After 10 seconds we would have had 10 messages, and only 5 of them would have been processed. The consumer can never catch up, so the backlog in the Kafka topic grows without bound, which is NOT good.
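To make the arithmetic concrete, here is a minimal sketch (plain Java, with the rates above hard-coded) of how the backlog grows when the producer is faster than the consumer:

public class BacklogDemo {

    public static void main(String[] args) {
        double produceRate = 1.0; // messages per second
        double consumeRate = 0.5; // 1 message every 2 seconds
        for (int seconds = 10; seconds <= 60; seconds += 10) {
            // backlog = produced - consumed; it grows without bound
            double backlog = (produceRate - consumeRate) * seconds;
            System.out.printf("after %ds: backlog = %.0f messages%n", seconds, backlog);
        }
    }

}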

In this article, we will see how to scale consumers out when their processing rate is lower than the producer's rate.

Partitions & Consumer Group:

As I mentioned in the previous articles, a topic is made up of partitions, and the number of partitions decides the maximum number of consumers you can have in a group.

[Image: a topic with multiple partitions, all read by a single consumer]

In the above picture, we have only one consumer, and it reads all the messages from all the partitions. When you increase the number of consumers in the group, a partition reassignment (rebalance) happens: instead of consumer 1 reading all the messages from all the partitions, consumer 2 shares some of the load with consumer 1, as shown below.

[Image: consumer 2 sharing the partitions with consumer 1]

Now the obvious question would be: what happens if I have more consumers than partitions? Each partition is assigned to exactly one consumer, so any additional consumers in the group will sit idle unless you increase the number of partitions for the topic, as the sketch below illustrates.
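Here is a minimal sketch of that counting argument. It is not Kafka's actual assignor, just a round-robin illustration of the rule that each partition goes to exactly one consumer in the group:

import java.util.ArrayList;
import java.util.List;

public class AssignmentDemo {

    public static void main(String[] args) {
        int partitions = 3;
        int consumers = 4; // one more than the partition count
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        // each partition is owned by exactly one consumer
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % consumers).add(p);
        }
        for (int c = 0; c < consumers; c++) {
            List<Integer> owned = assignment.get(c);
            System.out.printf("consumer-%d -> %s%n", c + 1, owned.isEmpty() ? "idle" : owned);
        }
    }

}

Running it prints that consumer-4 owns nothing, which is exactly the idle consumer described above.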

So we need to choose the partition count carefully, because it decides the maximum number of consumers in the group. Changing the partition count for an existing topic is really not recommended, as it can cause issues. Let's say a producer writes names into a topic with 3 partitions: all the names starting with A-I go to partition 1, J-R to partition 2 and S-Z to partition 3. Let's also assume we have already produced 1 million messages. If you now suddenly increase the number of partitions from 3 to 5, the ranges change: A-F go to partition 1, G-K to partition 2, L-Q to partition 3, R-U to partition 4 and V-Z to partition 5. Messages for the same key now land in a different partition than the ones produced before, which affects the ordering we had! So you need to be aware of this; if it could be a problem, choose the partition count accordingly upfront.
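Here is a minimal sketch of that effect. Kafka's default partitioner actually hashes the key with murmur2; the plain hashCode() below is a simplified stand-in, but the point is the same: the key-to-partition mapping depends on the partition count, so changing the count can move a key to a different partition:

public class RemappingDemo {

    // simplified stand-in for Kafka's default partitioner,
    // which computes murmur2(keyBytes) % numPartitions
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"alice", "bob", "carol"}) {
            System.out.printf("%s: 3 partitions -> p%d, 5 partitions -> p%d%n",
                    key, partitionFor(key, 3), partitionFor(key, 5));
        }
    }

}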

The main aim of this article is to demo the behavior of a producer and consumers in a consumer group.

Sequence Number Producer:

We will use the same code as in the previous article. However, instead of a random number, we will generate numbers sequentially, so that we can better understand what is going on! (The class name is still RandomNumberProducer, but that's OK.)

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class RandomNumberProducer {

    // sequential counter instead of the random number used earlier
    private int counter = 1;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // produce one message every second
    @Scheduled(fixedRate = 1000)
    public void produce() throws UnknownHostException {
        int number = counter++;
        this.kafkaTemplate.sendDefault(String.valueOf(number));
        // just for logging
        String hostName = InetAddress.getLocalHost().getHostName();
        System.out.println(String.format("%s produced %d", hostName, number));
    }

}
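Two things the snippet assumes: sendDefault() sends to the template's default topic, so spring.kafka.template.default-topic should be set to random-number in the producer's configuration, and @Scheduled only fires when scheduling is enabled. A minimal sketch of the boot class (the class name here is hypothetical):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

// @EnableScheduling is what makes the @Scheduled produce() method fire
@SpringBootApplication
@EnableScheduling
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

}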

Slow Consumer:

To demo the slow consumer behavior, I am introducing a property message.processing.time in the consumer application.

spring:
  kafka:
    bootstrap-servers:
      - localhost:9091
      - localhost:9092
      - localhost:9093
    consumer:
      group-id: random-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

message:
  processing:
    time: 0

Now the Spring Boot consumer application would look like this. Based on the property value, we sleep to simulate the slowness!

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class RandomNumberConsumer {

    @Value("${message.processing.time}")
    private long processingTime;

    @KafkaListener(topics = "random-number")
    public void consumer(String message) throws UnknownHostException, InterruptedException {
        String hostName = InetAddress.getLocalHost().getHostName();
        System.out.println(String.format("%s consumed %s", hostName, message));
        // simulate a slow consumer
        Thread.sleep(processingTime);
    }

}

Dockerizing Producer:

# Use JRE8 slim
FROM openjdk:8u191-jre-alpine3.8

# Add the app jar
ADD target/*.jar producer.jar

ENTRYPOINT java -jar producer.jar \
                    --spring.kafka.bootstrap-servers=$BROKER

Build the jar and the producer image:

mvn clean package
docker build -t vinsdocker/kafka-producer .

Dockerizing Consumer:

# Use JRE8 slim
FROM openjdk:8u191-jre-alpine3.8

# Add the app jar
ADD target/*.jar consumer.jar

ENTRYPOINT java -jar consumer.jar \
                    --spring.kafka.bootstrap-servers=$BROKER \
                    --message.processing.time=$MESSAGE_PROCESSING_TIME

Build the jar and the consumer image:

mvn clean package
docker build -t vinsdocker/kafka-consumer .

Producer & Consumer Group Demo:

Below is the producer-consumer.yml file which runs both applications against the cluster:

version: '3'

services:
  producer:
    image: vinsdocker/kafka-producer
    environment:
      - BROKER=kafka1:19091
  consumer:
    image: vinsdocker/kafka-consumer
    environment:
      - BROKER=kafka2:19092
      - MESSAGE_PROCESSING_TIME=0

In the above yml file, we have 2 services: one for the producer and one for the consumer. Both services need the broker list. We do not use localhost:9091 here because, when you run these services via docker-compose, Docker creates a network and places all the services (zookeeper, kafka1, kafka2, kafka3, producer and consumer) in it, so they find each other by service name. If you take a look at the Kafka broker services, they listen on kafka1:19091, kafka2:19092 and kafka3:19093 inside that network, so those addresses are what we pass as the broker list. The message processing time is 0 for now, so the consumer processes messages as fast as they arrive. Bring up the Kafka cluster first, then the producer and consumer:

docker-compose -f kafka-cluster.yml up
docker-compose -f producer-consumer.yml up

The consumer keeps up easily. Now let's make the consumer slow: set MESSAGE_PROCESSING_TIME to 2000, so the consumer takes around 2 seconds per message while the producer produces one message per second.

version: '3'

services:
  producer:
    image: vinsdocker/kafka-producer
    environment:
      - BROKER=kafka1:19091
  consumer:
    image: vinsdocker/kafka-consumer
    environment:
      - BROKER=kafka2:19092
      - MESSAGE_PROCESSING_TIME=2000
docker-compose -f producer-consumer.yml up

A single slow consumer now falls further and further behind. Let's scale the consumer service out to 2 instances; Kafka rebalances the partitions between the 2 consumers in the group:

docker-compose -f producer-consumer.yml up --scale consumer=2

[Image: console output showing the partitions shared between the 2 consumers]

With 3 consumers, assuming the topic was created with 3 partitions as in the earlier articles, each consumer owns exactly one partition:

docker-compose -f producer-consumer.yml up --scale consumer=3

[Image: console output showing each of the 3 consumers owning one partition]

Scaling beyond the partition count does not help; the 4th consumer just sits idle:

docker-compose -f producer-consumer.yml up --scale consumer=4

Summary:

When the consumer's throughput is lower than the producer's, the consumer will NOT be able to keep up with the messages in the Kafka topic. As long as the topic has enough partitions, we can scale consumers out within a consumer group to match the producer's throughput.

 
