Sử dụng Fanout Exchange trong RabbitMQ
Bài viết được sự cho phép của tác giả Giang Phan
Trong các bài viết trước, chúng ta đã cùng tìm hiểu về Direct Exchange. Trong bài này, tôi sẽ giới thiệu với các bạn một loại exchange khác là Fanout Exchange.
Flow của một Message trong Fanout Exchange
Fanout exchange (định tuyến broadcast – amq.fanout) định tuyến message (copy message) tới tất cả queue mà nó được bind, với bất kể một routing key nào. Giả sử, nếu nó N queue được bind bởi một Fanout exchange, khi một message mới published, exchange sẽ định tuyến message đó tới tất cả N queues. Fanout exchange được sử dụng cho định tuyến message broadcast (quảng bá).
Flow của một Message trong Fanout Exchange như sau:
- Một Producer sẽ tạo một Message và publish tới Exchange.
- Một hoặc nhiều Queue bind tới Fanout Exchange không cần thông tin routing key.
- Một Message tới Exchange sẽ được chuyển tiếp đến tất cả các Queue mà không có bất kỳ điều kiện kiểm tra nào.
Exchange Fanout hữu ích với trường hợp ta cần một dữ liệu được gửi tới nhiều ứng dụng khác nhau với cùng một message nhưng cách xử lý ở mỗi ứng dụng là khác nhau.
Ví dụ Fanout Exchange trong RabbitMQ
Trong ví dụ này, tôi tạo một Fanout Exchange có tên GPCoderFanoutExchange, tạo 3 Queue binding tới Fanout Exchange này: QDeveloper, QManager, QGeneral.
Một số class của chương trình:
- ConnectionManager : hỗ trợ tạo Connection đến RabbitMQ.
- FanoutExchangeChannel : class util hỗ trợ tạo Echange, Queue, binding Queue đến Exchange, publish/ subscribe message, …
- Constant : định nghĩa constant chứa các thông tin về tên Exchange, Queue.
- Producer: để gửi Message đến Exchange.
- Consumer: để nhận Message từ Queue.
- App: giả lập việc gửi nhận Message thông qua Fanout Exchange của RabbitMQ.
ConnectionManager.java
package com.gpcoder.fanoutexchange; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionManager { private ConnectionManager() { super(); } public static Connection createConnection() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); return factory.newConnection(); } }
FanoutExchangeChannel.java
package com.gpcoder.fanoutexchange; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class FanoutExchangeChannel { private String exchangeName; private Channel channel; private Connection connection; public FanoutExchangeChannel(Connection connection, String exchangeName) throws IOException { this.exchangeName = exchangeName; this.connection = connection; this.channel = connection.createChannel(); } public void declareExchange() throws IOException { // exchangeDeclare( exchange, builtinExchangeType, durable) channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true); } public void declareQueues(String ...queueNames) throws IOException { for (String queueName : queueNames) { // queueDeclare - (queueName, durable, exclusive, autoDelete, arguments) channel.queueDeclare(queueName, true, false, false, null); } } public void performQueueBinding(String queueName) throws IOException { // Create bindings - (queue, exchange, routingKey) channel.queueBind(queueName, exchangeName, ""); } public void subscribeMessage(String queueName) throws IOException { // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback) channel.basicConsume(queueName, true, ((consumerTag, message) -> { System.out.println("[Received] [" + queueName + "]: " + new String(message.getBody())); }), consumerTag -> { System.out.println(consumerTag); }); } public void publishMessage(String message) throws IOException { // basicPublish - ( exchange, routingKey, basicProperties, body) System.out.println("[Send]: " + message); channel.basicPublish(exchangeName, "", null, message.getBytes()); } }
Constant.java
package com.gpcoder.fanoutexchange; public final class Constant { // Exchange public static final String EXCHANGE_NAME = "GPCoderFanoutExchange"; // Queue public static final String DEV_QUEUE_NAME = "QDeveloper"; public static final String MANAGER_QUEUE_NAME = "QManager"; public static final String GENERAL_QUEUE_NAME = "QGeneral"; private Constant() { super(); } }
Producer.java
package com.gpcoder.fanoutexchange; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; import static com.gpcoder.fanoutexchange.Constant.*; public class Producer { private FanoutExchangeChannel channel; public void start() throws IOException, TimeoutException { // Create connection Connection connection = ConnectionManager.createConnection(); // Create channel channel = new FanoutExchangeChannel(connection, EXCHANGE_NAME); // Create fanout exchange channel.declareExchange(); // Create queues channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, GENERAL_QUEUE_NAME); // Binding queues without routing key channel.performQueueBinding(DEV_QUEUE_NAME); channel.performQueueBinding(MANAGER_QUEUE_NAME); channel.performQueueBinding(GENERAL_QUEUE_NAME); } public void send(String message) throws IOException { // Send message channel.publishMessage(message); } }
Consumer.java
package com.gpcoder.fanoutexchange; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; import static com.gpcoder.fanoutexchange.Constant.*; public class Consumer { private FanoutExchangeChannel channel; public void start() throws IOException, TimeoutException { // Create connection Connection connection = ConnectionManager.createConnection(); // Create channel channel = new FanoutExchangeChannel(connection, EXCHANGE_NAME); // Create fanout exchange channel.declareExchange(); // Create queues channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, GENERAL_QUEUE_NAME); // Binding queues without routing key channel.performQueueBinding(DEV_QUEUE_NAME); channel.performQueueBinding(MANAGER_QUEUE_NAME); channel.performQueueBinding(GENERAL_QUEUE_NAME); } public void subscribe() throws IOException { // Subscribe message channel.subscribeMessage(DEV_QUEUE_NAME); channel.subscribeMessage(MANAGER_QUEUE_NAME); channel.subscribeMessage(GENERAL_QUEUE_NAME); } }
App.java
package com.gpcoder.fanoutexchange; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class App { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // Create producer Producer producer = new Producer(); producer.start(); // Send one message to exchange, it will be forwarded to all queues producer.send("gpcoder message 1"); // Create consumer Consumer consumer = new Consumer(); consumer.start(); consumer.subscribe(); TimeUnit.SECONDS.sleep(1); // Try to send a second message producer.send("gpcoder message 2"); } }
Output chương trình:
[Send]: gpcoder message 1 [Received] [QDeveloper]: gpcoder message 1 [Received] [QManager]: gpcoder message 1 [Received] [QGeneral]: gpcoder message 1 [Send]: gpcoder message 2 [Received] [QGeneral]: gpcoder message 2 [Received] [QDeveloper]: gpcoder message 2 [Received] [QManager]: gpcoder message 2
Như bạn thấy, tất cả các Consumer đều nhận được Message.
Lưu ý: Chúng ta có thể sử dụng phương thức queueDeclare() để tạo một Queue non-durable, exclusive, autodelete và tên được tạo một cách ngẫu nhiên.
Tài liệu tham khảo:
- https://www.rabbitmq.com/tutorials/tutorial-three-java.html
- https://www.rabbitmq.com/tutorials/amqp-concepts.html
Bài viết gốc được đăng tải tại gpcoder.com
Có thể bạn quan tâm:
- Sử dụng Alternate Exchange trong RabbitMQ
- Sử dụng Headers Exchange trong RabbitMQ
- Sử dụng Topic Exchange (Publish/Subscribe) trong RabbitMQ
Xem thêm Việc làm IT hấp dẫn trên TopDev
- T Tìm hiểu Laptop AI – So sánh Laptop AI với Laptop thường
- M MySQL vs MS SQL Server: Phân biệt hai RDBMS phổ biến nhất
- S SearchGPT là gì? Công cụ tìm kiếm mới có thể đánh bại Google?
- C Cách tích hợp ChatGPT vào Google Search siêu dễ
- V VoiceGPT là gì? Giới thiệu tính năng và cách cài đặt sử dụng Voice GPT
- G GPT-4o Mini – Thông minh hơn và tiết kiệm hơn?
- C ChatGPT-4o là gì? Điểm mới của ChatGPT-4o vs ChatGPT-4
- C Chat GPT 4.0 là gì? Có gì vượt trội so với Chat GPT phiên bản cũ?
- C Cách tự học code web, tìm kiếm công việc dễ dàng và hạnh phúc mỗi ngày
- G Giới thiệu 15 website học và luyện hack hợp pháp