Producer – Consumer và đồng bộ hóa các luồng trong Java

Bài viết được sự cho phép của tác giả Giang Phan

Producer/ Consumer là một ví dụ kinh điển về vấn đề đồng hóa các luồng (multi-threading synchronization). Trong bài này tôi sẽ giới thiệu với các bạn vấn đề này và cách giải quyết để giúp các bạn hiểu rõ hơn về Java concurrency và mutli-threading.

Xem thêm các việc làm Java lương cao trên TopDev

Mô tả vấn đề Producer/ Consumer

Vấn đề mô tả hai đối tượng nhà sản xuất (Producer) và người tiêu dùng (Consumer), cả hai cùng chia sẻ một bộ đệm có kích thước cố định được sử dụng như một hàng đợi (queue).

Producer: công việc của nhà sản xuất là tạo dữ liệu, đưa nó vào bộ đệm và bắt đầu lại.

Consumer: công việc người tiêu dùng là tiêu thụ dữ liệu (nghĩa là loại bỏ nó khỏi bộ đệm), từng phần một và xử lý nó. Consumer và Producer hoạt động song song với nhau.

Vấn đề là đảm bảo rằng nhà sản xuất không thể thêm dữ liệu vào bộ đệm nếu nó đầy và người tiêu dùng không thể xóa dữ liệu khỏi bộ đệm trống, đồng thời đảm bảo an toàn cho luồng (thread-safe).

Giải pháp để giải quyết vấn đề Producer/ Consumer

Ý tưởng:

  • Giải pháp cho nhà sản xuất là đi ngủ (wait) nếu bộ đệm đầy. Lần tiếp theo người tiêu dùng xóa một mục khỏi bộ đệm, nó đánh thức (notify) nhà sản xuất, bắt đầu đưa dữ liệu vào bộ đệm.
  • Theo cách tương tự, người tiêu dùng có thể đi ngủ (wait) nếu thấy bộ đệm trống. Lần tiếp theo nhà sản xuất đưa dữ liệu vào bộ đệm, nó đánh thức (notify) người tiêu dùng đang ngủ (wait).
  • Trong khi làm tất cả điều này, phải đảm bảo an toàn cho luồng (thread safe).

Có nhiều giải pháp cho vấn đề Producer Consumer trong Java:

  • Sử dụng synchronized.
  • Sử dụng BlockingQueue.
  • Sử dụng Semaphore.
  • Sử dụng JMS (Java Messaging Service): JMS là một implementation của vấn đề Producer Consumer. Nhiều nhà sản xuất và nhiều người tiêu dùng có thể kết nối với JMS và phân phối công việc. Tôi sẽ giới thiệu với các bạn JMS ở một bài viết khác.

Trong bài này, tôi sẽ giới thiệu với các bạn cách sử dụng BlockingQueue và Semaphore.

Sử dụng BlockingQueue

Để giải quyết vấn đề, chúng ta sẽ cần 3 class:

  • Blocking Queue :
    • Sử dụng synchronized để đảm bảo thread-safe.
    • put() : sử dụng wait() để chờ khi queue đã đầy (full), và notifyAll() để thông báo khi thêm data mới vào queue.
    • take() : sử dụng wait() để chờ khi queue rỗng (empty), và notifyAll() để thông báo khi lấy data ra khỏi queue.
  • Producer Thread(s) : các thread mô phỏng các nhà sản xuất.
  • Consumer Thread(s) : các thread mô phỏng các người tiêu dùng.

Các bạn có thể sử dụng BlockingQueue có sẵn trong package java.util.concurrent . Trong bài này, tôi sẽ tự tạo một BlockingQueue để mô phỏng cơ chế hoạt động của Blocking Queue cho các bạn dễ hiểu.

BlockingQueue.java

package com.gpcoder.producerconsumer;

import java.util.LinkedList;

public class BlockingQueue {
private static final int compacity = 10;
private final LinkedList items = new LinkedList<>();

public synchronized void put(T value) throws InterruptedException {
while (items.size() == compacity) {
System.out.println("Queue is full");
wait();
}
items.addLast(value);
notifyAll();
}

public synchronized T take() throws InterruptedException {
while (items.size() == 0) {
System.out.println("Queue is empty");
wait();
}
notifyAll();
return items.removeFirst();
}

public synchronized int size() {
return items.size();
}
}

Producer.java

package com.gpcoder.producerconsumer;

import java.util.concurrent.ThreadLocalRandom;

public class Producer implements Runnable {

private final BlockingQueue queue;

Producer(BlockingQueue queue) {
this.queue = queue;
}

public void run() {
try {
while (true) {
queue.put(produce());
System.out.println("Produced resource - Queue size() = " + queue.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private Integer produce() throws InterruptedException {
Thread.sleep(50); // simulate time to produce the data
return ThreadLocalRandom.current().nextInt(1, 100);
}
}

Consumer.java

package com.gpcoder.producerconsumer;

import java.util.concurrent.ThreadLocalRandom;

public class Consumer implements Runnable {

private final BlockingQueue queue;

Consumer(BlockingQueue queue) {
this.queue = queue;
}

public void run() {
try {
while (true) {
queue.take();
System.out.println("Consumed resource - Queue size() = " + queue.size());
Thread.sleep(ThreadLocalRandom.current().nextInt(50, 300)); // simulate time passing
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Main.java

package com.gpcoder.producerconsumer;

public class Main {

public static void main(String[] args) throws InterruptedException {
BlockingQueue boundedBuffer = new BlockingQueue<>();

Producer producer = new Producer(boundedBuffer);
Consumer consumer1 = new Consumer(boundedBuffer);
Consumer consumer2 = new Consumer(boundedBuffer);
Consumer consumer3 = new Consumer(boundedBuffer);

new Thread(producer).start();
new Thread(consumer1).start();
new Thread(consumer2).start();
new Thread(consumer3).start();

Thread.sleep(5000); // After 5s have another comsumer
Consumer consumer4 = new Consumer(boundedBuffer);
new Thread(consumer4).start();
}
}

Output chương trình:

Queue is empty
Queue is empty
Queue is empty
Queue is empty
Produced resource - Queue size() = 0
Consumed resource - Queue size() = 0
Queue is empty
Produced resource - Queue size() = 0
Consumed resource - Queue size() = 0
Produced resource - Queue size() = 1
Consumed resource - Queue size() = 0
Queue is empty
Produced resource - Queue size() = 1
Consumed resource - Queue size() = 0
....
Produced resource - Queue size() = 10
Queue is full
Consumed resource - Queue size() = 9
Produced resource - Queue size() = 10
Queue is full
Consumed resource - Queue size() = 9
Produced resource - Queue size() = 10
Consumed resource - Queue size() = 9
....
Produced resource - Queue size() = 10
Consumed resource - Queue size() = 9
Consumed resource - Queue size() = 8
Produced resource - Queue size() = 9
Consumed resource - Queue size() = 8
...
Consumed resource - Queue size() = 1
Produced resource - Queue size() = 2
Consumed resource - Queue size() = 1
Consumed resource - Queue size() = 0
Queue is empty
Produced resource - Queue size() = 1
Consumed resource - Queue size() = 0
...
  10 lý do cho thấy tại sao bạn nên theo học ngôn ngữ lập trình Java
  10 tips để trở thành Java Developer xịn hơn

Sử dụng Semaphore

Tương tự như BlockingQueue ở trên, với Semaphore chúng ta cũng cần 3 class:

  • Queue : sử dụng 2 Semaphore để đồng bộ hóa dữ liệu.
    • Semaphore Producer: được set giá trị là maximum của buffer size. Giá trị này tương ứng với số lượng item có thể được thêm vào buffer.
    • Semaphore Consumer: được set giá trị là 0. Giá trị này tương ứng với số lượng item có thể được lấy ra khỏi buffer.
  • Producer Thread(s) : các thread mô phỏng các nhà sản xuất.
  • Consumer Thread(s) : các thread mô phỏng các người tiêu dùng.

Ví dụ:

Để dễ theo dõi, tôi sẽ thêm một số dòng log để xem cách hoạt động của Semaphore.

package com.gpcoder.producerconsumer.semaphore;

import java.util.Stack;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;

public class ProducerConsumerSemaphore {

private static final int BUFFER_SIZE = 4;
private final Semaphore writePermits = new Semaphore(BUFFER_SIZE);
private final Semaphore readPermits = new Semaphore(0);
private final Stack buffer = new Stack<>();

class Producer implements Runnable {
private String name;

public Producer(String name) {
this.name = name;
}

@Override
public void run() {
try {
while (true) {
System.out.println(name + ": acquiring lock...");
System.out.println(name + ": Producer available Semaphore permits now: " + writePermits.availablePermits());
writePermits.acquire();
System.out.println(name + ": got the permit!");

Thread.sleep(50); // simulate time to work
int data = ThreadLocalRandom.current().nextInt(100);
System.out.println(name + ": produced data " + buffer.push(data));

System.out.println(name + ": releasing lock...");
readPermits.release();
System.out.println(name + ": Consumer available Semaphore permits now: " + readPermits.availablePermits());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class Consumer implements Runnable {
private String name;

public Consumer(String name) {
this.name = name;
}

@Override
public void run() {
try {
while (true) {
System.out.println(name + ": acquiring lock...");
System.out.println(name + ": Consumer available Semaphore permits now: " + readPermits.availablePermits());
readPermits.acquire();

Thread.sleep(ThreadLocalRandom.current().nextInt(50, 300)); // simulate time to work
System.out.println(name + ": consumed data " + buffer.pop());

System.out.println(name + ": releasing lock...");
writePermits.release();
System.out.println(name + ": Producer available Semaphore permits now: " + writePermits.availablePermits());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws InterruptedException {
ProducerConsumerSemaphore obj = new ProducerConsumerSemaphore();
Producer producer = obj.new Producer("Producer 1");
new Thread(producer).start();

for (int i = 1; i <= 3; i++) {
Consumer consumer = obj.new Consumer("Consumer " + i);
new Thread(consumer).start();
}

Thread.sleep(5000); // After 5s have another comsumer
Consumer consumer = obj.new Consumer("Consumer " + 4);
new Thread(consumer).start();
}
}

Chạy chương trình trên, ta có kết quả sau:

Producer 1: acquiring lock...
Producer 1: Producer available Semaphore permits now: 4
Producer 1: got the permit!
Consumer 1: acquiring lock...
Consumer 1: Consumer available Semaphore permits now: 0
Consumer 2: acquiring lock...
Consumer 2: Consumer available Semaphore permits now: 0
Consumer 3: acquiring lock...
Consumer 3: Consumer available Semaphore permits now: 0
Producer 1: produced data 94
Producer 1: releasing lock...
Producer 1: Consumer available Semaphore permits now: 0
Producer 1: acquiring lock...
Producer 1: Producer available Semaphore permits now: 3
Producer 1: got the permit!
Producer 1: produced data 85
Producer 1: releasing lock...
Producer 1: Consumer available Semaphore permits now: 1
Producer 1: acquiring lock...
Producer 1: Producer available Semaphore permits now: 2
...
Consumer 2: Producer available Semaphore permits now: 1
Consumer 2: acquiring lock...
Consumer 2: Consumer available Semaphore permits now: 0
Producer 1: produced data 9
Producer 1: releasing lock...
Producer 1: Consumer available Semaphore permits now: 1
Producer 1: acquiring lock...
Producer 1: Producer available Semaphore permits now: 1
Producer 1: got the permit!
...

Tại sao vấn đề Producer/ Consumer lại quan trọng?

Có thể được sử dụng để phân phối công việc giữa các worker khác nhau, dễ dàng tăng hoặc giảm theo yêu cầu. Như bạn thấy trong ví dụ trên, ban đầu một Producer có thể sản xuất đủ cho 3 Consumer. Nhưng sau đó, có thêm một Consumer thì hệ thống sản xuất không đáp ứng được, do đó chúng ta cần tăng hiệu suất của Producer lên hoặc tạo thêm một Producer khác để đủ phục vụ cho Consumer, tránh tình trạng thiếu sản phẩm.

Producer và Consumer được kết nối thông qua BlockingQueue, nó không biết sự hiện diện của nhau, tách rời các mối quan tâm (separation of concern), giúp hệ thống thống có thiết kế tốt hơn, nên dễ dàng nâng cấp và mở rộng.

Producer và Consumer không cần phải có sẵn cùng một lúc. Consumer có thể nhận các nhiệm vụ được sản xuất bởi Producer tại một thời điểm khác nhau.

Tài liệu tham khảo:

Bài viết gốc được đăng tải tại gpcoder.com

Có thể bạn quan tâm:

Xem thêm tuyển dụng IT hấp dẫn trên TopDev