Kết nối AMQP Client với RabbitMQ Server
Bài viết được sự cho phép của tác giả Giang Phan
Ở các bài viết trước chúng ta đã cùng tìm hiểu về một số khái niệm cơ bản của RabbitMQ và cách cài đặt RabbmitMQ Server. Trong bài này, chúng ta sẽ cùng tìm hiểu cách tạo RabbitMQ Client (Producer và Consumer) sử dụng AMQP library để kết nối đến RabbitMQ server, đồng thời cũng giới thiệu với các bạn cách hoạt động của Default Exchange trong RabbitMQ.
Tạo RabbitMQ project
Tạo maven project và mở file pom.xml, khai báo dependency như sau:
<?
xml
version
=
"1.0"
encoding
=
"UTF-8"
?>
<
project
xmlns
=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi
=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation
=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<
modelVersion
>4.0.0</
modelVersion
>
<
groupId
>com.gpcoder</
groupId
>
<
artifactId
>rabbitmq-example</
artifactId
>
<
version
>1.0-SNAPSHOT</
version
>
<
properties
>
<
java.version
>1.8</
java.version
>
<
maven.compiler.target
>1.8</
maven.compiler.target
>
<
maven.compiler.source
>1.8</
maven.compiler.source
>
</
properties
>
<
dependencies
>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<
dependency
>
<
groupId
>com.rabbitmq</
groupId
>
<
artifactId
>amqp-client</
artifactId
>
<
version
>5.8.0</
version
>
</
dependency
>
</
dependencies
>
<
build
>
<
plugins
>
<!-- https://mvnrepository.com/artifact/org.apache.maven.plugins/maven-compiler-plugin -->
<
plugin
>
<
groupId
>org.apache.maven.plugins</
groupId
>
<
artifactId
>maven-compiler-plugin</
artifactId
>
<
version
>3.8.1</
version
>
<
configuration
>
<
source
>1.8</
source
>
<
target
>1.8</
target
>
</
configuration
>
</
plugin
>
<!-- include all the dependencies into the jar for easier to execute the application -->
<!-- https://mvnrepository.com/artifact/org.fusesource.mvnplugins/maven-uberize-plugin -->
<
plugin
>
<
groupId
>org.fusesource.mvnplugins</
groupId
>
<
artifactId
>maven-uberize-plugin</
artifactId
>
<
version
>1.45</
version
>
<
executions
>
<
execution
>
<
phase
>package</
phase
>
<
goals
>
<
goal
>uberize</
goal
>
</
goals
>
</
execution
>
</
executions
>
</
plugin
>
</
plugins
>
</
build
>
</
project
>
Tạo Producer và Consumer
Tạo Producer
Các bước thực hiện:
- Tạo ConnectionFactory : xác định remote URI đến RabbitMQ server. Cần cung cấp một số thông tin như: Virtual Host, Host, Port, Username, Password. Nếu chỉ chạy ở lcoalhost và sử dụng các thông số mặc định của RabbitMQ, thì chỉ cần khai báo Host là “localhost”.
- Tạo Connection từ ConnectionFactory. Đây là một kết nối TCP giữa ứng dụng và RabbitMQ broker.
- Tạo Channel: tạo một kết nối ảo trong một Connection. Việc publishing hoặc consuming message từ một queue đều được thực hiện trên channel. Mỗi Connection có thể có nhiều Channel quản lý những thứ như Exchange và Message riêng biệt.
- Tạo Exchange: Là nơi nhận message được publish từ Producer và đẩy chúng vào queue dựa vào quy tắc của từng loại Exchange. Để nhận được message, queue phải được nằm (binding) trong ít nhất 1 Exchange. Tôi sẽ giới thiệu cách sử dụng Exchange ở bài viết khác.
- Tạo Queue: Tạo queue để lưu trữ các Message.
- Tạo Message: Nội dụng tin nhắn sẽ được gởi đến client. Thông tin này được truyền từ Producer đến Consumer qua RabbitMQ.
- Gửi Message: thực hiện gửi message đến Exchange thông qua phương thức channel.basicPublish(). Sau khi nhận message, Exchange sử dụng sẽ định tuyến (route) Message đến Queue.
- Đóng kết nối: sau khi sử dụng xong cần đóng kết nối, để giải phóng các tài nguyên được sử dụng cho kết nối đó.
Ví dụ:
package
com.gpcoder;
import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.Connection;
import
com.rabbitmq.client.ConnectionFactory;
import
java.io.BufferedReader;
import
java.io.InputStreamReader;
public
class
Producer {
private
final
static
String QUEUE_NAME =
"gpcoder-queue"
;
public
static
void
main(String[] argv)
throws
Exception {
System.out.println(
"Create a ConnectionFactory"
);
ConnectionFactory factory =
new
ConnectionFactory();
// factory.setVirtualHost("/");
factory.setHost(
"localhost"
);
// factory.setPort(5672);
// factory.setUsername("guest");
// factory.setPassword("guest");
System.out.println(
"Create a Connection"
);
System.out.println(
"Create a Channel"
);
try
( Connection connection = factory.newConnection();
Channel channel = connection.createChannel() ) {
System.out.println(
"Create a queue "
+ QUEUE_NAME);
channel.queueDeclare(QUEUE_NAME,
false
,
false
,
false
,
null
);
System.out.println(
"Start sending messages ... "
);
try
(BufferedReader br =
new
BufferedReader(
new
InputStreamReader(System.in));) {
String message;
do
{
System.out.print(
"Enter message: "
);
message = br.readLine().trim();
channel.basicPublish(
""
, QUEUE_NAME,
null
, message.getBytes());
System.out.println(
" [x] Sent: '"
+ message +
"'"
);
}
while
(!message.equalsIgnoreCase(
"close"
));
}
}
finally
{
System.out.println(
"Close connection and free resources"
);
}
}
}
Một số lưu ý về chương trình trên:
- factory.setHost(“localhost”) : chúng ta chỉ cần khai báo như thế này là đủ để kết nối đến RabbitMQ Server. Những tham số còn lại, RabbitMQ sẽ lấy những giá trị mặc định. Nếu bạn deploy ứng dụng lên server có thể khai báo thêm các tham số khác như đoạn code mình đã comment lại trong chương trình trên.
- channel.queueDeclare() : tạo một Queue để lưu trữ các Message. Một số đối số của phương thức này:
- String name: tên của Queue muốn gửi message đến.
- boolean durable : queue và message sẽ bị xoá khi RabbitMQ server stop, để giữ lại chúng ta cần set giá trị này là true.
- boolean exclusive : queue sẽ bị xoá khi Connection close.
- boolean autoDelete : Queue tự động bị xóa khi Consumer cuối cùng của nó bị hủy hoặc đóng hoặc mất kết nối TCP với máy chủ.
- Map<String, Object> arguments : các tham số nếu có.
- channel.basicPublish() : gửi message lên Exchange. Một số đối số của phương thức này:
- String exchange : tên của Exchange. Trong bài này, tôi sẽ không xác định Exchange Name, để giá trị nó là empty. Đây là loại Default Exchange. Default Exchange được liên kết ngầm định với mọi Queue với khóa định tuyến bằng với tên Queue.
- String routingKey : key của Exchange.
- BasicProperties props : các thuộc tính khác cho message.
- byte[] body : nội dung của tin nhắn.
Tạo Consumer
Các bước thực hiện tương tự như tạo Producer, khác biệt duy nhất là thay vì tạo Producer để gửi tin nhắn, ta tạo Consumer để nhận tin nhắn.
- channel.queueDeclare() : tương tự như Producer.
- channel.basicConsume() : nhận Message từ Queue. Một số đối số của phương thức này:
- String queue : Tên của Queue muốn nhận message từ.
- boolean autoAck : tự động gửi một ACK message đến RabbitMQ để báo với RabbitMQ rằng một Message đã được consumer nhận, xử lý và Rabbit có thể xoá nó.
- DeliverCallback deliverCallback : cung cấp một call back để xử lý khi một message đến.
- CancelCallback cancelCallback : cung cấp một call back để xử lý khi một consumer bị cancel vì một lý do nào đó, chẳng hạn Queue bị cancel.
package
com.gpcoder;
import
com.rabbitmq.client.*;
public
class
Consumer {
private
final
static
String QUEUE_NAME =
"gpcoder-queue"
;
public
static
void
main(String[] argv)
throws
Exception {
System.out.println(
"Create a ConnectionFactory"
);
ConnectionFactory factory =
new
ConnectionFactory();
factory.setHost(
"localhost"
);
System.out.println(
"Create a Connection"
);
System.out.println(
"Create a Channel"
);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println(
"Create a queue "
+ QUEUE_NAME);
channel.queueDeclare(QUEUE_NAME,
false
,
false
,
false
,
null
);
System.out.println(
" [*] Waiting for messages. To exit press CTRL+C"
);
System.out.println(
"Start receiving messages ... "
);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message =
new
String(delivery.getBody(),
"UTF-8"
);
System.out.println(
" [x] Received: '"
+ message +
"'"
);
};
CancelCallback cancelCallback = consumerTag -> { };
String consumerTag = channel.basicConsume(QUEUE_NAME,
true
, deliverCallback, cancelCallback);
System.out.println(
"consumerTag: "
+ consumerTag);
}
}
Lưu ý rằng tại sao Consumer lại cần declare một Queue. Vì chúng ta có thể start một Consumer trước một Producer, chúng ta muốn đảm bảo Queue tồn tại trước khi chúng ta cố gắng consume message từ nó.
Tại sao chúng ta không sử dụng câu lệnh try-with-resource để tự động close Connection và Channel? Bằng cách làm như vậy, chúng ta chỉ đơn giản sẽ làm cho chương trình tiếp tục, đóng mọi thứ và thoát. Điều này sẽ gây khó khăn vì chúng ta muốn quá trình này tồn tại trong khi Consumer đang chờ tin nhắn bất động bộ đến.
Chạy ứng dụng
Trước hết, chúng ta cần package ứng dụng trên thành gói jar, chạy lệnh: mvn clean install
Sau khi chạy lệnh trên, trong thư mục target của project, chúng ta có gói: rabbitmq-example-1.0-SNAPSHOT.jar
Start RabbitMQ Server: xem lại bài viết trước “Cài đặt RabbitMQ“.
Nhiều Consumer và một Producer
Mở 2 console và chạy lệnh sau để start 2 Consumer:
java -cp target/rabbitmq-example-1.0-SNAPSHOT.jar com.gpcoder.Consumer
Mở thêm 1 console khác để start 1 Producer:
java -cp target/rabbitmq-example-1.0-SNAPSHOT.jar com.gpcoder.Producer
Chúng ta có kết quả như sau:
Nhập một vài giá trị ở cửa sổ Producer, chúng ta có kết quả sau:
Như bạn thấy, một tin nhắn chỉ được gửi cho một client tại một thời điểm và client thay phiên nhau nhận tin nhắn (Round-robin).
Hãy đóng các console trên bằng cách nhấn tổ hợp phím Ctrl + C.
Producer gửi message trước khi Consumer start
Tiếp tục hãy test thử một trường hợp khác: Start Producer và gửi một vài message:
Sau đó start Consumer và check kết quả.
Bạn có thể thấy rằng, người nhận không cần active tại thời điểm Producer gửi message. RabbitMQ Broker sẽ deliver message ngay khi Consumer active.
Kiểm tra request trên RabbitMQ Management Interface
Start Producer và gửi một vài message.
Mở admin page: http://localhost:15672/#/exchanges
Chọn AMQP default, bạn sẽ thấy message rates được gửi trong một phút qua ở AMQP Default Exchange (do chúng ta không xác định tên của exchange ở chương trình trên, nên mặc định Message sẽ được gửi thông quan Default Exchange).
Chọn menu Queue, bạn sẽ thấy thông tin của Mesages Ready để gửi cho Consumer.
Tài liệu tham khảo:
- https://www.rabbitmq.com/jms-client.html
- https://www.rabbitmq.com/api-guide.html
- https://www.rabbitmq.com/queues.html
- https://www.rabbitmq.com/tutorials/tutorial-one-java.html
Bài viết gốc được đăng tải tại gpcoder.com
Có thể bạn quan tâm:
- Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud
- Kết nối RabbitMQ sử dụng Web STOMP Plugin
- [Python cơ bản thường dùng trong công việc] Phần 12 : Kết nối RabbitMQ
Xem thêm Việc làm IT hấp dẫn trên TopDev
- B BenQ RD Series – Dòng Màn Hình Lập Trình 4k+ Đầu Tiên Trên Thế Giới
- i iOS 18 có gì mới? Có nên cập nhật iOS 18 cho iPhone của bạn?
- G Gamma AI là gì? Cách tạo slide chuyên nghiệp chỉ trong vài phút
- P Power BI là gì? Vì sao doanh nghiệp nên sử dụng PBI?
- K KICC HCMC x TOPDEV – Bước đệm nâng tầm sự nghiệp cho nhân tài IT Việt Nam
- T Trello là gì? Cách sử dụng Trello để quản lý công việc
- T TOP 10 SỰ KIỆN CÔNG NGHỆ THƯỜNG NIÊN KHÔNG NÊN BỎ LỠ
- T Tìm hiểu Laptop AI – So sánh Laptop AI với Laptop thường
- M MySQL vs MS SQL Server: Phân biệt hai RDBMS phổ biến nhất
- S SearchGPT là gì? Công cụ tìm kiếm mới có thể đánh bại Google?