Sử dụng Alternate Exchange trong RabbitMQ

Bài viết được sự cho phép của tác giả Giang Phan

Khi một Message đến Exchange, nếu không tìm thấy Queue nào phù hợp cho Message, Message sẽ tự động bị hủy. RabbitMQ cung cấp một tiện ích mở rộng AMQP được gọi là Alternate Exchange, để collect các Message không thể gửi được trước khi chúng bị huỷ. Chúng ta sẽ biết được cách làm việc và cài đặt của Alternate Exchange trong bài viết này.

  Sử dụng binding Exchange to Exchange trong RabbitMQ
  Sử dụng Dead Letter Exchange trong RabbitMQ

Flow của một Message trong Alternate Exchange

Alternate Exchange được định nghĩa để collect các Message không thể gửi được (rejected/ discarded/ unrouted) trước khi chúng bị huỷ.

Bất kỳ 4 loại Exchange: Direct, Fanout, Topic, Headers có thể được chỉ định như một Alternate Exchange cho một Exchange khác thuộc bất kỳ loại nào. Tuy nhiên, ta nên sử dụng Fanout Exchange như một Alternate Exchange vì nó chuyển tiếp tin nhắn vô điều kiện.

Để chỉ định một Alternate Exchange cho một Exchange GPCoder.AltTopicExchange, chúng ta chỉ cần thêm arguments: alternate-exchange=”GPCoder.AltFanoutExchange” cho GPCoder.AltTopicExchange. Khi đó GPCoder.AltFanoutExchange trở thành một Alternate Exchange cho GPCoder.AltTopicExchange.

Sử dụng Alternate Exchange trong RabbitMQ

Flow của một Message trong Alternate Exchange:

  • Một Producer publish một Message đến source Exchange với một routing key dựa trên loại của Exchange. Trong trường hợp này là GPCoder.AltTopicExchange.
  • Một Fanout Exchange (GPCoder.AltFanoutExchange), được chỉ định là một AlternateExchange cho GPCoder.AltTopicExchange.
  • Nếu một Message có routing key match với bất kỳ routing key pattern nào mà Queue đã binding với GPCoder.AltTopicExchange, thì Message sẽ được chuyển đến Queue match đó.
  • Nếu không match với bất kỳ routing key pattern nào, khi đó Message sẽ bị reject.
  • Theo mặc định của RabbitMQ, một Message bị reject sẽ bị huỷ. Trong trường hợp chúng ta có Alternate Exchange, nó sẽ nhận các Message bị reject và chuyển đến Queue.
  • Cuối cùng, Consumer có thể binding đến Queue của Alternate Exchange để xử lý.

Ví dụ binding Exchange to Exchange trong RabbitMQ

Một số class của chương trình:

  • ConnectionManager : hỗ trợ tạo Connection đến RabbitMQ.
  • ExchangeChannel :  class util hỗ trợ tạo Echange, Queue, binding Queue đến Exchange, binding Exchange đến Exchange, publish/ subscribe message, …
  • Constant : định nghĩa constant chứa các thông tin về tên Exchange, Queue.
  • AlternateExchangeProducer : để gửi Message đến GPCoder.AltFanoutExchange.
  • TopicExchangeProducer : để gửi Message đến GPCoderTopicExchange.
  • AlternateExchangeConsumer : để nhận Message từ Queue được binding đến GPCoder.AltFanoutExchange.
  • TopicExchangeConsumer : để nhận Message từ Queue được binding đến GPCoder.AltTopicExchange.
  • App: giả lập việc gửi nhận Message thông qua Topic Exchange của RabbitMQ.

ConnectionManager.java

package com.gpcoder.alternateexchange; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConnectionManager {     private ConnectionManager() {         super();     }     public static Connection createConnection() throws IOException, TimeoutException {         ConnectionFactory factory = new ConnectionFactory();         factory.setHost("localhost");         return factory.newConnection();     } }

ExchangeChannel.java

package com.gpcoder.alternateexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class ExchangeChannel {

private String exchangeName;
private Channel channel;
private Connection connection;

public ExchangeChannel(Connection connection, String exchangeName) throws IOException {
this.exchangeName = exchangeName;
this.connection = connection;
this.channel = connection.createChannel();
}

public void declareExchange(BuiltinExchangeType exchangeType) throws IOException {
// exchangeDeclare( exchange, builtinExchangeType, durable)
channel.exchangeDeclare(exchangeName, exchangeType, true);
}

public void declareExchangeWithAlternateExchagne(BuiltinExchangeType exchangeType, String alternateExchangeName) throws IOException {
// Declare the topic exchange and set an alternate-exchange
// exchangeDeclare( exchange, builtinExchangeType, durable, autoDelete, arguments)
Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange", alternateExchangeName);
channel.exchangeDeclare(exchangeName, exchangeType, true, false, arguments);
}

public void declareQueues(String ...queueNames) throws IOException {
for (String queueName : queueNames) {
// queueDeclare - (queueName, durable, exclusive, autoDelete, arguments)
channel.queueDeclare(queueName, true, false, false, null);
}
}

public void performQueueBinding(String queueName, String routingKey) throws IOException {
// Create bindings - (queue, exchange, routingKey)
channel.queueBind(queueName, exchangeName, routingKey);
}

public void subscribeMessage(String queueName) throws IOException {
// basicConsume - ( queue, autoAck, deliverCallback, cancelCallback)
channel.basicConsume(queueName, true, ((consumerTag, message) -> {
System.out.println("[Received] [" + queueName + "]: " + consumerTag);
System.out.println("[Received] [" + queueName + "]: " + new String(message.getBody()));
}), consumerTag -> {
System.out.println(consumerTag);
});
}

public void publishMessage(String message, String routingKey) throws IOException {

// basicPublish - ( exchange, routingKey, basicProperties, body)
System.out.println("[Send] [" + routingKey + "]: " + message);
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
}
}

Constant.java

package com.gpcoder.alternateexchange;

public final class Constant {

// Exchange

public static final String TOPIC_EXCHANGE_NAME = "GPCoder.AltTopicExchange";

public static final String ALTERNATE_EXCHANGE_NAME = "GPCoder.AltFanoutExchange";

// Queue

public static final String JAVA_QUEUE_NAME = "QJava";

public static final String ALL_QUEUE_NAME = "QAll";

public static final String UNKNOWN_QUEUE_NAME = "QUnknown";

// Routing key pattern

public static final String JAVA_ROUTING_KEY = "java.*.gpcoder.com";

public static final String GPCODER_ROUTING_KEY = "#.gpcoder.com";

// Message key

public static final String JAVA_MSG_KEY = "java.gpcoder.com";

private Constant() {
super();
}
}

AlternateExchangeProducer.java

package com.gpcoder.alternateexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.alternateexchange.Constant.ALTERNATE_EXCHANGE_NAME;
import static com.gpcoder.alternateexchange.Constant.UNKNOWN_QUEUE_NAME;

public class AlternateExchangeProducer {

private ExchangeChannel channel;

public void start() throws IOException, TimeoutException {
// Create connection
Connection connection = ConnectionManager.createConnection();

// Create channel
channel = new ExchangeChannel(connection, ALTERNATE_EXCHANGE_NAME);

// Create fanout exchange
channel.declareExchange(BuiltinExchangeType.FANOUT);

// Create queues
channel.declareQueues(UNKNOWN_QUEUE_NAME);

channel.performQueueBinding(UNKNOWN_QUEUE_NAME, "");
}
}

TopicExchangeProducer.java

package com.gpcoder.alternateexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.alternateexchange.Constant.*;

public class TopicExchangeProducer {

private ExchangeChannel channel;

public void start() throws IOException, TimeoutException {
// Create connection
Connection connection = ConnectionManager.createConnection();

// Create channel
channel = new ExchangeChannel(connection, TOPIC_EXCHANGE_NAME);

// Create topic exchange
channel.declareExchangeWithAlternativeExchagne(BuiltinExchangeType.TOPIC, ALTERNATIVE_EXCHANGE_NAME);

// Create queues
channel.declareQueues(JAVA_QUEUE_NAME, ALL_QUEUE_NAME);

// Binding queues
channel.performQueueBinding(JAVA_QUEUE_NAME, JAVA_ROUTING_KEY);
channel.performQueueBinding(ALL_QUEUE_NAME, GPCODER_ROUTING_KEY);
}

public void send(String message, String messageKey) throws IOException {
// Send message
channel.publishMessage(message, messageKey);
}
}

AlternateExchangeConsumer.java

package com.gpcoder.alternateexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.alternateexchange.Constant.ALTERNATE_EXCHANGE_NAME;
import static com.gpcoder.alternateexchange.Constant.UNKNOWN_QUEUE_NAME;

public class AlternateExchangeConsumer {

private ExchangeChannel channel;

public void start() throws IOException, TimeoutException {
// Create connection
Connection connection = ConnectionManager.createConnection();

// Create channel
channel = new ExchangeChannel(connection, ALTERNATE_EXCHANGE_NAME);

// Create fanout exchange
channel.declareExchange(BuiltinExchangeType.FANOUT);

// Create queues
channel.declareQueues(UNKNOWN_QUEUE_NAME);

channel.performQueueBinding(UNKNOWN_QUEUE_NAME, "");
}

public void subscribe() throws IOException {
// Subscribe message
channel.subscribeMessage(UNKNOWN_QUEUE_NAME);
}
}

TopicExchangeConsumer.java

package com.gpcoder.alternateexchange;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.alternateexchange.Constant.*;

public class TopicExchangeConsumer {

private ExchangeChannel channel;

public void start() throws IOException, TimeoutException {
// Create connection
Connection connection = ConnectionManager.createConnection();

// Create channel
channel = new ExchangeChannel(connection, TOPIC_EXCHANGE_NAME);

// Create topic exchange
channel.declareExchangeWithAlternateExchagne(BuiltinExchangeType.TOPIC, ALTERNATE_EXCHANGE_NAME);

// Create queues
channel.declareQueues(JAVA_QUEUE_NAME, ALL_QUEUE_NAME);

// Binding queues
channel.performQueueBinding(JAVA_QUEUE_NAME, JAVA_ROUTING_KEY);
channel.performQueueBinding(ALL_QUEUE_NAME, GPCODER_ROUTING_KEY);
}

public void subscribe() throws IOException {
// Subscribe message
channel.subscribeMessage(JAVA_QUEUE_NAME);
channel.subscribeMessage(ALL_QUEUE_NAME);
}
}

App.java

package com.gpcoder.alternateexchange;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.alternateexchange.Constant.JAVA_MSG_KEY;

public class App {

public static void main(String[] args) throws IOException, TimeoutException {
AlternateExchangeProducer producer1 = new AlternateExchangeProducer();
producer1.start();

TopicExchangeProducer producer2 = new TopicExchangeProducer();
producer2.start();

// Publish some messages
producer2.send("[1] Head First Design Pattern", JAVA_MSG_KEY);
producer2.send("[2] Unknown Message", "random-gpcoder");

AlternateExchangeConsumer consumer1 = new AlternateExchangeConsumer();
consumer1.start();
consumer1.subscribe();

TopicExchangeConsumer consumer2 = new TopicExchangeConsumer();
consumer2.start();
consumer2.subscribe();
}
}

Output chương trình:

[Send] [java.gpcoder.com]: [1] Head First Design Pattern
[Send] [random-gpcoder]: [2] Unknown Message
[Received] [QUnknown]: amq.ctag-qe5VOGnCFLq8_Qu_ajUo_g
[Received] [QUnknown]: [2] Unknown Message
[Received] [QAll]: amq.ctag-Nt0tVcCjOXEr-BTF20roCw
[Received] [QAll]: [1] Head First Design Pattern

Như bạn thấy, Message thứ 2 không match với bất kỳ routing key nào, nên được chuyển xuống Queue QUnknown của Alternate Exchange (GPCoder.AltFanoutExchange).

Tài liệu tham khảo:

Bài viết gốc được đăng tải tại gpcoder.com

Có thể bạn quan tâm:

Xem thêm Việc làm Developer hấp dẫn trên TopDev