阻塞隊列實現生產者消費者以及同步工具類

2020-12-25 IT樂知

要學習多線程一些基本的同步類也是不得不學習的,這裡主要講一點基本的概念與使用。

阻塞隊列

阻塞隊列提供可阻塞的put和take方法,支持定時的offer和poll方法,如果隊列已經滿了,那麼put方法將阻塞直到有空間可用;如果隊列為空,那麼take方法將會阻塞直到有元素可用;同時隊列可以是有界也可以是無界的,無界隊列永遠都不會充滿,因此無界隊列的put方法永遠不會阻塞;

Java中的阻塞隊列主要有以實現BlockingQueue接口的幾個類,其中LinkedBlockingQueue、ArrayBlockingQueue對應LinkedList與ArrayList,他們是先進先出隊列(FIFO);阻塞隊列能夠實現生產者與消費者的關係,生產者往隊列中put數據,而消費者往take中拉取數據,進而實現異步操作。曾經的生產中的例子簡化後如下圖:

隊列還有一些實現比如:

PriorityBlockingQueue隊列是優先級隊列,根據元素的比較來控制順序;

SynchronousQueue隊列維護的不是元素而是線程,這些線程在等待把元素加入或者移除隊列,就好像餐館出菜,前面的隊列都是把菜炒好了放到放菜的地方,服務員一個一個拿走,而SynchronousQueue則沒有放菜的地方,廚師炒好菜直接給服務員端走,所以SynchronousQueue要求消費者足夠多,並且總是有至少一個消費者在等待才適合用。

同步工具類

閉鎖

閉鎖:可以延遲線程的進度直到其到達終止狀態。閉鎖相當於一個閘門,在閉鎖到達結束狀態之前,這個閘門一直是關閉的,任何線程都不能通過當到達結束狀態時,閘門允許所有線程通過,閉鎖到達結束狀態後不會再改變狀態閉鎖。

閉鎖主要作用是可以用來確保一些活動直到其他活動都完成後才繼續執行。比如一個初始化需要等另一個初始化,比如所有玩家都準備才開始。

CountDownLatch是一種閉鎖實現,它初始化一個正數,countDown方法使數字遞減,await方法使線程等待計數器到0,否則阻塞,示例如下圖:

這個方法主要用來統計一個任務在多個線程同步執行下消耗時間,利用兩個閉鎖實現了線程先一起準備好,然後放開開始閉鎖,所有線程一起執行,每個線程最後都會countDown一下結束閉鎖,當所有線程執行完成結束閉鎖也就方法,這樣就可以實現統計所有線程一起執行消耗的時間。

FutureTask

FutrueTask是一個實現了Runnable的類,並且它可以獲取到線程的執行結果,get方法執行取決於任務的狀態,如果任務已經完成,那麼get會立即返回結果,否則get將

阻塞直到任務進入完成狀態,可以利用它的這個功能來實現閉鎖

它還可以用來異步提前計算一些比較耗性能的值,比如一個操作比較耗性能那麼可以讓它先執行,等到需要計算的結果的時候在get。

信號量

計數信號量(Counting Semaphore)用來控制同時訪問某個特定資源的操作數量,或者同時執行某個指定操作的數量

實現原理是:Semaphore管理著一組虛擬的許可,執行操作前必須先acquire獲得許可,這個許可就減一,如果許可被減到0,線程再來獲取許可就阻塞等待其他線程在操作完成後release釋放許可;

比如初始化了一定數量的資料庫線程池,每個線程獲取線程池時先acquire,如果獲取成功說明有線程繼續執行,如果沒有線程了acquire方法會阻塞等待有多餘線程。

還可以把計數信號量的數量設置為1,那麼只要有一個線程acquire成功,其他線程就只能等待直到線程release,這樣就實現了一個互斥鎖。

柵欄

柵欄類似於閉鎖,他也是阻塞一組線程直到某個事件發生,區別在於柵欄是等待所有線程到達指定,而閉鎖是等待一個事件,閉鎖是一次性的不會重置。閉鎖是等待其他線程執行完成等待線程才開始執行,而柵欄是讓每個線程都等待,直到所有線程都是等待狀態,那麼所有線程一起繼續執行

可能有點不好理解,通過下面一個例子和閉鎖例子進行對比,示例如下圖:

不管執行多少次,一定是所有的「開始」列印完成後才會列印「結束」,並且如果線程的數量沒有初始化柵欄的值多,所有線程都會處於阻塞狀態。

總結

今天總結了一點阻塞隊列的知識和應用,平時一些消耗性能而創建線程去異步執行可能會造成線程太多的情況,可以簡單的通過阻塞隊列來實現生產者消費者的方式來解決問題。

還總結了一點同步工具類的知識,主要是理解這些基本概念和使用場景,閉鎖通過觸發countDown來減少計數器,當計數器減到0,await方法放行。信號量則是acquire方法去獲取release方法去釋放,柵欄則是先運行到await地方的線程先等待,等到所有線程都到了再一起執行,尤其要理解閉鎖和柵欄的區別。

閉鎖用來確保一些活動執行完成才執行一些操作,信號量用來保證某些操作的數量,柵欄用來等待所有操作一起到達指定點再一起繼續執行。

Java程式設計師日常學習筆記,如理解有誤歡迎各位交流討論!

相關焦點

  • 程式設計師:設計模式-生產者消費者分析以及實現的三種方式
    採用生產者消費者模式,A代碼將處理好的數據交給緩存區,B代碼直接從緩存區拿取數據進行處理,這樣就把A與B的依賴關係給簡介的消除了。}get.unlock();if(queue.size()==minnum){put.lock();noFull.signalAll();put.unlock();}}}}使用BlockingQueue阻塞隊列方法它在底層的實現方法中就已經進行同步操作了
  • 生產者消費者模型
    前言生產者消費者問題(Producer-consumer problem),也稱有限緩衝問題(Bounded-buffer problem),是一個多線程同步問題的經典案例。生產者生成一定量的數據放到緩衝區中,然後重複此過程;與此同時,消費者也在緩衝區消耗這些數據。
  • LabVIEW之生產者/消費者模式--隊列操作
    本文章主要是對學習LabVIEW之生產者/消費者模式的學習筆記,其中涉及到同步控制技術-隊列、事件、狀態機、生產者-消費者模式,這幾種技術在在本章中都會有側重點的進行介紹和總結
  • 怎麼實現的延時隊列?以及訂閱模式、LRU
    Redis實現消息隊列和延時隊列消息隊列Redis的實現消息隊列可以用list來實現,通過lpush與rpop或者rpush與lpop結合來實現消息隊列。其實blpop和brpop的作用是bloking pop,就是阻塞拉取數據,當消息隊列中為空時就會停止拉取,有數據後立即恢復拉取。但是當沒有數據的時候,阻塞拉取,就會一直阻塞在那裡,時間久了就成了空閒連接,那麼Redis伺服器一般會將時間閒置過久的連接直接斷掉,以減少連接資源。所以還要檢測阻塞拉取拋出的異常然後進行重試。
  • 高性能內存隊列Disruptor原理分析
    其實無鎖並不代表沒有競爭,所以當高並發寫或者讀的時候,這些工具類一樣會面臨資源爭用的極限性能問題。而lmax.com作為一家頂級外匯交易商,其交易系統需要處理的並發量非常巨大,對響應延遲也非常敏感。在這種背景下,Disruptor誕生了,它的核心思想就是:把多線程並發寫的線程安全問題轉化為線程本地寫,即:不需要做同步。同時,lmax公司基於Disruptor構建的交易系統也多次斬獲金融界大獎。
  • 結合生產消費者模式實現異步日誌功能
    一、生產者消費者模式1、實現簡單的生產消費者管理類,生產者即函數Product, 它首先加鎖,然後將數據寫入隊列,最後通過條件變量來喚醒消費者來處理數據;消費者即函數Consume, 它首先加鎖,調用條件變量的wait等待接收信號,如果接收到信號,那麼從隊列中取出數據然後處理,這裡需要注意的是取出數據之後,s可以提前解鎖,以便生產者能夠儘快處理數據,另外wait函數添加的匿名函數
  • Python異步IO實現全過程
    還有一種結構同樣可以配合異步IO使用:許多互不關聯的生產者將元素加入到一個隊列中,每一個生產者可能在不同的時間,隨機且無序的加入多個元素到隊列中。還有一組消費者不管任何信號,不停地從隊列中拉取元素。這種設計中,任何一個生產者和消費者都沒有關聯。消費者事先並不知道生產者的數量,甚至不知道將累計添加的隊列中的元素數。
  • 並發工具類Condition介紹與源碼解析
    在之前介紹AQS源碼的時候,還遺留了一個內部類ConditionObject沒有介紹,它也是並發中至關重要的類。主要作用Condition主要提供多個await方法以及signal、signalAll方法,對標的是Object的wait、notify、notifyAll方法,對應的作用也一樣。
  • java並發編程之深入學習Concurrent包(十三,雙端阻塞隊列)
    上一章學習了阻塞隊列,這次一起學習下雙端阻塞隊列。LinkedBlockingDeque簡介:LinkedBlockingDeque是雙向鍊表實現的雙端阻塞隊列。該阻塞隊列可以從隊列的頭和尾進行插入和刪除,且該阻塞隊列是線程安全的。LinkedBlockingDeque分別繼承了隊列和雙端隊列的接口,如下所示:
  • Redis實現分布式阻塞隊列
    Redis分布式鎖實現原理分布式鎖本質上要實現的目標就是在 Redis 裡面佔一個「茅坑」,當別的進程也要來佔時,發現已經有人蹲在那裡了,就只好放棄或者稍後再試。佔坑一般是使用 setnx(set if not exists) 指令,只允許被一個客戶端佔坑。先來先佔, 用完了,再調用 del 指令釋放茅坑。
  • 如何使用Spring Boot與RabbitMQ結合實現延遲隊列
    比如消費者從隊列裡消費消息時失敗了,但是想要延遲一段時間後自動重試。如果不使用延遲隊列,那麼我們只能通過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便做成統一的服務便於開發人員使用。但是使用延遲隊列的話,我們就可以輕而易舉地完成。如何實現?別急,在下文中,我們將詳細介紹如何利用 Spring Boot 加 RabbitMQ 來實現延遲隊列。
  • Java中常用的七個阻塞隊列第二篇DelayQueue源碼介紹
    Java中常用的七個阻塞隊列第二篇DelayQueue源碼介紹通過前面兩篇文章,我們對隊列有了了解及已經認識了常用阻塞隊列中的三個了。本篇我們繼續介紹剩下的幾個隊列。本文主要內容:通過源碼學習Delayqueue及理解Dqueue並用代碼簡單演示使用場景。
  • Java並發包源碼學習系列:CLH同步隊列及同步資源獲取與釋放
    parkAndCheckInterrupt()void cancelAcquire(node)釋放資源 boolean release(int arg)void unparkSuccessor(Node node)參考閱讀本篇學習目標回顧CLH同步隊列的結構。
  • Python使用Queue對象實現多線程同步小案例
    queue模塊的Queue對象實現了多生產者/多消費者隊列,尤其適合需要在多個線程之間進行信息交換的場合,實現了多線程編程所需要的所有鎖語義。
  • Kafka設計思想之生產者和消費者
    生產者Load balancing生產者直接發送數據到主分區的伺服器上,不需要經過任何中間路由。 為了讓生產者實現這個功能,所有的 kafka 伺服器節點都能響應這樣的元數據請求: 哪些伺服器是活著的,主題的哪些分區是主分區,分配在哪個伺服器上,這樣生產者就能適當地直接發送它的請求到伺服器上。
  • 面試官:你手寫過堵塞隊列嗎?
    當隊列為空時,消費者掛起,隊列已滿時,生產者掛起,這就是生產-消費者模型,堵塞其實就是將線程掛起。因為生產者的生產速度和消費者的消費速度之間的不匹配,就可以通過堵塞隊列讓速度快的暫時堵塞,如生產者每秒生產兩個數據,而消費者每秒消費一個數據,當隊列已滿時,生產者就會堵塞(掛起),等待消費者消費後,再進行喚醒。堵塞隊列會通過掛起的方式來實現生產者和消費者之間的平衡,這是和普通隊列最大的區別。3.如何實現堵塞隊列?
  • 網際網路大廠:Java線程池實現原理及其在美團業務中的實踐
    在了解完「是什麼」和「為什麼」之後,下面我們來一起深入一下線程池的內部實現原理。二、線程池核心設計與實現在前文中,我們了解到:線程池是一種通過「池化」思想,幫助我們管理線程而獲取並發性的工具,在Java中的體現是ThreadPoolExecutor類。
  • 線程池阻塞隊列滿了該怎麼辦,線上宕機了隊列裡的請求會丟嗎
    如果在線程池中使用無界阻塞隊列會發生什麼問題?要回答這個問題首先要對線程池的工作原理非常熟悉,如果忘了的可以看下這篇文章線程池用無界阻塞隊列會有什麼影響有一個面試題:在遠程服務異常的情況下,使用無界阻塞隊列是否會導致內存異常飆升?這個問題的意思是遠程服務不可用會導致接口調用超時,新任務不斷提交到線程池,隊列變成得越來越大,此時會導致內存飆升起來,而且還可能會導致內存溢出OOM。通常線程池都是設置成有界隊列加一個拒絕策略。