Overview:
In this article, I would like to show how to implement real-time stream processing using Redis Streams with Spring Boot.
Redis:
Redis (REmote DIctionary Server) started out as an in-memory data store mostly used for caching. Along with master/replica replication and Pub/Sub, Redis now supports Streams as well (added in Redis 5.0). If you have not read them already, please take a look at some of the other Redis articles.
Stream Processing:
In the good old days, we used to collect data, store it in a database and process it nightly. This is called batch processing!
In this microservices era, we get a continuous, never-ending stream of data, and delaying its processing can have a severe impact on the business. For example, consider an application like Netflix or YouTube: based on the movies/videos we browse, it shows recommendations immediately. This provides a much better user experience and helps the business. Similarly, when a bank receives credit card transactions, it might want to check for fraudulent activity and block the card immediately if any is found. A credit card provider would not want to delay this until the nightly processing.
Stream processing is real-time, continuous data processing. Let's see how we can achieve simple real-time stream processing using Redis Streams with Spring Boot.
We have also discussed real-time data stream processing using Apache Kafka. Apache Kafka has been around for 10 years, whereas Redis is relatively new in this field, and some features of Redis Streams seem to have been inspired by Kafka. The problem with Kafka is that it can be difficult to configure, and maintaining its infrastructure is challenging. Redis, by contrast, is easy to set up and lightweight.
Redis Streams vs Pub/Sub:
- "Why do we need Streams when we already have Pub/Sub?", you might ask. The answer is that Pub/Sub is not the same as a stream. Here are some of the basic differences between them; a stream is not a replacement for Pub/Sub.
Pub/Sub | Stream |
---|---|
No persistence | Persistent Data |
Subscribers have to be up and running to receive messages; it is a fire-and-forget model. | Consumers do not have to be running when messages are published. They can connect to the stream at any time and consume messages from where they left off. |
All subscribers receive every message. | Within a consumer group, each message is delivered to only one consumer, so there is no duplicate processing within the group. (Every consumer group still receives every message.) |
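To make the last row more concrete, here is a toy in-memory model of consumer-group delivery in plain Java, with no Redis involved. The class and method names (ToyStream, createGroup, readGroup) are hypothetical and only mimic XADD and XREADGROUP with `>`: within one group each entry goes to exactly one consumer, while each group independently sees every entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a Redis stream with consumer groups (hypothetical names, not the Redis API).
class ToyStream {
    private final List<String> entries = new ArrayList<>();           // append-only log; entries stay after reading
    private final Map<String, Integer> groupOffsets = new HashMap<>(); // next index each group will deliver

    // Roughly like XADD: append an entry and return its index.
    int add(String value) {
        entries.add(value);
        return entries.size() - 1;
    }

    // Roughly like XGROUP CREATE <key> <group> 0 (starts at the beginning, unlike $ used in the article).
    void createGroup(String group) {
        groupOffsets.putIfAbsent(group, 0);
    }

    // Roughly like XREADGROUP GROUP <group> <consumer> COUNT 1 STREAMS <key> >:
    // hands the group's next undelivered entry to the calling consumer, or returns null if none is left.
    String readGroup(String group, String consumer) {
        int next = groupOffsets.get(group);
        if (next >= entries.size()) {
            return null;
        }
        groupOffsets.put(group, next + 1);
        return consumer + " <- " + entries.get(next);
    }

    public static void main(String[] args) {
        ToyStream stream = new ToyStream();
        stream.createGroup("revenue");
        stream.createGroup("email");
        stream.add("oven");
        stream.add("brush");

        // Two consumers in the same group split the entries between them...
        System.out.println(stream.readGroup("revenue", "c1")); // c1 <- oven
        System.out.println(stream.readGroup("revenue", "c2")); // c2 <- brush
        // ...while a second group still sees every entry from the start.
        System.out.println(stream.readGroup("email", "c1"));   // c1 <- oven
    }
}
```

With Pub/Sub, by contrast, both "c1" and "c2" would receive both messages, and only if they were connected at publish time.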
Redis Stream With Spring Boot:
- Our publisher will publish purchase-related events into a Redis stream. Let's call it the purchase-events stream.
- A consumer group is interested in listening to those events. This could be for calculating the revenue, processing payments or sending out emails.
- If you need to do several of these, for example payment processing and sending out an email, you need a separate consumer group for each.
- The consumers consume the events and can do anything with them. In our case, we just take the price the user paid and calculate the revenue by category.
- This could be stored in a separate DB, but to keep things simple, I will store this info as a sorted set in Redis.
Project Setup:
Create a Spring Boot project as shown below.
Maven Dependencies:
Project Structure:
- This will be a multi-module Maven project as shown below.
Product Category:
public enum Category {
    APPLIANCES,
    BOOKS,
    COSMETICS,
    ELECTRONICS,
    OUTDOOR
}
Product – DTO:
I use Lombok to generate the getters, setters and constructors.
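import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
// @Data generates getters, setters, equals/hashCode and toString
@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Product {
    private String name;
    private double price;
    private Category category;
}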
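If you are not using Lombok, the class below is a rough hand-written equivalent of what these annotations generate: getters, setters, both constructors, equals/hashCode and a toString in Lombok's `Product(name=..., ...)` style, which is the format visible in the consumer logs later. This is a sketch, not Lombok's exact generated code (the equals/hashCode here are a plain Objects-based version), and Category is repeated so the snippet compiles on its own.

```java
import java.util.Objects;

// Hand-written approximation of the Lombok-annotated Product DTO.
class Product {
    private String name;
    private double price;
    private Category category;

    public Product() { } // what @NoArgsConstructor generates

    public Product(String name, double price, Category category) { // what @AllArgsConstructor generates
        this.name = name;
        this.price = price;
        this.category = category;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public double getPrice() { return price; }
    public void setPrice(double price) { this.price = price; }
    public Category getCategory() { return category; }
    public void setCategory(Category category) { this.category = category; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Product)) return false;
        Product other = (Product) o;
        return Double.compare(price, other.price) == 0
                && Objects.equals(name, other.name)
                && category == other.category;
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, price, category);
    }

    @Override
    public String toString() { // same shape as Lombok's @ToString output
        return "Product(name=" + name + ", price=" + price + ", category=" + category + ")";
    }

    public static void main(String[] args) {
        System.out.println(new Product("oven", 500.00, Category.APPLIANCES));
        // Product(name=oven, price=500.0, category=APPLIANCES)
    }
}

enum Category { APPLIANCES, BOOKS, COSMETICS, ELECTRONICS, OUTDOOR }
```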
Redis Stream – Producer:
The producer application keeps publishing purchase events periodically, at the rate configured via publish.rate.
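import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
// @Scheduled methods only run if scheduling is enabled (@EnableScheduling on a configuration class)
@Service
public class PurchaseEventProducer {
    // number of events published so far (for logging only)
    private final AtomicInteger atomicInteger = new AtomicInteger(0);
    @Value("${stream.key}")
    private String streamKey;
    @Autowired
    private ProductRepository repository;
    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;
    // publishes one random purchase every publish.rate milliseconds
    @Scheduled(fixedRateString = "${publish.rate}")
    public void publishEvent() {
        Product product = this.repository.getRandomProduct();
        // wrap the product as an object record for the configured stream
        ObjectRecord<String, Product> record = StreamRecords.newRecord()
                .ofObject(product)
                .withStreamKey(streamKey);
        // XADD is non-blocking; the generated record ID is printed once Redis responds
        this.redisTemplate
                .opsForStream()
                .add(record)
                .subscribe(System.out::println);
        atomicInteger.incrementAndGet();
    }
    @Scheduled(fixedRate = 10000)
    public void showPublishedEventsSoFar() {
        System.out.println(
                "Total Events :: " + atomicInteger.get()
        );
    }
}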
- The publishEvent method publishes a random product purchase periodically.
- The showPublishedEventsSoFar method simply prints the number of orders placed so far, just for logging purposes.
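Outside Spring, the effect of @Scheduled(fixedRate = ...) on publishEvent can be approximated with a ScheduledExecutorService. The sketch below is illustrative only: FixedRatePublisher is a hypothetical class, the product names are a small hard-coded subset, and the Redis call is replaced by a `publish` callback that just receives the chosen product.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Plain-Java approximation of the producer's fixed-rate publishing loop (hypothetical class).
class FixedRatePublisher {
    private static final List<String> PRODUCTS = List.of("oven", "effective java", "brush", "headphone", "plants");

    private final AtomicInteger published = new AtomicInteger(0); // same role as atomicInteger in the producer
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Publishes one random product every periodMillis, like @Scheduled(fixedRate = publish.rate).
    void start(long periodMillis, Consumer<String> publish) {
        scheduler.scheduleAtFixedRate(() -> {
            String product = PRODUCTS.get(ThreadLocalRandom.current().nextInt(PRODUCTS.size()));
            publish.accept(product); // stands in for redisTemplate.opsForStream().add(record)
            published.incrementAndGet();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    int publishedSoFar() {
        return published.get();
    }

    // Stops scheduling and waits for a task that is already running to finish.
    void stop() throws InterruptedException {
        scheduler.shutdownNow();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> sent = new CopyOnWriteArrayList<>();
        CountDownLatch fiveEvents = new CountDownLatch(5);
        FixedRatePublisher publisher = new FixedRatePublisher();
        publisher.start(100, product -> { sent.add(product); fiveEvents.countDown(); });
        fiveEvents.await(5, TimeUnit.SECONDS);
        publisher.stop();
        System.out.println("Total Events :: " + publisher.publishedSoFar() + " " + sent);
    }
}
```

Spring's scheduler does the same kind of bookkeeping for you; the only inputs are the method and the fixed rate.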
To keep things simple, I am not using any DB (well, except Redis) in this demo, so I use a hard-coded list of products as shown here.
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.springframework.stereotype.Repository;
@Repository
public class ProductRepository {
    private static final List<Product> PRODUCTS = List.of(
            // appliances
            new Product("oven", 500.00, Category.APPLIANCES),
            new Product("dishwasher", 125.00, Category.APPLIANCES),
            new Product("heater", 65.00, Category.APPLIANCES),
            new Product("vacuum cleaner", 48.00, Category.APPLIANCES),
            new Product("refrigerator", 1200.00, Category.APPLIANCES),
            // books
            new Product("how to win friends and influence", 13.00, Category.BOOKS),
            new Product("ds and algorithms", 70.00, Category.BOOKS),
            new Product("effective java", 41.00, Category.BOOKS),
            new Product("clean architecture", 32.00, Category.BOOKS),
            new Product("microservices", 16.00, Category.BOOKS),
            // cosmetics
            new Product("brush", 9.50, Category.COSMETICS),
            new Product("face wash", 13.00, Category.COSMETICS),
            new Product("makeup mirror", 17.50, Category.COSMETICS),
            // electronics
            new Product("sony 4k tv", 999.25, Category.ELECTRONICS),
            new Product("headphone", 133.25, Category.ELECTRONICS),
            new Product("macbook", 2517.25, Category.ELECTRONICS),
            new Product("speaker", 65.25, Category.ELECTRONICS),
            // outdoor
            new Product("plants", 9.75, Category.OUTDOOR),
            new Product("power tools", 73.50, Category.OUTDOOR),
            new Product("pools", 111.75, Category.OUTDOOR)
    );
    // picks a random product; using size() keeps this correct if the list changes
    public Product getRandomProduct() {
        int random = ThreadLocalRandom.current().nextInt(PRODUCTS.size());
        return PRODUCTS.get(random);
    }
}
The application.properties file contains the following properties.
stream.key=purchase-events
publish.rate=1000
Redis Stream – Consumer:
Our producer is ready. Let's create a consumer. To consume Redis Streams, we need to implement Spring Data Redis's StreamListener interface.
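import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> {
    // number of events consumed by this instance (for logging only)
    private final AtomicInteger atomicInteger = new AtomicInteger(0);
    @Autowired
    private ReactiveRedisTemplate<String, String> redisTemplate;
    @Override
    @SneakyThrows // InetAddress.getLocalHost() may throw UnknownHostException
    public void onMessage(ObjectRecord<String, Product> record) {
        System.out.println(
                InetAddress.getLocalHost().getHostName() + " - consumed :" +
                record.getValue()
        );
        // ZINCRBY revenue <price> <category>: add the price to the category's running total
        this.redisTemplate
                .opsForZSet()
                .incrementScore("revenue", record.getValue().getCategory().toString(), record.getValue().getPrice())
                .subscribe();
        atomicInteger.incrementAndGet();
    }
    @Scheduled(fixedRate = 10000)
    public void showConsumedEventsSoFar() {
        System.out.println(
                "Total Consumed :: " + atomicInteger.get()
        );
    }
}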
- We first print the consumed record.
- Then we take the price paid and add it to the category's score in the Redis sorted set named revenue.
- Like the producer, we periodically print the number of events consumed by this consumer.
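The revenue bookkeeping boils down to ZINCRBY on a sorted set whose members are categories. Here is a minimal in-memory sketch of the same idea, assuming a HashMap in place of the Redis sorted set: `incrementScore` mirrors the name of the opsForZSet() method, and `byRevenueDesc` is a hypothetical helper playing the role of ZREVRANGE revenue 0 -1 WITHSCORES. The sample purchases are taken from the consumer log further down.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the "revenue" sorted set (member = category, score = revenue).
class RevenueBoard {
    private final Map<String, Double> scores = new HashMap<>();

    // Like ZINCRBY revenue <price> <category>: adds the price to the category's score, returns the new score.
    double incrementScore(String category, double price) {
        return scores.merge(category, price, Double::sum);
    }

    // Like ZREVRANGE revenue 0 -1 WITHSCORES: categories ordered by revenue, highest first.
    List<Map.Entry<String, Double>> byRevenueDesc() {
        List<Map.Entry<String, Double>> entries = new ArrayList<>(scores.entrySet());
        entries.sort(Map.Entry.<String, Double>comparingByValue().reversed());
        return entries;
    }

    public static void main(String[] args) {
        RevenueBoard revenue = new RevenueBoard();
        // (category, price) pairs from the consumer log
        revenue.incrementScore("BOOKS", 13.0);
        revenue.incrementScore("BOOKS", 70.0);
        revenue.incrementScore("ELECTRONICS", 133.25);
        revenue.incrementScore("APPLIANCES", 500.0);
        revenue.incrementScore("COSMETICS", 17.5);
        revenue.incrementScore("BOOKS", 70.0);
        System.out.println(revenue.byRevenueDesc());
        // [APPLIANCES=500.0, BOOKS=153.0, ELECTRONICS=133.25, COSMETICS=17.5]
    }
}
```

Keeping the totals in a sorted set means Redis does this ordering for us, and all consumer instances update the same totals.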
Redis Stream Configuration:
Once the consumer is created, we need to create a subscription by registering it with a StreamMessageListenerContainer instance.
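import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;
@Configuration
public class RedisStreamConfig {
    @Value("${stream.key:purchase-events}")
    private String streamKey;
    @Autowired
    private StreamListener<String, ObjectRecord<String, Product>> streamListener;
    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
        // block up to 1 second per read and map each entry back to a Product
        var options = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .targetType(Product.class)
                .build();
        var listenerContainer = StreamMessageListenerContainer
                .create(redisConnectionFactory, options);
        // consumer group = stream key, consumer name = host name (unique per container);
        // lastConsumed() reads only entries not yet delivered to the group
        var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
                streamListener);
        listenerContainer.start();
        return subscription;
    }
}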
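receiveAutoAck acknowledges each message as part of reading it, so nothing is left in the group's pending entries list (PEL); the container also has a receive(...) variant where you acknowledge each message yourself after processing. The toy model below is plain Java with hypothetical names (ToyConsumerGroup, read, ack), not the Spring Data Redis API. It only illustrates the difference: with manual acknowledgement an entry stays pending until it is acked, which is what allows another consumer to pick up work from a crashed instance (XPENDING/XCLAIM in Redis).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy consumer group with a pending entries list (PEL); names are hypothetical.
class ToyConsumerGroup {
    private final List<String> stream = new ArrayList<>();
    private int lastDelivered = 0;                                      // next new entry, like reading with '>'
    private final Map<Integer, String> pending = new LinkedHashMap<>(); // entry id -> consumer that owns it

    void add(String value) {
        stream.add(value);
    }

    // Delivers the next new entry and returns its id (null if none); auto-acked entries are never pending.
    Integer read(String consumer, boolean autoAck) {
        if (lastDelivered >= stream.size()) {
            return null;
        }
        int id = lastDelivered++;
        if (!autoAck) {
            pending.put(id, consumer); // stays here until ack(id), like XACK
        }
        return id;
    }

    void ack(int id) {
        pending.remove(id);
    }

    Map<Integer, String> pending() {
        return new LinkedHashMap<>(pending);
    }

    public static void main(String[] args) {
        ToyConsumerGroup group = new ToyConsumerGroup();
        group.add("oven");
        group.add("brush");

        group.read("consumer-a", true);                // auto-ack: nothing left pending
        int second = group.read("consumer-b", false);  // manual ack: pending until acknowledged
        System.out.println(group.pending());           // {1=consumer-b}
        group.ack(second);
        System.out.println(group.pending());           // {}
    }
}
```

Auto-ack keeps the demo simple; the trade-off is that a message whose processing fails is not redelivered.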
Dockerizing Infrastructure:
Dockerfile:
I want to run multiple instances of the consumer to process the purchase events, so I create a Dockerfile to dockerize our app.
# Use JRE11 slim
FROM openjdk:11.0-jre-slim
# Add the app jar
COPY target/*.jar redis-stream.jar
# Exec form so the JVM receives stop signals directly
ENTRYPOINT ["java", "-jar", "redis-stream.jar"]
docker-compose file:
version: '3'
services:
  redis:
    image: redis
    ports:
      - 6379:6379
  redis-commander:
    image: rediscommander/redis-commander:latest
    depends_on:
      - redis
    environment:
      - REDIS_HOSTS=redis:redis
    ports:
      - 8081:8081
  producer:
    build: ./redis-stream-producer
    image: vinsdocker/redis-stream-producer
    depends_on:
      - redis
    environment:
      - SPRING_REDIS_HOST=redis
      - PUBLISH_RATE=1000
  consumer:
    build: ./redis-stream-consumer
    image: vinsdocker/redis-stream-consumer
    depends_on:
      - redis
    environment:
      - SPRING_REDIS_HOST=redis
Redis Stream – Set up:
Let's bring up the Redis and redis-commander instances first.
docker-compose up redis redis-commander
You can access redis-commander at port 8081 (http://localhost:8081) as shown here.
You can create a stream as shown here; with *, Redis generates the entry ID. Redis has a whole set of stream-related commands; explore them here.
XADD purchase-events * dummy-key dummy-value
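I create a consumer group as shown here. The group is named purchase-events, which is the group name the consumer passes to Consumer.from(...). The $ means the group only receives entries added after it is created, so the dummy entry above is skipped. XGROUP CREATE fails if the stream does not exist yet, which is why we added an entry first (the MKSTREAM option is an alternative).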
XGROUP CREATE purchase-events purchase-events $
Spring Boot – Producer:
In a separate terminal, run the below command to bring up the producer.
docker-compose up producer
Once started, the producer publishes events on the given schedule, printing the ID of every record it adds and, every 10 seconds, the total number of events published so far.
producer_1 | 1585682873612-0
producer_1 | 1585682873812-0
producer_1 | 1585682874013-0
producer_1 | 1585682874215-0
producer_1 | 1585682874413-0
producer_1 | 1585682874613-0
producer_1 | 1585682874812-0
producer_1 | 1585682875012-0
producer_1 | Total Events :: 51
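Each ID printed above has the form `<millisecondsTime>-<sequenceNumber>`, which is how Redis generates stream IDs for XADD with *. IDs are therefore ordered by time and can be turned back into a timestamp. Here is a small parser sketch; StreamEntryId is a hypothetical helper type, not part of Spring Data Redis.

```java
import java.time.Instant;

// Parses a Redis stream entry ID such as "1585682873612-0" (hypothetical helper type).
final class StreamEntryId implements Comparable<StreamEntryId> {
    final long millis;   // Unix time in milliseconds when the entry was added
    final long sequence; // distinguishes entries added within the same millisecond

    private StreamEntryId(long millis, long sequence) {
        this.millis = millis;
        this.sequence = sequence;
    }

    static StreamEntryId parse(String id) {
        int dash = id.indexOf('-');
        if (dash < 0) {
            throw new IllegalArgumentException("Not a stream ID: " + id);
        }
        return new StreamEntryId(Long.parseLong(id.substring(0, dash)), Long.parseLong(id.substring(dash + 1)));
    }

    Instant timestamp() {
        return Instant.ofEpochMilli(millis);
    }

    @Override
    public int compareTo(StreamEntryId other) { // time first, then sequence, as Redis orders IDs
        int byTime = Long.compare(millis, other.millis);
        return byTime != 0 ? byTime : Long.compare(sequence, other.sequence);
    }

    public static void main(String[] args) {
        StreamEntryId a = parse("1585682873612-0");
        StreamEntryId b = parse("1585682873812-0");
        System.out.println(a.timestamp());                     // 2020-03-31T19:27:53.612Z
        System.out.println(b.millis - a.millis + " ms apart"); // 200 ms apart
    }
}
```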
Spring Boot – Consumer:
Let's bring up 3 instances of our consumer.
docker-compose up --scale consumer=3
We can see the consumers consuming all the purchase events, with the load distributed among them. consumer_2 shows a higher count because it started before the other consumers.
producer_1 | 1585682887612-0
consumer_2 | 7b6c828647b0 - consumed :Product(name=how to win friends and influence, price=13.0, category=BOOKS)
producer_1 | 1585682887813-0
consumer_3 | 83699cab10bd - consumed :Product(name=ds and algorithms, price=70.0, category=BOOKS)
producer_1 | 1585682888012-0
consumer_1 | cdb3357593e6 - consumed :Product(name=headphone, price=133.25, category=ELECTRONICS)
producer_1 | 1585682888212-0
consumer_2 | 7b6c828647b0 - consumed :Product(name=oven, price=500.0, category=APPLIANCES)
consumer_1 | Total Consumed :: 18
consumer_2 | Total Consumed :: 84
producer_1 | 1585682888412-0
consumer_3 | 83699cab10bd - consumed :Product(name=makeup mirror, price=17.5, category=COSMETICS)
producer_1 | 1585682888612-0
consumer_1 | cdb3357593e6 - consumed :Product(name=ds and algorithms, price=70.0, category=BOOKS)
consumer_3 | Total Consumed :: 16
Open redis-commander and look for the sorted set named revenue. It shows the revenue per product category for the purchases happening in real time.
Summary:
We were able to successfully implement real-time stream processing using Redis Streams with Spring Boot.
The complete source code is here.
Happy learning 🙂