Work Queues trong RabbitMQ

Bài viết được sự cho phép của tác giả Giang Phan

Trong bài viết trước, chúng ta đã cùng tìm hiểu về cách tạo RabbitMQ Client (Producer và Consumer) sử dụng AMQP library để kết nối đến RabbitMQ server. Trong bài này, chúng ta sẽ cùng tìm hiểu chi tiết hơn về Work Queues được sử dụng để phân phối các task đến nhiều Worker.

  Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud
  RabbitMQ là gì? Hướng dẫn sử dụng RabbitMQ chi tiết

Work Queues

Ý tưởng chính của Work Queues (còn gọi là Task Queues) là tránh thực hiện một nhiệm vụ (work/task) tốn nhiều tài nguyên ngay lập tức và phải chờ nó hoàn thành. Thay vào đó chúng ta sẽ lên lịch (schedule) và các nhiệm vụ sẽ được thực hiện sau.

Chúng ta gói gọn (encapsulate) một task dưới dạng Message và gửi nó đến Queue. Một tiến trình worker chạy background sẽ lấy các task và cuối cùng thực thi chúng.

Có thể tạo nhiều worker để thực hiện các task, các task sẽ được chia sẻ giữa chúng.

Ví dụ

Trong ví dụ bên dưới, tôi sẽ tạo:

  • class Thread Producer có nhiệm vụ tạo các task và đưa vào Queue (RabbitMQ Broker).
  • class Thread Consumer (đóng vai trò như Worker) có nhiệm vụ lấy các task từ Queue về xử lý.
  • Sử dụng method channel.basicQos(1) : Mặc định, RabbitQM sử dụng Round-robin để gửi Message đến Consumer kế tiếp một cách tuần tự. Mỗi Consumer có thời gian xử lý mỗi task khác nhau. Để tránh một Consumer nhận quá nhiều mà không có thời gian để xử lý, một Consumer quá rãnh không có thời gian thực hiện. Chúng ta có sẽ sử dụng option basicQos() để nói với RabbitMQ rằng chỉ gửi 1 Message cho Consumer, khi nào xử lý xong hãy gửi Message kế tiếp. Nhờ vậy thời gian hoàn thành sớm hơn.
  • Sử dụng thuộc tính autoAck = false : Trong bài viết trước, chúng ta sử dụng thuộc tính autoAck là true khi Consumer nhận Message, thuộc tính này chỉ ra răng một ACK message sẽ được auto gửi đến RabbitMQ để báo với RabbitMQ rằng một Message đã được Consumer nhận, xử lý và Rabbit có thể xoá nó. Một vấn đề đặt ra là nếu một Consumer xử lý Task trong một thời gian dài, chỉ một phần của Task được hoàn thành và nó die. Khi đó, Message đã bị xoá bởi RabbitQM và Task sẽ bị mất. Để giải quyết vấn đề này, chúng ta sẽ không auto gửi Message, mà chúng ta sẽ gửi một ACK message đến RabbitMQ khi nó hoàn thành xử lý Message.
  • class App : mô phỏng việc tạo Task bởi Producer và lấy task xử lý bởi Consumer. Tôi sẽ tạo 1 Producer để tạo ra 10 Tasks và 2 Consumer để thay phiên nhau xử lý các Task được tạo bởi Producer. Một lưu ý là Consumer 1 sẽ xử lý mỗi task trong 100 milliseconds, Consumer 2 sẽ xử lý mỗi task trong 300 milliseconds. Nếu theo cơ chế Round-robin dispatching của RabbitMQ thông thường thì mỗi Consumer sẽ xử ý 5 Tasks, thời gian xử lý mỗi Task của Consumer 2 lâu hơn nên Consumer 1 sẽ rỗi rảnh trong khi Consumer 2 vẫn còn việc phải xử lý. Tuy nhiên, mình đã sử dụng basicQos() nên sẽ không có chuyện một Consumer rỗi rảnh và một Consumer có nhiều Task cần làm. Hãy xem kết quả ở chương trình bên dưới nhé.

Producer.java

package com.gpcoder;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer implements Runnable {
private final static String QUEUE_NAME = "gpcoder-queue";

private String name;
private int numOfMessage;

public Producer(String name, int numOfMessage) {
this.name = name;
this.numOfMessage = numOfMessage;
}

@Override
public void run() {
System.out.println("Create a ConnectionFactory for " + name);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

try ( Connection connection = factory.newConnection();
Channel channel = connection.createChannel() ) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

System.out.println("Start sending messages ... ");
int index = 1;
while (index <= numOfMessage) {
String message = " Task #" + index++;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] " + name + " Sent: '" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("Close connection and free resources");
}
}
}

Consumer.java

package com.gpcoder;

import com.rabbitmq.client.*;

public class Consumer implements Runnable {

private final static String QUEUE_NAME = "gpcoder-queue";
private int numberConsumedMessage = 0;
private String name;
private int timeToFinishATask;

public Consumer(String name, int timeToFinishATask) {
this.name = name;
this.timeToFinishATask = timeToFinishATask;
}

@Override
public void run() {
try {
System.out.println("Create a ConnectionFactory for " + name);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

/**
* This tells RabbitMQ not to give more than one message to a worker at a time.
* Or, in other words, don't dispatch a new message to a worker
* until it has processed and acknowledged the previous one.
* Instead, it will dispatch it to the next worker that is not still busy.
*/
channel.basicQos(1);

System.out.println("Start receiving messages ... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] " + name + " Received: '" + message + "'");
consume(message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(" [-] " + name + " Already consumed: " + (++numberConsumedMessage) + " Tasks");

};
CancelCallback cancelCallback = consumerTag -> { };
boolean autoAck = false;
String consumerTag = channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
System.out.println("Tag for " + name + ": " + consumerTag);
} catch (Exception e) {
e.printStackTrace();
}
}

private void consume(String message) {
try {
Thread.sleep(timeToFinishATask); // simulate time to produce the data
System.out.println(" [-] " + name + " Consumed for " + message + " in " + timeToFinishATask + " ms");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

App.java

package com.gpcoder;

public class App {

public static void main(String[] args) throws InterruptedException {

Producer producer1 = new Producer("[Producer 1]",10);
Consumer consumer1 = new Consumer("[Consumer 1]", 100);
Consumer consumer2 = new Consumer("[Consumer 2]", 300);

new Thread(producer1).start();

new Thread(consumer1).start();
new Thread(consumer2).start();
}
}

Output của chương trình trên:

Create a ConnectionFactory for [Producer 1]
Create a ConnectionFactory for [Consumer 2]
Create a ConnectionFactory for [Consumer 1]
Start sending messages ...
Start receiving messages ...
Start receiving messages ...
[x] [Producer 1] Sent: ' Task #1'
[x] [Producer 1] Sent: ' Task #2'
[x] [Producer 1] Sent: ' Task #3'
[x] [Producer 1] Sent: ' Task #4'
[x] [Producer 1] Sent: ' Task #5'
[x] [Producer 1] Sent: ' Task #6'
[x] [Producer 1] Sent: ' Task #7'
[x] [Producer 1] Sent: ' Task #8'
[x] [Producer 1] Sent: ' Task #9'
[x] [Producer 1] Sent: ' Task #10'
Tag for [Consumer 2]: amq.ctag-yVYxlwJ6750wJrwcMrO0nQ
Tag for [Consumer 1]: amq.ctag-RBZ-9HjR0-B_Gf2lQ0js7g
[x] [Consumer 2] Received: ' Task #2'
[x] [Consumer 1] Received: ' Task #1'
Close connection and free resources
[-] [Consumer 1] Consumed for Task #1 in 100 ms
[-] [Consumer 1] Already consumed: 1 Tasks
[x] [Consumer 1] Received: ' Task #3'
[-] [Consumer 1] Consumed for Task #3 in 100 ms
[-] [Consumer 1] Already consumed: 2 Tasks
[x] [Consumer 1] Received: ' Task #4'
[-] [Consumer 2] Consumed for Task #2 in 300 ms
[-] [Consumer 2] Already consumed: 1 Tasks
[x] [Consumer 2] Received: ' Task #5'
[-] [Consumer 1] Consumed for Task #4 in 100 ms
[-] [Consumer 1] Already consumed: 3 Tasks
[x] [Consumer 1] Received: ' Task #6'
[-] [Consumer 1] Consumed for Task #6 in 100 ms
[-] [Consumer 1] Already consumed: 4 Tasks
[x] [Consumer 1] Received: ' Task #7'
[-] [Consumer 1] Consumed for Task #7 in 100 ms
[-] [Consumer 1] Already consumed: 5 Tasks
[x] [Consumer 1] Received: ' Task #8'
[-] [Consumer 2] Consumed for Task #5 in 300 ms
[-] [Consumer 2] Already consumed: 2 Tasks
[x] [Consumer 2] Received: ' Task #9'
[-] [Consumer 1] Consumed for Task #8 in 100 ms
[-] [Consumer 1] Already consumed: 6 Tasks
[x] [Consumer 1] Received: ' Task #10'
[-] [Consumer 1] Consumed for Task #10 in 100 ms
[-] [Consumer 1] Already consumed: 7 Tasks
[-] [Consumer 2] Consumed for Task #9 in 300 ms
[-] [Consumer 2] Already consumed: 3 Tasks

Như bạn thấy, 2 Consumer cùng xử lý các công việc được tạo bởi Producer. Tuy nhiên, thời gian xử lý mỗi Task của Consumer 1 nhanh hơn Consumer 2, nên nó có thể done nhiều task hơn: Consumer 1 có thể xử lý 7 tasks, trong khi Consumer 2 chỉ xử lý 3 tasks.

Tài liệu tham khảo:

Bài viết gốc được đăng tải tại gpcoder.com

Có thể bạn quan tâm:

Xem thêm Việc làm IT hấp dẫn trên TopDev