Java 多線程基礎(十二)生產者與消費者
一、生產者與消費者模型
生產者與消費者問題是個非常典型的多線程問題,涉及到的對象包括“生產者”、“消費者”、“倉庫”和“產品”。他們之間的關係如下:
①、生產者僅僅在倉儲未滿時候生產,倉滿則停止生產。
②、消費者僅僅在倉儲有產品時候才能消費,倉空則等待。
③、當消費者發現倉儲沒產品可消費時候會通知生產者生產。
④、生產者在生產出可消費產品時候,應該通知等待的消費者去消費。
生產者消費者模型具體來講,就是在一個系統中,存在生產者和消費者兩種角色,他們通過內存緩衝區進行通信,生產者生產消費者需要的資料,消費者把資料做成產品。生產消費者模式如下圖。
在日益發展的服務類型中,譬如註冊用戶這種服務,它可能解耦成好幾種獨立的服務(賬號驗證,郵箱驗證碼,手機短信碼等)。它們作為消費者,等待用戶輸入數據,在前台數據提交之後會經過分解併發送到各個服務所在的url,分發的那個角色就相當於生產者。消費者在獲取數據時候有可能一次不能處理完,那麼它們各自有一個請求隊列,那就是內存緩衝區了。做這項工作的框架叫做消息隊列。
二、生產者與消費者實現
下面通過生產包子的例子及wait()/notify()方式實現該模型(後面學習線程池相關內容之後,再通過其它方式實現生產/者消費者模型)。
麵包類:
public class Bread { private int capacity; // 麵包的容量 private int size; // 麵包的實際數量 public Bread(int capacity) { this.capacity = capacity; this.size = 0; } // 生產麵包 public synchronized void produce(int val) { try { // left 表示“想要生產的數量”(有可能生產量太多,需多此生產) int left = val; while (left > 0) { // 庫存已滿時,等待“消費者”消費產品。 while (size >= capacity) wait(); // 獲取“實際生產的數量”(即庫存中新增的數量) // 如果“庫存”+“想要生產的數量”>“總的容量”,則“實際增量”=“總的容量”-“當前容量”。(此時填滿倉庫) // 否則“實際增量”=“想要生產的數量” int inc = (size+left)>capacity ? (capacity-size) : left; size += inc; left -= inc; System.out.printf("%s produce(%3d) --> left=%3d, inc=%3d, size=%3d\n", Thread.currentThread().getName(), val, left, inc, size); // 通知“消費者”可以消費了。 notifyAll(); } }catch(InterruptedException e) { e.printStackTrace(); } } // 消費麵包 public synchronized void consume(int val) { try { // left 表示“客戶要消費數量”(有可能消費量太大,庫存不夠,需多此消費) int left = val; while (left > 0) { // 庫存為0時,等待“生產者”生產產品。 while (size <= 0) wait(); // 獲取“實際消費的數量”(即庫存中實際減少的數量) // 如果“庫存”<“客戶要消費的數量”,則“實際消費量”=“庫存”; // 否則,“實際消費量”=“客戶要消費的數量”。 int dec = (size<left) ? size : left; size -= dec; left -= dec; System.out.printf("%s consume(%3d) <-- left=%3d, dec=%3d, size=%3d\n", Thread.currentThread().getName(), val, left, dec, size); notifyAll(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
生產者類
public class Producer{ Bread bread; public Producer(Bread bread) { this.bread = bread; } public void produce(final int val) { new Thread(() -> { bread.produce(val); }).start();; } }
消費者類
public class Customer { private Bread bread; public Customer(Bread bread) { this.bread = bread; } public void consume(final int val) { new Thread(() -> { bread.consume(val); }).start();; } }
測試類代碼
public class Demo { public static void main(String[] args) { Bread bread = new Bread(100); Producer producer = new Producer(bread); Cunstomer customer = new Customer(bread); producer.produce(60); producer.produce(120); consumer.consume(90); consumer.consume(150); producer.produce(110); } }
// 運行結果 Thread-1 produce( 60) --> left= 0, inc= 60, size= 60 Thread-5 produce(110) --> left= 70, inc= 40, size=100 Thread-4 consume(150) <-- left= 50, dec=100, size= 0 Thread-2 produce(120) --> left= 20, inc=100, size=100 Thread-3 consume( 90) <-- left= 0, dec= 90, size= 10 Thread-4 consume(150) <-- left= 40, dec= 10, size= 0 Thread-5 produce(110) --> left= 0, inc= 70, size= 70 Thread-4 consume(150) <-- left= 0, dec= 40, size= 30 Thread-2 produce(120) --> left= 0, inc= 20, size= 50
說明:
①、Producer是“生產者”類,它與“麵包(bread)”關聯。當調用“生產者”的produce()方法時,它會新建一個線程並向“麵包類”中生產產品。
②、Customer是“消費者”類,它與“麵包(bread)”關聯。當調用“消費者”的consume()方法時,它會新建一個線程並消費“麵包類”中的產品。
③、Bread是麵包類,記錄“麵包的產量(capacity)”以及麵包當前實際數目(size)”。
麵包類的生產方法produce()和消費方法consume()方法都是synchronized方法,進入synchronized方法體,意味着這個線程獲取到了該“麵包”對象的同步鎖。這也就是說,同一時間,生產者和消費者線程只能有一個能運行。通過同步鎖,實現了對“殘酷”的互斥訪問。
對於生產方法 produce() 而言:當麵包量滿時,生產者線程等待,需要等待消費者消費產品之後,生產線程才能生產;生產者線程生產完麵包之後,會通過 notifyAll() 喚醒同步鎖上的所有線程,包括“消費者線程”,即我們所說的“通知消費者進行消費”。
對於消費方法consume()而言:當倉庫為空時,消費者線程等待,需要等待生產者生產產品之後,消費者線程才能消費;消費者線程消費完產品之後,會通過 notifyAll() 喚醒同步鎖上的所有線程,包括“生產者線程”,即我們所說的“通知生產者進行生產”。
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※網頁設計公司推薦不同的風格,搶佔消費者視覺第一線
※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益
※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面
※南投搬家公司費用需注意的眉眉角角,別等搬了再說!
※新北清潔公司,居家、辦公、裝潢細清專業服務
※教你寫出一流的銷售文案?