Đôi chút về RabbitMQ

Bài viết được sự cho phép của tác giả Nguyễn Hữu Đồng

Cuộc sống thiệt là thú vị, đã code là phải có bug, muốn không có bug thì đừng code và đôi khi chúng ta cũng phải chấp nhận code là một tính năng và sống chung với nó. Lúc hệ thống của mình nhỏ, xoay đi quảnh lại mình cũng chỉ làm những việc mình làm hằng ngày, chạy task dể serve client, không có cơ hội nhìn ra xem người ta đang làm gì, xã hội phát triển như thế nào rồi, và may mắn thay mình được tạo cơ hội để bước ra, vẫy vùng trên con sông lớn, dù chưa biết mình sẽ đi về đâu nhưng thà một phút huy hoàng rồi chợ tắt còn hơn le lói cả trăm năm, huy hoàng với bản thân thôi cũng đáng giá rồi.

  Giới thiệu CloudAMQP – Một RabbitMQ server trên Cloud

  Đôi chút về RabbitMQ
Phần 12 : Kết nối RabbitMQ”]

Lan man quá, vào vấn đề chính nào, cái RabbitMQ là gì và tại sao chúng ta nên dùng nó ? Hiện nay ở cái thời mà hệ thống web app của chúng ta lớn lên từng ngày, phân chia thành nhiều node, mỗi node lo làm một việc riêng hoặc hỗ trợ lẫn nhau để xử lí yêu cầu từ người dùng, việc đó đòi phải chúng nó phải chia sẻ được tài nguyên cho nhau và biết được mình đang được giao những gì và lấy danh sách những việc cần làm ở đâu.

Hay đơn giản với một công việc, sự kiện những mỗi thợ lại làm những việc khác nhau, ví dụ khi có chương trình mới sắp lên sóng, mọi người được thông báo và mỗi người lại làm việc theo đúng phần việc của mình.

Áp dụng vào hệ thống web, khi có một yêu cầu được gửi tới bảng thông báo, node thì truy xuất trong database lấy giữ liệu để chuẩn bị gửi cho khách hàng, node thì có nhiệm vụ ghi chú lại lại hôm nay hệ thống đã làm những gì. Mỗi node một việc, nó có thể nhận thông báo từ bảng thông báo hay lại làm nhiệm vụ của một phát thanh viên, thông báo việc đến cho các node còn lại.

Cái thứ đó cái hệ thống thông báo đó bao gồm cả các node, ngừời ta gọi là “message broker system”. Hiểu rõ hơn là hệ thống giao tiếp bằng tin nhắn giữa các hệ thống.

Như trên hình các bạn thấy, hệ thống của chúng ta gồm 4 thành phần “Producers”, “Exchanges”, “Queue” và “Consumer”. Theo hình sẽ thì ta thấy mô hình hoạt động khá đơn giản. Sẽ có một node nào đó tạo ra một tin nhắn, sau đó node đó sẽ gửi tin nhắn đó tới một trong số các Exchange sau đó Exchange sẽ phân phối tin nhắn đó tới các hàng chờ tin nhắn và sau đó, Consumer sẽ nhận tin nhắn từ hàng chờ đó và xử lí nó.

Áp dụng vào thực tế ngoài đời sống, ví dụ bạn là một nhà máy sản xuất bia, và tất nhiên bạn cần khách hàng tiêu thụ bia, bạn không thể gửi trực tiếp sản phẩm của bạn tới từng khách hàng được vì việc này rất tốn chi phí và bạn sẽ phải làm quá nhiều việc dẫn đến việc gì cũng biết mà không biết làm một việc thật tốt. Bạn cần các nhà phân phối bia và các nhà phân phối bia này cũng có thêm một vài đại lí nhỏ và cũng tương tự nhà sản xuất bia, các nhà phân phối này chỉ làm 1 việc là đi phân phối bia tới các đại lí nhỏ, đại lí nhỏ lại phân phối tới các quán tạp hoá, quán nhậu và cuối cùng tới tay ngừời tiêu dùng. Ở đây nhà sản xuất bia tương ứng với Producer, các nhà phân phối tương ứng với các Exchange, các đại lí (các hệ thống bán lẻ) tương ứng với Queue và khách hàng tương ứng với Consumer.

Bạn gái kia xinh qúa trời luôn 😀

Khách hàng uống bia, bia ngon thì khen, bia giờ thì chê, có người còn trả lại bia đòi lại tiền, consumer cũng vậy nếu consumer làm xong việc mà nó nhận từ Queue nó sẽ báo với queue là việc đó xong rồi, cho việc khác còn không thì nó sẽ báo với queue đưa việc đó cho consumer khác làm.

Có nhiều loại Exchange và cũng có loại Queue dễ tính hay khó tính, Exchange sẽ dựa vào các queue đăng kí nhận hàng từ exchange đó và phân phố tin nhắn sao cho hợp lí nhất. Sơ qua thì có 6 loại exchange từ loại chuyên chạy xe đi hỏi từng đại lí có ai muốn lấy hàng hay không cho đến loại exchange giao gì đại lí cũng lấy.

Direct Exchange

Đây là loại Exchange được dùng phổ biến nhất, nó sẽ chuyển tin nhắn đến các queue dựa vào Routing key, đúng thì gửi không đúng thì thôi, không ai nhận thì thôi.

Default exchange

Đây là loại phóng khoáng nhất và vô danh( tức là không có tên à không nó có tên là Default exchange đấy chứ ….. ), tin nhắn sẽ được vận chuyển thẳng tới Queue có tên tùng với Routing key trên tin nhắn.

Topic Exchange

Cũng vận chuyển tin nhắn tới queue đựa vào Routing key nhưng nó khá rộng rãi khi dựa vào Wildcard để xem 2 routing key có match với nhau hay không, ví dụ, “dongden” và “dongden.dev” là 2 routing key match với nhau rồi, vậy thì tin nhắn sẽ được vận đưa tới queue đó.

Header Exchange

Theo cách này mỗi queue khi khai báo nhận tin nhắn từ exchange sẽ phải khi báo rõ mình sẽ nhận tin nhắn có phần header như thế nào, bao gồm những trường và từ khoá như thế nào, exchange sẽ dựa vào tin nhắn và các điều kiện đó mà phân phối tin nhắn.

Fanout Exchange

Đúng như nghĩa fanout exchange “thổi” tin nhắn đến cho mọi queue mà đăng kí nhận từ nó.

Sơ qua vài loại đó thôi nếu bạn muốn tìm hiểu sâu hơn thì vào đây nhé 😀

Lý thuyết suông thì chán lắm “show me the code”

Ở bài viết này mình sẽ làm một ví dụ đơn giản, mình sẽ viết một con server có cho người dùng upload ảnh và sau đó mình sẽ tạo ảnh thumbna và upload chúng lên google storage.

Ở đây mình dùng Golang với Framework Gin để handle việc nhận form và lưu trữ file xuống local.

App của mình sẽ có một Producer chuyên gửi tin nhắn tới Exchange và 2 consumer làm hai việc khác nhau là Resize ảnh và Upload ảnh trong queue lên Google Storage.

package publisher

import (
	"github.com/streadway/amqp"
	"os"
)

var conn *amqp.Connection
var C *amqp.Channel


func JoinNetWork(){
	conn,_ = amqp.Dial(os.Getenv("AMQP_URL"))
	C,_ = conn.Channel()
	C.ExchangeDeclare("ProcessImage",amqp.ExchangeHeaders,true,false,false,false,nil)
	C.ExchangeDeclare("UploadImage",amqp.ExchangeHeaders,true,false,false,false,nil)
}

Như các bạn trong hình, sau khi mình lưu xuống local mình có hai function để kêu thằng Publisher để message vào Queue để hai anh chàng Consumer kia làm việc.

Ở đây mình dùng 3 connection và dùng 3 channel trên 3 connection đó, dùng 2 Exchange khác nhau để cho nó rõ ràng, các bạn có thể dùng 1 exchange duy nhất nếu các bạn muốn.

Cùng xem hai Consumer cuả mình có gì nào.

package consumer

import (
	"encoding/json"
	mystorage "github.com/dongnguyenltqb/go-rabbit/storage"
	"github.com/dongnguyenltqb/go-rabbit/utils"
	"github.com/streadway/amqp"
	"os"
)

var C2 *amqp.Channel
var tasks_upload <- chan amqp.Delivery

func uploadWorker(pool <- chan amqp.Delivery){
	for{
		task := <- pool
		var object mystorage.ObjectAddress
		if err := json.Unmarshal(task.Body,&object); err != nil{
			task.Reject(true)
			utils.Logger("error",err)
			continue
		}
		finished := make(chan bool)
		go mystorage.UploadToGCloudStorage(object.FileName,finished)
		if result := <- finished ; result == true {
			task.Ack(false)
		} else {
			task.Reject(true)
		}
	}
}

func runUploadWorker(numWorker int){
	for i:=1;i<numWorker;i++{
		go uploadWorker(tasks_upload)
	}
}  

func InitUploadConsumer(){
	conn,err := amqp.Dial(os.Getenv("AMQP_URL"))
	C2,err = conn.Channel()
	if err!=nil {
		panic(err)
	}
	C2.QueueDeclare("UploadImageToGCloud",true,false,false,false,nil)
	C2.QueueBind("UploadImageToGCloud","","UploadImage",true, map[string]interface{}{
		"type":"image/jpeg",
		"job":"upload",
	})
	tasks_upload,_ = C2.Consume("UploadImageToGCloud","UploadImage",false,true,false,false,nil)
	go runUploadWorker(NumWorker)
}

Đầu tiên là upload_consumer, nó sẽ đăng kí lắng nghe trên UploadImage exchange và mỗi khi nào có tin nhắn mới nó sẽ dựa vào thông tin có trong tin nhắn đó để xử lí, nếu xử lí ko được thì nó lại lại và làm tiếp cái khác nếu có.

Tiếp theo là resize_consumer, cũng hoạt động tương tự chỉ khác là sau khi Resize xong nó lại gọi thằng Publisher ra nhờ Publiser gửi tin nhắn lên exchange rằng có cái gì đó mới cần dược Upload Consumer xử lí.

package consumer

import (
	"encoding/json"
	"fmt"
	"github.com/disintegration/imaging"
	mystorage "github.com/dongnguyenltqb/go-rabbit/storage"
	"github.com/dongnguyenltqb/go-rabbit/utils"
	"github.com/streadway/amqp"
	"os"
	"strings"
)

var C1 *amqp.Channel
var NumWorker int
func ResizeWorker(pool <-chan amqp.Delivery){
	for{
		task := <- pool
		fmt.Println(utils.ApplyStyle("bold","yellow","Resizing image"))
		var object mystorage.ObjectAddress
		if err := json.Unmarshal(task.Body,&object);err != nil {
			task.Reject(true)
			utils.Logger("error",err)
			continue
		}
		srcImage, err := imaging.Open("storage/image/"+object.FileName)
		if err != nil {
			task.Reject(true)
			utils.Logger("error",err)
			continue
		}
		dstImage200 := imaging.Resize(srcImage, 200, 0, imaging.Lanczos)
		resizedFileName := "200@"+strings.TrimLeft(object.FileName,"/image/")
		err = imaging.Save(dstImage200,"storage/image/"+resizedFileName)
		if err != nil {
			task.Reject(true)
			utils.Logger("error",err)
			continue
		}
		task.Ack(false)
		resizedObject := mystorage.ObjectAddress{
			FileName: resizedFileName,
		}
		go mystorage.PushTaskToExchangeUploadImage(resizedObject)
	}
}

func runResizeWorker(numWorker int){
	for i:=0;i<=numWorker;i++{
		go ResizeWorker(tasks_resize)
	}
}

var tasks_resize <- chan amqp.Delivery

func InitResizeConsumer(){
	conn,err := amqp.Dial(os.Getenv("AMQP_URL"))
	C1,err = conn.Channel()
	if err!=nil {
		panic(err)
	}
	C1.QueueDeclare("ResizeImage",true,false,false,false,nil)
	C1.QueueBind("ResizeImage","","ProcessImage",true, map[string]interface{}{
		"type":"image/jpeg",
		"job":"resize",
	})
	tasks_resize,_ = C1.Consume("ResizeImage","ResizeImageWorker",false,true,false,false,nil)
	go runResizeWorker(NumWorker)
}

Về cơ bản App sau này mình có thể clone app của mình ra thành nhiều thành phần nhỏ khác nhau và làm mỗi một việc như “ Handle Upload Form”, “Upload” và “Resize” chúng nó có thể giao tiếp với nhau dễ dàng qua RabbitMQ.

Các bạn có thể clone source code của mình ở đây và bình luận giúp mình chỗ nào cần cải thiện, nếu hay các bạn vỗ tay cho mình nhé.

12h11 đêm rồi, mình chắc cũng nên dừng bút tại đây, các ơn các bạn đã đọc bài. Hẹn gặp lại trong bài viết sau.

Bài viết gốc được đăng tải tại dongnguyenltqb.medium.com

Có thể bạn quan tâm:

Xem thêm Việc làm Developer hấp dẫn trên TopDev