更新時(shí)間:2022-08-24 10:26:36 來(lái)源:動(dòng)力節(jié)點(diǎn) 瀏覽3971次
在Java多線程應(yīng)用中,隊(duì)列的使用率很高,多數(shù)生產(chǎn)消費(fèi)模型的首選數(shù)據(jù)結(jié)構(gòu)就是隊(duì)列。Java提供的線程安全的Queue可以分為阻塞隊(duì)列和非阻塞隊(duì)列,其中阻塞隊(duì)列的典型例子是BlockingQueue,非阻塞隊(duì)列的典型例子是ConcurrentLinkedQueue,在實(shí)際應(yīng)用中要根據(jù)實(shí)際需要選用阻塞隊(duì)列或者非阻塞隊(duì)列。
注:什么叫線程安全?這個(gè)首先要明確。線程安全的類 ,指的是類內(nèi)共享的全局變量的訪問(wèn)必須保證是不受多線程形式影響的。如果由于多線程的訪問(wèn)(比如修改、遍歷、查看)而使這些變量結(jié)構(gòu)被破壞或者針對(duì)這些變量操作的原子性被破壞,則這個(gè)類就不是線程安全的。
本文來(lái)講講這兩種Queue,本文分為以下兩個(gè)部分,用分割線分開(kāi):
BlockingQueue 阻塞算法
ConcurrentLinkedQueue,非阻塞算法
首先來(lái)看看BlockingQueue:
Queue是什么就不需要多說(shuō)了吧,一句話:隊(duì)列是先進(jìn)先出。相對(duì)的,棧是后進(jìn)先出。如果不熟悉的話先找本基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu)的書看看吧。
BlockingQueue,顧名思義,“阻塞隊(duì)列”:可以提供阻塞功能的隊(duì)列。
首先,看看BlockingQueue提供的常用方法:
| 可能報(bào)異常 | 返回布爾值 | 可能阻塞 | 設(shè)定等待時(shí)間 | |
|---|---|---|---|---|
| 入隊(duì) | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
| 出隊(duì) | remove() | poll() | take() | poll(timeout, unit) |
| 查看 | element() | peek() | 無(wú) | 無(wú) |
從上表可以很明顯看出每個(gè)方法的作用,這個(gè)不用多說(shuō)。我想說(shuō)的是:
add(e) remove() element() 方法不會(huì)阻塞線程。當(dāng)不滿足約束條件時(shí),會(huì)拋出IllegalStateException 異常。例如:當(dāng)隊(duì)列被元素填滿后,再調(diào)用add(e),則會(huì)拋出異常。
offer(e) poll() peek() 方法即不會(huì)阻塞線程,也不會(huì)拋出異常。例如:當(dāng)隊(duì)列被元素填滿后,再調(diào)用offer(e),則不會(huì)插入元素,函數(shù)返回false。
要想要實(shí)現(xiàn)阻塞功能,需要調(diào)用put(e) take() 方法。當(dāng)不滿足約束條件時(shí),會(huì)阻塞線程。
BlockingQueue 阻塞算法
BlockingQueue作為線程容器,可以為線程同步提供有力的保障。
BlockingQueue定義的常用方法如下:
| 拋出異常 | 特殊值 | 阻塞 | 超時(shí) | |
|---|---|---|---|---|
| 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 移除 | remove() | poll() | take() | poll(time, unit) |
| 檢查 | element() | peek() | 不可用 | 不可用 |
1. ArrayBlockingQueue
基于數(shù)組的阻塞隊(duì)列實(shí)現(xiàn),在ArrayBlockingQueue內(nèi)部,維護(hù)了一個(gè)定長(zhǎng)數(shù)組,以便緩存隊(duì)列中的數(shù)據(jù)對(duì)象,這是一個(gè)常用的阻塞隊(duì)列,除了一個(gè)定長(zhǎng)數(shù)組外,ArrayBlockingQueue內(nèi)部還保存著兩個(gè)整形變量,分別標(biāo)識(shí)著隊(duì)列的頭部和尾部在數(shù)組中的位置。
ArrayBlockingQueue在生產(chǎn)者放入數(shù)據(jù)和消費(fèi)者獲取數(shù)據(jù),都是共用同一個(gè)鎖對(duì)象,由此也意味著兩者無(wú)法真正并行運(yùn)行,這點(diǎn)尤其不同于LinkedBlockingQueue;按照實(shí)現(xiàn)原理來(lái)分析,ArrayBlockingQueue完全可以采用分離鎖,從而實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者操作的完全并行運(yùn)行。Doug Lea之所以沒(méi)這樣去做,也許是因?yàn)锳rrayBlockingQueue的數(shù)據(jù)寫入和獲取操作已經(jīng)足夠輕巧,以至于引入獨(dú)立的鎖機(jī)制,除了給代碼帶來(lái)額外的復(fù)雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個(gè)明顯的不同之處在于,前者在插入或刪除元素時(shí)不會(huì)產(chǎn)生或銷毀任何額外的對(duì)象實(shí)例,而后者則會(huì)生成一個(gè)額外的Node對(duì)象。這在長(zhǎng)時(shí)間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對(duì)于GC的影響還是存在一定的區(qū)別。而在創(chuàng)建ArrayBlockingQueue時(shí),我們還可以控制對(duì)象的內(nèi)部鎖是否采用公平鎖,默認(rèn)采用非公平鎖。
2. LinkedBlockingQueue
基于鏈表的阻塞隊(duì)列,同ArrayListBlockingQueue類似,其內(nèi)部也維持著一個(gè)數(shù)據(jù)緩沖隊(duì)列(該隊(duì)列由一個(gè)鏈表構(gòu)成),當(dāng)生產(chǎn)者往隊(duì)列中放入一個(gè)數(shù)據(jù)時(shí),隊(duì)列會(huì)從生產(chǎn)者手中獲取數(shù)據(jù),并緩存在隊(duì)列內(nèi)部,而生產(chǎn)者立即返回;只有當(dāng)隊(duì)列緩沖區(qū)達(dá)到最大值緩存容量時(shí)(LinkedBlockingQueue可以通過(guò)構(gòu)造函數(shù)指定該值),才會(huì)阻塞生產(chǎn)者隊(duì)列,直到消費(fèi)者從隊(duì)列中消費(fèi)掉一份數(shù)據(jù),生產(chǎn)者線程會(huì)被喚醒,反之對(duì)于消費(fèi)者這端的處理也基于同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù),還因?yàn)槠鋵?duì)于生產(chǎn)者端和消費(fèi)者端分別采用了獨(dú)立的鎖來(lái)控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來(lái)提高整個(gè)隊(duì)列的并發(fā)性能。
作為開(kāi)發(fā)者,我們需要注意的是,如果構(gòu)造一個(gè)LinkedBlockingQueue對(duì)象,而沒(méi)有指定其容量大小,LinkedBlockingQueue會(huì)默認(rèn)一個(gè)類似無(wú)限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產(chǎn)者的速度一旦大于消費(fèi)者的速度,也許還沒(méi)有等到隊(duì)列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已被消耗殆盡了。
阻塞隊(duì)列:線程安全
按 FIFO(先進(jìn)先出)排序元素。隊(duì)列的頭部 是在隊(duì)列中時(shí)間最長(zhǎng)的元素。隊(duì)列的尾部 是在隊(duì)列中時(shí)間最短的元素。新元素插入到隊(duì)列的尾部,并且隊(duì)列檢索操作會(huì)獲得位于隊(duì)列頭部的元素。鏈接隊(duì)列的吞吐量通常要高于基于數(shù)組的隊(duì)列,但是在大多數(shù)并發(fā)應(yīng)用程序中,其可預(yù)知的性能要低。
注意:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingDeque {
//阻塞隊(duì)列,F(xiàn)IFO
private static LinkedBlockingQueue<Integer> concurrentLinkedQueue = new LinkedBlockingQueue<Integer>();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Producer("producer1"));
executorService.submit(new Producer("producer2"));
executorService.submit(new Producer("producer3"));
executorService.submit(new Consumer("consumer1"));
executorService.submit(new Consumer("consumer2"));
executorService.submit(new Consumer("consumer3"));
}
static class Producer implements Runnable {
private String name;
public Producer(String name) {
this.name = name;
}
public void run() {
for (int i = 1; i < 10; ++i) {
System.out.println(name+ " 生產(chǎn): " + i);
//concurrentLinkedQueue.add(i);
try {
concurrentLinkedQueue.put(i);
Thread.sleep(200); //模擬慢速的生產(chǎn),產(chǎn)生阻塞的效果
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
private String name;
public Consumer(String name) {
this.name = name;
}
public void run() {
for (int i = 1; i < 10; ++i) {
try {
//必須要使用take()方法在獲取的時(shí)候阻塞
System.out.println(name+"消費(fèi): " + concurrentLinkedQueue.take());
//使用poll()方法 將產(chǎn)生非阻塞效果
//System.out.println(name+"消費(fèi): " + concurrentLinkedQueue.poll());
//還有一個(gè)超時(shí)的用法,隊(duì)列空時(shí),指定阻塞時(shí)間后返回,不會(huì)一直阻塞
//但有一個(gè)疑問(wèn),既然可以不阻塞,為啥還叫阻塞隊(duì)列?
//System.out.println(name+" Consumer " + concurrentLinkedQueue.poll(300, TimeUnit.MILLISECONDS));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
ConcurrentLinkedQueue,非阻塞算法
非阻塞隊(duì)列
基于鏈接節(jié)點(diǎn)的、無(wú)界的、線程安全。此隊(duì)列按照 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序。隊(duì)列的頭部 是隊(duì)列中時(shí)間最長(zhǎng)的元素。隊(duì)列的尾部 是隊(duì)列中時(shí)間最短的元素。新的元素插入到隊(duì)列的尾部,隊(duì)列檢索操作從隊(duì)列頭部獲得元素。當(dāng)許多線程共享訪問(wèn)一個(gè)公共 collection 時(shí),ConcurrentLinkedQueue 是一個(gè)恰當(dāng)?shù)倪x擇。此隊(duì)列不允許 null 元素。
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
public class NoBlockQueue {
private static ConcurrentLinkedQueue<Integer> concurrentLinkedQueue = new ConcurrentLinkedQueue<Integer>();
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new Producer("producer1"));
executorService.submit(new Producer("producer2"));
executorService.submit(new Producer("producer3"));
executorService.submit(new Consumer("consumer1"));
executorService.submit(new Consumer("consumer2"));
executorService.submit(new Consumer("consumer3"));
}
static class Producer implements Runnable {
private String name;
public Producer(String name) {
this.name = name;
}
public void run() {
for (int i = 1; i < 10; ++i) {
System.out.println(name+ " start producer " + i);
concurrentLinkedQueue.add(i);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//System.out.println(name+"end producer " + i);
}
}
}
static class Consumer implements Runnable {
private String name;
public Consumer(String name) {
this.name = name;
}
public void run() {
for (int i = 1; i < 10; ++i) {
try {
System.out.println(name+" Consumer " + concurrentLinkedQueue.poll());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// System.out.println();
// System.out.println(name+" end Consumer " + i);
}
}
}
}
在并發(fā)編程中,一般推薦使用阻塞隊(duì)列,這樣實(shí)現(xiàn)可以盡量地避免程序出現(xiàn)意外的錯(cuò)誤。阻塞隊(duì)列使用最經(jīng)典的場(chǎng)景就是socket客戶端數(shù)據(jù)的讀取和解析,讀取數(shù)據(jù)的線程不斷將數(shù)據(jù)放入隊(duì)列,然后解析線程不斷從隊(duì)列取數(shù)據(jù)解析。還有其他類似的場(chǎng)景,只要符合生產(chǎn)者-消費(fèi)者模型的都可以使用阻塞隊(duì)列。
使用非阻塞隊(duì)列,雖然能即時(shí)返回結(jié)果(消費(fèi)結(jié)果),但必須自行編碼解決返回為空的情況處理(以及消費(fèi)重試等問(wèn)題)。
另外他們都是線程安全的,不用考慮線程同步問(wèn)題。
相關(guān)閱讀
Java實(shí)驗(yàn)班
0基礎(chǔ) 0學(xué)費(fèi) 15天面授
Java就業(yè)班
有基礎(chǔ) 直達(dá)就業(yè)
Java夜校直播班
業(yè)余時(shí)間 高薪轉(zhuǎn)行
Java在職加薪班
工作1~3年,加薪神器
Java架構(gòu)師班
工作3~5年,晉升架構(gòu)
提交申請(qǐng)后,顧問(wèn)老師會(huì)電話與您溝通安排學(xué)習(xí)