Overview:
In the previous article, we discussed the basic terminology of Kafka and created a local development infrastructure using docker-compose. In this article, I would like to show how to create a simple Kafka producer and consumer using Spring Boot.
Prerequisite:
- Java 8 or above installed
- Kafka is up and running
Goal:
The aim of this post is to show a way to produce messages into a Kafka topic so that they can be consumed later by a different application. To keep things easy to understand, we will produce random numbers and write them into a Kafka topic. Create a Kafka topic called random-number with 3 partitions.
Kafka Producer:
- The producer is going to be a Spring Boot application. Go to Spring Initializr. Create a Maven project called kafka-producer as shown here and add Kafka as the dependency.
- Import the Maven project into your IDE.
- I created a RandomNumberProducer class and an application.yml as shown in this tree structure.
- The application.yml contains the following.
- bootstrap-servers are the Kafka brokers. You can mention all or just one of them; it is better to list all the brokers in the cluster.
- We will be writing messages into a topic, so create a topic called random-number in your Kafka cluster. The producer itself would create the topic if it is not present; however, it would use default values (like 1 partition) which you might not want. It is usually better to create these topics upfront in your cluster (a topic-creation sketch follows the YAML below).
- The messages can be key-value pairs. The key is optional and can be omitted (though using keys is recommended). The key and value serializers indicate the types of the key and value.
spring:
  kafka:
    bootstrap-servers:
      - localhost:9091
    template:
      default-topic: random-number
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
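- As mentioned above, rather than relying on auto-creation, you can create the random-number topic upfront. A minimal sketch (TopicConfig is a hypothetical class name, and the replication factor of 1 assumes a local development cluster): declaring a NewTopic bean lets Spring Boot's auto-configured KafkaAdmin create the topic on startup if it does not already exist.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    // random-number with 3 partitions; replication factor 1 is enough for local development
    @Bean
    public NewTopic randomNumberTopic() {
        return new NewTopic("random-number", 3, (short) 1);
    }

}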
- KafkaProducerApplication – I have added @EnableScheduling as we will keep producing messages every second.
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@SpringBootApplication
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

}
- RandomNumberProducer –
- We have autowired the KafkaTemplate. All the cluster and topic information has already been set up via application.yml.
- The produce method is scheduled to execute every second. Whenever it is executed, it writes a message into the Kafka topic.
- I have explicitly added the System.out statement just to see which host produced which number, in case multiple instances of the producer are running. We will see the use of this in the next article.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ThreadLocalRandom;

@Component
public class RandomNumberProducer {

    private static final int MIN = 10;
    private static final int MAX = 100_000;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Scheduled(fixedRate = 1000)
    public void produce() throws UnknownHostException {
        int random = ThreadLocalRandom.current().nextInt(MIN, MAX);
        this.kafkaTemplate.sendDefault(String.valueOf(random));
        // just for logging
        String hostName = InetAddress.getLocalHost().getHostName();
        System.out.println(String.format("%s produced %d", hostName, random));
    }

}
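- Note that sendDefault is asynchronous: it returns as soon as the message is handed to the producer. If you want to confirm delivery or log failures, a minimal sketch (assuming spring-kafka 2.x, where the template returns a ListenableFuture; in 3.x it returns a CompletableFuture instead) could attach a callback inside produce():

// additional imports:
// import org.springframework.kafka.support.SendResult;
// import org.springframework.util.concurrent.ListenableFuture;

ListenableFuture<SendResult<String, String>> future =
        this.kafkaTemplate.sendDefault(String.valueOf(random));
future.addCallback(
        result -> System.out.printf("sent to partition %d at offset %d%n",
                result.getRecordMetadata().partition(),
                result.getRecordMetadata().offset()),
        ex -> System.err.println("send failed: " + ex.getMessage()));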
- Run the Spring Boot application and ensure that it works fine. At this point, it will start producing messages into the Kafka topic.
Kafka Consumer:
- The above project is just the producer. We are going to create a completely different application for consuming these messages.
- Go to Spring Initializr. Create a Maven project called kafka-consumer with Kafka as the dependency.
- My project structure looks like this.
- application.yml contains the following information.
- I have added all the Kafka brokers here under bootstrap-servers.
- We might want to run multiple instances of our kafka-consumer application, so it is better to add a group-id for our application. I have simply named it random-consumer.
- auto-offset-reset is a consumer property. When the consumer subscribes to a topic, it gets partition assignments (if there are 3 partitions and 3 consumers in the group, each consumer is assigned one partition). When there is no committed offset for a partition, the consumer has to decide whether to start consuming from the beginning (earliest) or to consume only new messages (latest). Mostly you will want earliest.
- Our key-value pairs are String types, so we use String deserializers.
spring:
  kafka:
    bootstrap-servers:
      - localhost:9091
      - localhost:9092
      - localhost:9093
    consumer:
      group-id: random-consumer
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- KafkaConsumerApplication
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

}
- RandomNumberConsumer component looks like this.
- We subscribe to the random-number topic.
- Whenever there is a message in the topic, this method is executed. (A variant that also reports the partition is sketched after the code.)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.net.InetAddress;
import java.net.UnknownHostException;

@Component
public class RandomNumberConsumer {

    @KafkaListener(topics = "random-number")
    public void consume(String message) throws UnknownHostException {
        String hostName = InetAddress.getLocalHost().getHostName();
        System.out.println(String.format("%s consumed %s", hostName, message));
    }

}
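- Since the topic has 3 partitions, it can be useful to see which partition each message came from, especially once multiple consumer instances share the group. As a minimal sketch (RandomNumberRecordConsumer is a hypothetical variant, not part of this tutorial), spring-kafka also lets a listener receive the full ConsumerRecord:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class RandomNumberRecordConsumer {

    @KafkaListener(topics = "random-number")
    public void consume(ConsumerRecord<String, String> record) {
        // partition() and offset() show how messages are spread across the 3 partitions
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }

}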
- Now hopefully you get the idea. Run these applications side by side and watch the logs in the console.
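- With one producer and one consumer running locally, the interleaved console output should look roughly like this (hostnames and numbers are illustrative):

my-machine produced 48213
my-machine consumed 48213
my-machine produced 9177
my-machine consumed 9177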
Demo:
Summary:
We were able to successfully create simple producer and consumer applications. We also saw that all the produced messages were consumed by the consumer application. In the next article, we will see how to dockerize these applications and run multiple instances of the producers and consumers.