深入理解Java Stream流水線

2021-01-18 計算機java編程

前面我們已經學會如何使用Stream API,用起來真的很爽,但簡潔的方法下面似乎隱藏著無盡的秘密,如此強大的API是如何實現的呢?Pipeline是怎麼執行的,每次方法調用都會導致一次迭代嗎?自動並行又是怎麼做到的,線程個數是多少?本節我們學習Stream流水線的原理,這是Stream實現的關鍵所在。

首先回顧一下容器執行Lambda表達式的方式,以ArrayList.forEach()方法為例,具體代碼如下:

我們看到ArrayList.forEach()方法的主要邏輯就是一個for循環,在該for循環裡不斷調用action.accept()回調方法完成對元素的遍歷。這完全沒有什麼新奇之處,回調方法在Java GUI的監聽器中廣泛使用。Lambda表達式的作用就是相當於一個回調方法,這很好理解。

Stream API中大量使用Lambda表達式作為回調方法,但這並不是關鍵。理解Stream我們更關心的是另外兩個問題:流水線和自動並行。使用Stream或許很容易寫入如下形式的代碼:

上述代碼求出以字母A開頭的字符串的最大長度,一種直白的方式是為每一次函數調用都執一次迭代,這樣做能夠實現功能,但效率上肯定是無法接受的。類庫的實現著使用流水線(Pipeline)的方式巧妙地避免了多次迭代,其基本思想是在一次迭代中儘可能多的執行用戶指定的操作。為講解方便我們匯總了Stream的所有操作。

Stream上的所有操作分為兩類:中間操作和結束操作,中間操作只是一種標記,只有結束操作才會觸發實際計算。中間操作又可以分為無狀態的(Stateless)和有狀態的(Stateful),無狀態中間操作是指元素的處理不受前面元素的影響,而有狀態的中間操作必須等到所有元素處理之後才知道最終結果,比如排序是有狀態操作,在讀取所有元素之前並不能確定排序結果;結束操作又可以分為短路操作和非短路操作,短路操作是指不用處理全部元素就可以返回結果,比如找到第一個滿足條件的元素。之所以要進行如此精細的劃分,是因為底層對每一種情況的處理方式不同。

一種直白的實現方式

仍然考慮上述求最長字符串的程序,一種直白的流水線實現方式是為每一次函數調用都執一次迭代,並將處理中間結果放到某種數據結構中(比如數組,容器等)。具體說來,就是調用filter()方法後立即執行,選出所有以A開頭的字符串並放到一個列表list1中,之後讓list1傳遞給mapToInt()方法並立即執行,生成的結果放到list2中,最後遍歷list2找出最大的數字作為最終結果。程序的執行流程如如所示:

這樣做實現起來非常簡單直觀,但有兩個明顯的弊端:

迭代次數多。迭代次數跟函數調用的次數相等。頻繁產生中間結果。每次函數調用都產生一次中間結果,存儲開銷無法接受。這些弊端使得效率低下,根本無法接受。如果不使用Stream API我們都知道上述代碼該如何在一次迭代中完成,大致是如下形式:

採用這種方式我們不但減少了迭代次數,也避免了存儲中間結果,顯然這就是流水線,因為我們把三個操作放在了一次迭代當中。只要我們事先知道用戶意圖,總是能夠採用上述方式實現跟Stream API等價的功能,但問題是Stream類庫的設計者並不知道用戶的意圖是什麼。如何在無法假設用戶行為的前提下實現流水線,是類庫的設計者要考慮的問題。

Stream流水線解決方案

我們大致能夠想到,應該採用某種方式記錄用戶每一步的操作,當用戶調用結束操作時將之前記錄的操作疊加到一起在一次迭代中全部執行掉。沿著這個思路,有幾個問題需要解決:

用戶的操作如何記錄?操作如何疊加?疊加之後的操作如何執行?執行後的結果(如果有)在哪裡?操作如何記錄

注意這裡使用的是「操作(operation)」一詞,指的是「Stream中間操作」的操作,很多Stream操作會需要一個回調函數(Lambda表達式),因此一個完整的操作是<數據來源,操作,回調函數>構成的三元組。Stream中使用Stage的概念來描述一個完整的操作,並用某種實例化後的PipelineHelper來代表Stage,將具有先後順序的各個Stage連到一起,就構成了整個流水線。跟Stream相關類和接口的繼承關係圖示。

還有IntPipeline, LongPipeline, DoublePipeline沒在圖中畫出,這三個類專門為三種基本類型(不是包裝類型)而定製的,跟ReferencePipeline是並列關係。圖中Head用於表示第一個Stage,即調用調用諸如Collection.stream()方法產生的Stage,很顯然這個Stage裡不包含任何操作;StatelessOp和StatefulOp分別表示無狀態和有狀態的Stage,對應於無狀態和有狀態的中間操作。

Stream流水線組織結構示意圖如下:

圖中通過Collection.stream()方法得到Head也就是stage0,緊接著調用一系列的中間操作,不斷產生新的Stream。這些Stream對象以雙向鍊表的形式組織在一起,構成整個流水線,由於每個Stage都記錄了前一個Stage和本次的操作以及回調函數,依靠這種結構就能建立起對數據源的所有操作。這就是Stream記錄操作的方式。

操作如何疊加

以上只是解決了操作記錄的問題,要想讓流水線起到應有的作用我們需要一種將所有操作疊加到一起的方案。你可能會覺得這很簡單,只需要從流水線的head開始依次執行每一步的操作(包括回調函數)就行了。

這聽起來似乎是可行的,但是你忽略了前面的Stage並不知道後面Stage到底執行了哪種操作,以及回調函數是哪種形式。換句話說,只有當前Stage本身才知道該如何執行自己包含的動作。這就需要有某種協議來協調相鄰Stage之間的調用關係。

這種協議由Sink接口完成,Sink接口包含的方法如下表所示:

有了上面的協議,相鄰Stage之間調用就很方便了,每個Stage都會將自己的操作封裝到一個Sink裡,前一個Stage只需調用後一個Stage的accept()方法即可,並不需要知道其內部是如何處理的。當然對於有狀態的操作,Sink的begin()和end()方法也是必須實現的。

比如Stream.sorted()是一個有狀態的中間操作,其對應的Sink.begin()方法可能創建一個乘放結果的容器,而accept()方法負責將元素添加到該容器,最後end()負責對容器進行排序。對於短路操作,Sink.cancellationRequested()也是必須實現的,比如Stream.findFirst()是短路操作,只要找到一個元素,cancellationRequested()就應該返回true,以便調用者儘快結束查找。Sink的四個接口方法常常相互協作,共同完成計算任務。實際上Stream API內部實現的的本質,就是如何重載Sink的這四個接口方法。

有了Sink對操作的包裝,Stage之間的調用問題就解決了,執行時只需要從流水線的head開始對數據源依次調用每個Stage對應的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一種可能的Sink.accept()方法流程是這樣的:

Sink接口的其他幾個方法也是按照這種[處理->轉發]的模型實現。下面我們結合具體例子看看Stream的中間操作是如何將自身的操作包裝成Sink以及Sink是如何將處理結果轉發給下一個Sink的。先看Stream.map()方法:

上述代碼看似複雜,其實邏輯很簡單,就是將回調函數mapper包裝到一個Sink當中。由於Stream.map()是一個無狀態的中間操作,所以map()方法返回了一個StatelessOp內部類對象(一個新的Stream),調用這個新Stream的opWripSink()方法將得到一個包裝了當前回調函數的Sink。

再來看一個複雜一點的例子。Stream.sorted()方法將對Stream中的元素進行排序,顯然這是一個有狀態的中間操作,因為讀取所有元素之前是沒法得到最終順序的。拋開模板代碼直接進入問題本質,sorted()方法是如何將操作封裝成Sink的呢?sorted()一種可能封裝的Sink代碼如下:

上述代碼完美的展現了Sink的四個接口方法是如何協同工作的:

首先beging()方法告訴Sink參與排序的元素個數,方便確定中間結果容器的的大小;之後通過accept()方法將元素添加到中間結果當中,最終執行時調用者會不斷調用該方法,直到遍歷所有元素;最後end()方法告訴Sink所有元素遍歷完畢,啟動排序步驟,排序完成後將結果傳遞給下遊的Sink;如果下遊的Sink是短路操作,將結果傳遞給下遊時不斷詢問下遊cancellationRequested()是否可以結束處理。疊加之後的操作如何執行

Sink完美封裝了Stream每一步操作,並給出了[處理->轉發]的模式來疊加操作。這一連串的齒輪已經咬合,就差最後一步撥動齒輪啟動執行。是什麼啟動這一連串的操作呢?也許你已經想到了啟動的原始動力就是結束操作(Terminal Operation),一旦調用某個結束操作,就會觸發整個流水線的執行。

結束操作之後不能再有別的操作,所以結束操作不會創建新的流水線階段(Stage),直觀的說就是流水線的鍊表不會在往後延伸了。結束操作會創建一個包裝了自己操作的Sink,這也是流水線中最後一個Sink,這個Sink只需要處理數據而不需要將結果傳遞給下遊的Sink(因為沒有下遊)。對於Sink的[處理->轉發]模型,結束操作的Sink就是調用鏈的出口。

我們再來考察一下上遊的Sink是如何找到下遊Sink的。一種可選的方案是在PipelineHelper中設置一個Sink欄位,在流水線中找到下遊Stage並訪問Sink欄位即可。但Stream類庫的設計者沒有這麼做,而是設置了一個Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法來得到Sink,該方法的作用是返回一個新的包含了當前Stage代表的操作以及能夠將結果傳遞給downstream的Sink對象。

為什麼要產生一個新對象而不是返回一個Sink欄位?這是因為使用opWrapSink()可以將當前操作與下遊Sink(上文中的downstream參數)結合成新Sink。試想只要從流水線的最後一個Stage開始,不斷調用上一個Stage的opWrapSink()方法直到最開始(不包括stage0,因為stage0代表數據源,不包含操作),就可以得到一個代表了流水線上所有操作的Sink,用代碼表示就是這樣:

現在流水線上從開始到結束的所有的操作都被包裝到了一個Sink裡,執行這個Sink就相當於執行整個流水線,執行Sink的代碼如下:

上述代碼首先調用wrappedSink.begin()方法告訴Sink數據即將到來,然後調用spliterator.forEachRemaining()方法對數據進行迭代(Spliterator是容器的一種迭代器,參閱),最後調用wrappedSink.end()方法通知Sink數據處理結束。邏輯如此清晰。

執行後的結果在哪裡

最後一個問題是流水線上所有操作都執行後,用戶所需要的結果(如果有)在哪裡?首先要說明的是不是所有的Stream結束操作都需要返回結果,有些操作只是為了使用其副作用(Side-effects),比如使用Stream.forEach()方法將結果列印出來就是常見的使用副作用的場景(事實上,除了列印之外其他場景都應避免使用副作用),對於真正需要返回結果的結束操作結果存在哪裡呢?

特別說明:副作用不應該被濫用,也許你會覺得在Stream.forEach()裡進行元素收集是個不錯的選擇,就像下面代碼中那樣,但遺憾的是這樣使用的正確性和效率都無法保證,因為Stream可能會並行執行。大多數使用副作用的地方都可以使用歸約操作更安全和有效的完成。

回到流水線執行結果的問題上來,需要返回結果的流水線結果存在哪裡呢?這要分不同的情況討論,下表給出了各種有返回結果的Stream結束操作。

對於表中返回boolean或者Optional的操作(Optional是存放 一個 值的容器)的操作,由於值返回一個值,只需要在對應的Sink中記錄這個值,等到執行結束時返回就可以了。對於歸約操作,最終結果放在用戶調用時指定的容器中(容器類型通過收集器指定)。collect(), reduce(), max(), min()都是歸約操作,雖然max()和min()也是返回一個Optional,但事實上底層是通過調用reduce()方法實現的。對於返回是數組的情況,毫無疑問的結果會放在數組當中。這麼說當然是對的,但在最終返回數組之前,結果其實是存儲在一種叫做Node的數據結構中的。Node是一種多叉樹結構,元素存儲在樹的葉子當中,並且一個葉子節點可以存放多個元素。這樣做是為了並行執行方便。關於Node的具體結構,我們會在下一節探究Stream如何並行執行時給出詳細說明。結語

本文詳細介紹了Stream流水線的組織方式和執行過程,學習本文將有助於理解原理並寫出正確的Stream代碼,同時打消你對Stream API效率方面的顧慮。如你所見,Stream API實現如此巧妙,即使我們使用外部迭代手動編寫等價代碼,也未必更加高效。

相關焦點

  • 深入分析java中的多態(從jvm角度分析)
    對於java中多態概念的理解一直是面試常問的問題,所以今天花了一些時間好好地整理了一下,力求從java虛擬機的角度來分析和理解多態。一、認識多態1、方法調用在Java中,方法調用有兩類,動態方法調用與靜態方法調用。
  • 程式設計師:深入理解JVM,從JVM層面來講Java多態
    對多態理解不夠深入的,多半都會答錯;如果能記住口訣:「變量多態看左邊,方法多態看右邊,靜態多態看左邊」的話,肯定就知道答案,但是JVM是如何確定具體調用哪個方法的,有小夥伴思考過嗎?主要原因是字面量不需要定義,所以字面量沒有顯式的靜態類型,它的靜態類型只能通過語言上的規則去理解和推斷。
  • Java中Lambda表達式的5種不同語法
    1.標準語法考慮以下示例:String[] arr = {"program", "creek", "is", "a", "java", "site"};Arrays.sort在這種情況下,它是一個單一表達式-Integer.compare(m.length(), n.length())Output:[a, is, java, site, creek, program]2.可以推斷參數類型
  • 跟我學java編程—深入理解for語句的嵌套循環
    示例1:用「*」輸出一個菱形圖案,圖案如下: 在D盤Java目錄下,新建「ForSample1.java」文件。用記事本打開「ForSample1.java」文件,輸入以下代碼:代碼結構分析程序功能主要是演示for嵌套循環的使用方法。
  • Java反射機制深入詳解
    ()字節碼已經加載到java虛擬機中,去得到字節碼;java虛擬機中還沒有生成字節碼 用類加載器進行加載,加載的字節碼緩衝到虛擬機中。= 0; i < m.length; i++) System.out.println(m[i].toString()); } catch (Throwable e){ System.err.println(e); } } }1 public synchronized java.lang.Object java.util.Stack.pop() 2 public java.lang.Object
  • Java8 lambda表達式語法
    文中如有錯誤和理解偏差的地方,希望大家幫忙指出,我會持續修改和優化。本文是該系列的第一篇,主要介紹Java8對屌絲碼農最有吸引力的一個特性—lambda表達式。java8的安裝工欲善其器必先利其器,首先安裝JDK8。過程省略,大家應該都可以自己搞定。
  • PDFBox 2.0.6 發布,Java 的 PDF 處理類庫
    COSParser.parseXref failing if startXrefOffset over pdf size[PDFBOX-3687] - PDFBox doesn't respect different setting of /DA at PDAnnotationWidget level to /DA at PDField level[PDFBOX-3717] - java.io.IOException
  • Java基礎教程:java反射機制教程
    一、反射概念 在正式講解反射之前,為了很好的去理解它我們先從一個案例說起。這時候java語言在設計的時候為我們提供了一個機制,就是反射機制,他能夠很方便的去解決我們的問題。
  • Java程式設計師必備基礎:Java代碼是怎麼運行的?
    最近複習了深入理解Java虛擬機,做了一下總結,希望對大家有幫助,如果有不正確的地方,歡迎提出,感激不盡。java源文件編譯為class字節碼 類加載器把字節碼加載到虛擬機的方法區。
  • Android被指抄襲Java代碼引爭議
    (轉自谷奧)首先是第一組的7個抄襲的java文件(PolicyNodeImpl.java, AclEntryImpl.java, AclImpl.java, GroupImpl.java, OwnerImpl.java, PermissionImpl.java 和 PrincipalImpl.java)都屬於原始碼裡的測試分支。任何程式設計師都不會將測試代碼放到最終發布的產品裡。
  • Java 中的繼承和多態(深入版)
    必須有重寫重寫,簡單地理解就是重新定義的父類方法,使得父類和子類對同一行為的表現形式各不相同。我們用白切雞類來舉個慄子。所以可以簡單的理解,重寫就是子類對父類方法的核心進行重新定義。也可以簡單理解成,同一行為(方法)的不同表現形式。
  • Java NIO 基礎知識
    下面的描述,我儘量保證準確,但是不會展開得太具體,因為虛擬內存還是蠻複雜的,要完全介紹清楚,恐怕需要很大的篇幅,如果讀者對這方面的內容感興趣的話,建議讀者尋找更加專業全面的介紹資料,如《深入理解計算機系統》。物理內存被組織成一個很大的數組,每個單元是一個字節大小,然後每個字節都有一個唯一的物理地址,這應該很好理解。
  • 開發崗位這麼多,為什麼選Java?你學Java了嗎-開課吧
    提到C++語言,很多人發現在使用過程中最容易出現的錯誤就是內存管理,而java有自動垃圾回收器,不用擔心內存。零基礎學Java作為一門面向對象的高級語言,Java不僅吸收了C++語言的各種優點,還對C++裡諸如多繼承、指針等一些讓用戶難以理解和掌握的概念重新組織和及摒棄
  • New particle discovered in the bloodstream of patients with....
    New particle discovered in the bloodstream of patients with sepsis Researchers have identified new particles in the bloodstream, which appear to break off immune cells called
  • 用最傻瓜式的方法理解Java中的封裝、繼承和多態
    說到java中面向對象的封裝、繼承和多態,很多人會說這麼簡單這麼入門的東西一直講幹嘛,雖然是基礎,但是我們要明白,所有東西都是要在基礎上發展的,沒有基礎就去學其他的,那麼你肯定是很容易失敗的,那麼應該怎樣能夠用傻瓜式方法去理解Java面向對象的封裝、繼承和多態呢?
  • 學習java的優勢是什麼?學習難度怎麼樣
    學習java成為了現在比較熱門的話題。Java是一種可以撰寫跨平臺應用軟體的面向對象的程序設計語言。  他容易學而且很好用,如果你學習過C++語言,你會覺得C++和Java很像,因為Java中許多基本語句的語法和C++一樣,像常用的循環語句,控制語句等和C++幾乎一樣,其實Java和C++是兩種完全不同的語言,Java只需理解一些基本的概念,就可以用它編寫出適合於各種情況的應用程式。同時截止到2018年8月,java領先其它語言幾乎4.5%。為世界第一程式語言。
  • Java反射初探 ——「當類也學會照鏡子」
    動態加載類 我理解的「反射」的意義(僅個人理解哈) 我理解的java反射機制就是: 提供一套完善而強大的API「反射「類的結構。 所以我們可以其實可以將java中的對象分為兩種: 1.:import java.lang.reflect.Constructor;import java.lang.reflect.Field;import java.lang.reflect.Method; public class Test {  public static void printClassMessage (Object obj)
  • 跟我學java編程—認識java語言的字符類型
    用記事本打開「CharSample.java」文件,輸入以下代碼:編譯「CharSample.java」文件,在命令行窗口輸入「javac CharSample.java」並執行命令,編譯通過後,在命令行窗口輸入「java CharSample」運行Java程序,命令行窗口顯示如下信息:
  • Java之Random類的簡單介紹
    第一步,導包:import java.util.Random第二步,創建:Random a=new Random();小括號是可以留空的第三步,使用:如果要獲取一個隨機數int數字(範圍是int所有範圍,有正負兩種):int num=a.nextInt();為了方便大家的理解,小編就先粘幾行代碼,是一個比較簡單的猜數字小遊戲,代碼如下: