Triển khai Saga Pattern trong microservices với NodeJS và Choreography-Based Saga

Bài viết được sự cho phép của tác giả Duy Phan

Mình sẽ sử dụng lại ví dụ Booking Service Online trong phần trước đó

Saga Pattern trong microservices

Ở đây mình sẽ tạo ra các isolated service, đồng thời thiết kế để chúng giao tiếp với nhau thông qua một Message Queue. Ở đây mình chọn RabbitMQ làm Message Queue.

1. Triển khai BookingService

// booking-service.ts
import express from 'express'
import amqp from 'amqplib'

const app = express()
const PORT = 3001

app.use(express.json())

let channel: amqp.Channel

const paymentQueue = 'payment_queue'

app.post('/booking', async (req, res) => {
  const { userId, eventId, numberOfSeats } = req.body

  // Pre-step 1: Validate booking request

  // Pre-step 2: Save booking request to application database
  const booking = {
    userId,
    eventId,
    numberOfSeats,
    bookingReservedSuccessfully: true,
  }

  /**
   * Step 1: Send Booking Request to PaymentService
   */
  if (booking.bookingReservedSuccessfully) {
    await sendMessageToQueue(paymentQueue, { booking })
  }

  res.json({ message: 'Booking request sent successfully' })
})

async function connectQueue(queue: string) {
  const connection = await amqp.connect('amqp://localhost')

  channel = await connection.createChannel()

  await channel.assertQueue(queue)
}

async function sendMessageToQueue(queue: string, message: unknown) {
  await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)))
}

app.listen(PORT, async () => {
  console.log(`BookingService is running on http://localhost:${PORT}`)

  await connectQueue(paymentQueue)
})

BookingService xử lý các yêu cầu HTTP POST để tạo booking mới. BookingService cố gắng đặt trước một chỗ và nếu thành công, nó sẽ gửi một message đến PaymentService thông qua một queue trong RabbitMQ có tên là payment_queue.

2. Triển khai PaymentService

// booking-service.ts
import amqp from 'amqplib'

let channel: amqp.Channel

const paymentQueue = 'payment_queue'
const seatUpdatingQueue = 'seat_update_queue'
const compensationQueue = 'compensation_queue'

async function processPayment() {
  const connection = await amqp.connect('amqp://localhost')

  channel = await connection.createChannel()

  await channel.assertQueue(paymentQueue)
  await channel.assertQueue(seatUpdatingQueue)
  await channel.assertQueue(compensationQueue)

  channel.consume(paymentQueue, async (msg) => {
    const { booking } = JSON.parse(msg.content.toString())

    // Pre-step: Process payment
    const paymentProcessedSuccessfully = true

    /**
     * Step 2:
     * Send seats update request to SeatUpdatingService
     * Or Reverse the booking by sending a compensation request to CompensationService
     */
    if (paymentProcessedSuccessfully) {
      await sendMessageToQueue(seatUpdatingQueue, { booking })
    } else {
      await sendMessageToQueue(compensationQueue, {
        booking,
        event: 'PaymentServiceFailed',
      })
    }

    // Acknowledge the message
    channel.ack(msg as amqp.Message)
  })
}

async function sendMessageToQueue(queue: string, message: unknown) {
  await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)))
}

processPayment()

PaymentService lắng nghe các message trên payment_queue. Nó chịu trách nhiệm xử lý payment, đồng thời nó cũng sẽ gửi một message đến seat_update_queue nếu thanh toán thành công hoặc compensation_queue nếu thanh toán thất bại.

  SAGA Pattern trong Microservices

  SAGA Pattern trong kiến trúc ngân hàng lõi (Core Bank Architecture)

3. Triển khai SeatUpdatingService

// seat-update-service.ts
import amqp from 'amqplib'

let channel: amqp.Channel

const seatUpdatingQueue = 'seat_update_queue'
const notificationQueue = 'notification_queue'
const compensationQueue = 'compensation_queue'

async function processSeatUpdate() {
  const connection = await amqp.connect('amqp://localhost')

  channel = await connection.createChannel()

  await channel.assertQueue(seatUpdatingQueue)

  channel.consume(seatUpdatingQueue, async (msg) => {
    const { booking } = JSON.parse(msg?.content.toString() || '{}')

    // Add your seat updating logic here
    const seatsUpdatedSuccessfully = true

    /**
     * Step 3:
     * Send seats update request to NotificationService
     * Or Reverse the booking by sending a compensation request to CompensationService
     */
    if (seatsUpdatedSuccessfully) {
      await sendMessageToQueue(notificationQueue, { booking, isSuccess: true })
    } else {
      await sendMessageToQueue(compensationQueue, {
        booking,
        event: 'SeatUpdatingServiceFailed',
      })
    }

    // Acknowledge the message
    channel.ack(msg as amqp.Message)
  })
}

async function sendMessageToQueue(queue: string, message: unknown) {
  await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)))
}

processSeatUpdate()

SeatUpdatingService lắng nghe các message trên seat_update_queue. Nó chịu trách nhiệm xử lý cập nhật chỗ, đồng thời nó cũng sẽ gửi một message đến notification_queue nếu thực hiện thành công hoặc compensation_queue nếu thực hiện thất bại.

Tuyển dụng lập trình viên Frontend lương cao tại đây!

4. Triển khai CompensationService

// seat-update-service.ts
import amqp from 'amqplib'

let channel: amqp.Channel

const compensationQueue = 'compensation_queue'
const notificationQueue = 'notification_queue'

async function processCompensation() {
  const connection = await amqp.connect('amqp://localhost')

  channel = await connection.createChannel()

  await channel.assertQueue(compensationQueue)

  channel.consume(compensationQueue, async (msg) => {
    const { booking, event } = JSON.parse(msg?.content.toString() || '{}')

    console.log(`Compensating for user ${booking.userId} and event ${event}`)

    /**
     * Step 4: Do compensation
     */
    switch (event) {
      case 'PaymentServiceFailed':
        console.log(
          `Compensating PaymentServiceFailed for user ${booking.userId}`
        )
        // Add your compensation logic here
        break
      case 'SeatUpdatingServiceFailed':
        console.log(
          `Compensating SeatUpdatingServiceFailed for user ${booking.userId}`
        )
        // Add your compensation logic here
        break
    }

    // Send notification to user
    await sendMessageToQueue(notificationQueue, { booking, isSuccess: false })

    // Acknowledge the message
    channel.ack(msg as amqp.Message)
  })
}

async function sendMessageToQueue(queue: string, message: unknown) {
  await channel.sendToQueue(queue, Buffer.from(JSON.stringify(message)))
}

processCompensation()

CompensationService lắng nghe các message trên compensation_queue. Khi CompensationService nhận được một message cho biết có sự cố khi xử lý booking, nó sẽ thực hiện một compensation transaction để hủy booking và đưa dữ liệu về trạng thái ban đầu, giúp hệ thống nhất quán về dữ liệu.

5. Triển khai NotificationService

// seat-update-service.ts
import amqp from 'amqplib'

let channel: amqp.Channel

const notificationQueue = 'notification_queue'

async function processSendNotification() {
  const connection = await amqp.connect('amqp://localhost')

  channel = await connection.createChannel()

  await channel.assertQueue(notificationQueue)

  channel.consume(notificationQueue, async (msg) => {
    const { booking, isSuccess } = JSON.parse(msg?.content.toString() || '{}')

    /**
     * Step 4: Send notification to user
     */
    if (isSuccess) {
      console.log(`Booking SUCCESS sent to user ${booking.userId}`)
    } else {
      console.log(`Booking FAIL send to user ${booking.userId}`)
    }

    // Acknowledge the message
    channel.ack(msg as amqp.Message)
  })
}

processSendNotification()

NotificationService lắng nghe các message trên notification_queue. Khi nó nhận được một message, nó sẽ gửi một thông báo (có thể là email, số điện thoại,…) tuỳ theo yêu cầu kỹ thuật.

Như vậy, bạn đã triển khai thành công các microservices với mô hình Choreography-Based Saga. Mỗi service sẽ thực hiện nhiệm vụ của mình và giao tiếp với các service khác thông qua event.

Bài viết gốc được đăng tải tại duypt.dev

Xem thêm:

Xem thêm các vị trí IT Job hấp dẫn trên TopDev