Overview:
Let’s consider a distributed application in which requests are processed as and when they arrive. Let’s also assume that these requests are time-consuming tasks triggered by user actions in our application. Because the tasks take time to process, it is better to queue the requests, process them sequentially, and notify the user once they are completed. This keeps our systems loosely coupled and provides a better user experience, as users are not blocked.
Sometimes, our business rules might say that we should prioritize tasks based on some category. For example, a premium user’s requests should be processed before any regular user’s requests. We see this behavior when boarding a flight, shipping online orders, etc. For example, in the picture below, the red ones are high priority tasks whereas the blue ones are low priority. Even though there is a blue/low priority task in the second position in the queue, the red/high priority task in the fourth position should be processed before any low priority task.
Let’s see how we could achieve that using Redis!
Sample Application:
- To keep things simple, as usual, I am going to assume the following:
- A task finds the number at the Nth position in the Fibonacci series! I intentionally use the naive recursive algorithm, which takes O(2^N) time, to make the processing very slow.
- Tasks fall into the categories LOW, HIGH, and URGENT. Obviously, URGENT tasks should be done first!
- I am creating a multi-module Spring Boot Maven project.
- The model module contains the classes that represent the Task and Priority.
public enum Priority implements Serializable {
    LOW,
    HIGH,
    URGENT
}

@Getter
@AllArgsConstructor
public class Task implements Comparable<Task>, Serializable {

    private final Priority priority;
    private final int number;

    // Reversed comparison: the later enum constant sorts first,
    // so URGENT tasks come before HIGH, and HIGH before LOW.
    @Override
    public int compareTo(Task o) {
        return o.getPriority().compareTo(this.getPriority());
    }
}
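To see what the reversed compareTo does, here is a minimal sketch that uses a plain java.util.PriorityQueue instead of Redis. DemoPriority, DemoTask, and PriorityOrderDemo are hypothetical stand-ins written without Lombok, not classes from the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-ins for the article's Priority and Task, written without Lombok.
enum DemoPriority { LOW, HIGH, URGENT }

final class DemoTask implements Comparable<DemoTask> {
    final DemoPriority priority;
    final int number;

    DemoTask(DemoPriority priority, int number) {
        this.priority = priority;
        this.number = number;
    }

    // Same reversed comparison as Task.compareTo: the higher priority sorts first.
    @Override
    public int compareTo(DemoTask other) {
        return other.priority.compareTo(this.priority);
    }
}

class PriorityOrderDemo {
    // Drains a local PriorityQueue and returns the priorities in the order they come out.
    static List<DemoPriority> drainOrder(List<DemoTask> tasks) {
        PriorityQueue<DemoTask> queue = new PriorityQueue<>(tasks);
        List<DemoPriority> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().priority);
        }
        return order;
    }

    public static void main(String[] args) {
        List<DemoTask> submitted = List.of(
                new DemoTask(DemoPriority.LOW, 1),
                new DemoTask(DemoPriority.HIGH, 2),
                new DemoTask(DemoPriority.URGENT, 3),
                new DemoTask(DemoPriority.LOW, 4));
        // prints [URGENT, HIGH, LOW, LOW]
        System.out.println(drainOrder(submitted));
    }
}
```

The submission order does not matter here: the queue always hands out the task whose priority compares as the smallest, which is URGENT because of the reversed comparison.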
Task-Executor:
- task-executor is a microservice that keeps polling Redis for tasks. Redis acts as the task queue here!
- Service class
@Service
public class FibService {

    // intentional - 2^N
    public long calculate(int n) {
        if (n < 2)
            return n;
        return calculate(n - 1) + calculate(n - 2);
    }
}
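To get a feel for why this recursion is so slow, here is a rough sketch that counts how many times the method gets invoked. FibCallCounter is a hypothetical helper for illustration only and is not part of the project.

```java
// Hypothetical helper (not part of the article's FibService) that counts recursive calls.
class FibCallCounter {
    private long calls = 0;

    // Same naive recursion as FibService.calculate, with a call counter added.
    long calculate(int n) {
        calls++;
        if (n < 2) {
            return n;
        }
        return calculate(n - 1) + calculate(n - 2);
    }

    long getCalls() {
        return calls;
    }

    // Returns how many invocations the naive recursion needs for input n.
    static long callsFor(int n) {
        FibCallCounter counter = new FibCallCounter();
        counter.calculate(n);
        return counter.getCalls();
    }

    public static void main(String[] args) {
        // n=10 needs 177 calls, n=20 needs 21891, n=30 needs 2692537
        for (int n = 10; n <= 30; n += 10) {
            System.out.println("n=" + n + " calls=" + callsFor(n));
        }
    }
}
```

Every extra 10 in the input multiplies the number of calls by more than 100, which is why a large input keeps the executor busy for a long time.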
- Queue bean
@EnableScheduling
@SpringBootApplication
public class TaskExecutorApplication {

    @Autowired
    private RedissonClient redissonClient;

    public static void main(String[] args) {
        SpringApplication.run(TaskExecutorApplication.class, args);
    }

    @Bean
    public RPriorityBlockingQueue<Task> getQueue() {
        return this.redissonClient.getPriorityBlockingQueue("task");
    }
}
- Executor
@Service
public class Executor {

    @Autowired
    private RPriorityBlockingQueue<Task> priorityQueue;

    @Autowired
    private FibService fibService;

    @Scheduled(fixedDelay = 1000)
    public void runTask() throws InterruptedException {
        System.out.println("----------------------------------------");
        // take() blocks until a task is available and returns the head of the queue
        Task task = this.priorityQueue.take();
        System.out.println("Priority : " + task.getPriority());
        System.out.println("Input : " + task.getNumber());
        System.out.println("Result : " + this.fibService.calculate(task.getNumber()));
    }
}
- I use the Redisson client library for Redis with the config below.
singleServerConfig:
  idleConnectionTimeout: 10000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  password: null
  subscriptionsPerConnection: 5
  clientName: null
  address: "redis://master:6379"
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 24
  connectionPoolSize: 64
  database: 0
  dnsMonitoringInterval: 5000
threads: 2
nettyThreads: 2
codec: !<org.redisson.codec.FstCodec> {}
transportMode: "NIO"
Task-Scheduler:
- task-scheduler is a web application through which users submit tasks to the queue (Redis).
- It exposes a REST API for submitting tasks!
@RestController
@RequestMapping("/task")
public class TaskController {

    @Autowired
    private RPriorityBlockingQueue<Task> priorityBlockingQueue;

    @GetMapping("/{priority}/{number}")
    public void schedule(@PathVariable String priority, @PathVariable int number) {
        this.priorityBlockingQueue.add(this.getTask(priority, number));
    }

    private Task getTask(final String priority, final int number) {
        return new Task(
                Priority.valueOf(priority.toUpperCase()),
                number
        );
    }
}
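Note that Priority.valueOf throws an IllegalArgumentException when the path variable is not one of the enum names (for example /task/medium/10). Below is a minimal sketch of a more defensive parser that the controller could use to reject such requests. RequestPriority and PriorityParser are hypothetical names used for illustration and are not part of the project.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical stand-in for the article's Priority enum.
enum RequestPriority { LOW, HIGH, URGENT }

class PriorityParser {
    // Returns the matching priority, or Optional.empty() when the text is null or unknown.
    static Optional<RequestPriority> parse(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        try {
            // Locale.ROOT keeps the upper-casing independent of the server's default locale.
            return Optional.of(RequestPriority.valueOf(raw.trim().toUpperCase(Locale.ROOT)));
        } catch (IllegalArgumentException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("high"));   // prints Optional[HIGH]
        System.out.println(parse("medium")); // prints Optional.empty
    }
}
```

With something like this in place, the controller could answer an empty result with a 400 response instead of letting the exception bubble up.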
docker-compose:
The complete infrastructure setup is shown here!
version: '3'
services:
  master:
    container_name: master
    image: redis
    ports:
      - 6379:6379
  task-scheduler:
    build: ./task-scheduler
    image: vinsdocker/task-scheduler
    ports:
      - 8080:8080
  task-executor:
    build: ./task-executor
    image: vinsdocker/task-executor
  redis-commander:
    container_name: redis-commander
    hostname: redis-commander
    image: rediscommander/redis-commander:latest
    restart: always
    environment:
      - REDIS_HOSTS=master:master
    ports:
      - 8081:8081
- When we issue the docker-compose up command, the Docker images are built and the infrastructure is up and running!
- I submitted 100 LOW priority tasks with input 46. Each request takes a significant amount of time to process.
- With 98 requests still in the queue, I submitted a HIGH priority request for input 33. It was executed next, as soon as the task already in progress finished, and then the remaining LOW priority tasks continued.
task-executor_1 | Priority : LOW
task-executor_1 | Input : 46
task-executor_1 | Result : 1836311903
task-executor_1 | ----------------------------------------
task-executor_1 | Priority : LOW
task-executor_1 | Input : 46
task-executor_1 | Result : 1836311903
task-executor_1 | ----------------------------------------
task-executor_1 | Priority : HIGH
task-executor_1 | Input : 33
task-executor_1 | Result : 3524578
task-executor_1 | ----------------------------------------
task-executor_1 | Priority : LOW
task-executor_1 | Input : 46
task-executor_1 | Result : 1836311903
task-executor_1 | ----------------------------------------
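The behavior in this log can be reproduced locally without Redis. The sketch below uses java.util.concurrent.PriorityBlockingQueue as a stand-in for Redisson's RPriorityBlockingQueue, on the assumption that both order elements by compareTo and hand out the head on take(). SimPriority, SimTask, and QueueJumpSimulation are hypothetical names, and the queue is scaled down to five LOW tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical stand-ins for the article's Priority and Task.
enum SimPriority { LOW, HIGH, URGENT }

final class SimTask implements Comparable<SimTask> {
    final SimPriority priority;
    final int number;

    SimTask(SimPriority priority, int number) {
        this.priority = priority;
        this.number = number;
    }

    // Same reversed comparison as Task.compareTo: the higher priority sorts first.
    @Override
    public int compareTo(SimTask other) {
        return other.priority.compareTo(this.priority);
    }
}

class QueueJumpSimulation {
    // Wraps the blocking take() so callers do not have to handle InterruptedException.
    static SimTask takeNext(PriorityBlockingQueue<SimTask> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns the priorities in the order a single worker would process them.
    static List<SimPriority> run() {
        // Local in-memory queue standing in for the Redis-backed one.
        PriorityBlockingQueue<SimTask> queue = new PriorityBlockingQueue<>();
        List<SimPriority> processed = new ArrayList<>();

        for (int i = 0; i < 5; i++) {
            queue.add(new SimTask(SimPriority.LOW, 46));
        }
        // The worker finishes two LOW tasks first.
        processed.add(takeNext(queue).priority);
        processed.add(takeNext(queue).priority);

        // A HIGH task arrives while three LOW tasks are still waiting.
        queue.add(new SimTask(SimPriority.HIGH, 33));

        while (!queue.isEmpty()) {
            processed.add(takeNext(queue).priority);
        }
        return processed;
    }

    public static void main(String[] args) {
        // prints [LOW, LOW, HIGH, LOW, LOW, LOW]
        System.out.println(run());
    }
}
```

The HIGH task jumps ahead of every LOW task that is still waiting, but it cannot preempt a task the worker has already taken from the queue.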
Summary:
We successfully demonstrated the priority-queue pattern, which is helpful when a system needs to handle multiple tasks with different priorities.
Happy coding 🙂