題圖來自 reactive.io。原文 SubscribeOn and ObserveOn
http://akarnokd.blogspot.com/2016/03/subscribeon-and-observeon.html
註:這篇文章八月份就翻譯完成了,當時是為了加深自己對 subscribeOn 和 observeOn 的理解,本打算按照原文作者的發表順序發布譯文,但今天在寫拆輪子系列:拆 RxJava,裡面涉及到了這塊內容,為了便於援引,所以提前發布,還好發布在 scheduler 之後也算比較合理。
介紹在響應式編程生態中,最令人疑惑的一對操作符應該就是 subscribeOn 和 observeOn 了。究其根本原因,可能是以下幾點:
它們聽起來很像;
從下遊來看,有時它們的行為很類似;
它們在某種程度上存在重複;
而名字的疑惑似乎並非 RxJava 獨有,Reactor 項目存在類似的問題:publishOn 和 dispatchOn。顯然,不管它們叫什麼,大家都很容易對它們產生疑惑。
當我在 2010 年學習 Rx.NET 時,我並未對此產生任何疑惑,subscribeOn 影響 subscribe(),而 observeOn 影響 onXXX()。
(注意:我搜索了早期 Channel 9 的視頻,但並未發現類似於本文這樣關於這兩個操作符工作原理的演講。內容最接近的大概是這次演講了。)
我的「論點」是:通過自己實現這樣的操作符並分析它們的內部函數調用,我們也許可以消除對它們的疑惑。
SubscribeOnsubscribeOn() 的目的就是,確保調用 subscribe() 函數的副作用代碼(執行額外的代碼)在另外的(參數指定的)線程中執行。然而首先幾乎沒有官方的 RxJava 代碼會在自己的線程執行這些副作用代碼;其次你也可以在自定義的 Observable 中執行副作用代碼,無論是通過 create() 函數來實現,還是通過 SyncOnSubscribe 和 fromCallable() API 來實現。
那我們為什麼需要把副作用代碼移到其他的線程中執行呢?主要的使用場景是在當前線程進行網絡請求、資料庫訪問或者其他任何涉及阻塞的操作。讓一個 Tomcat 的工作線程阻塞住並不是什麼大問題(當然我們也可以通過響應式方式改進這種情況),但在 Swing 應用中阻塞了事件分發線程(Event Dispatch Thread,EDT),或者在安卓中阻塞了主線程,就會對用戶體驗造成不利的影響了。
(註:有趣的是,阻塞住 EDT 通常是 GUI 程序中最方便的一種 backpressure 策略,以此防止用戶在程序正在執行某項操作的過程中改變程序的狀態。)
因此,如果源頭會在被訂閱時立即執行一些操作,我們希望這些操作在其他的線程執行。通常我們可以把對 subscribe() 的調用以及後續整個過程的所有操作都提交到一個 ExecutorService 上,但這時我們會面臨取消訂閱和 Subscriber 分離(cancellation being separate from the Subscriber)的問題。隨著越來越多(越來越複雜)的操作需要異步地取消訂閱,用這樣的方式處理所有的情況將會變得很不方便。
幸運的是,我們可以把這一邏輯抽象成一個操作符:subscribeOn()。
為了簡潔起見,讓我們從最簡單的響應式類型(Rx.NET 的 IObservable)開始構造這個操作符:
我們先不考慮同步取消訂閱和 backpressure。
我們先假設我們的源頭只是簡單地休眠 10 秒:
顯然,當我們調用 sleeper.subscribe(new IObserver ... ) 時,程序將會休眠 10 秒,現在讓我們編寫一個操作符,把休眠操作轉移到另一個線程執行:
subscribeOn 將會提交一個訂閱實際 IObservable 的任務到 exec 上,並且這個任務會返回一個 IDisposable 用於取消這個任務的執行。
當然,你也可以通過靜態方法,或者是為 IObservable 創建一個 wrapper(因為 Java 不支持擴展方法(extension methods)),來實現同樣的效果。
關於 subscribeOn 最常見的兩個問題就是:如果使用了兩次 subscribeOn(直接或者通過其他操作符間接使用兩次),會發生什麼?為什麼第二次使用 subscribeOn 無法再次修改 subscribe 執行的線程?希望通過上述結構上的簡化,問題的答案能變得顯而易見。首先我們嘗試使用兩次 subscribeOn 操作符:
現在我們展開一下 subscribeOn.subscribe() 的代碼:
代碼很簡單,我們可以從頭讀到尾。當收到 o 時,我們提交一個任務到 exec2 上,這個任務執行時將會提交另一個任務到 exec 上,而這個任務執行時將會用 o 訂閱 sleeper。由於 subscribeOn2 是最後使用的,所以無論它將在什麼線程被執行,它都是最先執行的,但它一定會被 subscribeOn 重新調度(rescheduled) 到 exec 上執行。所以源頭被訂閱之後執行的代碼,將在最先使用的(代碼上最靠近源頭的) subscribeOn() 操作符指定的線程上執行,而且後續的 subscribeOn() 都無法改變這一結果。這就是為什麼基於 Rx 的 API 不能在返回 Observable 的時候提前使用 subscribeOn() 或者提供指定 scheduler 選項的原因。
不幸的是,上述 subscribeOn 的實現並沒有很好地處理取消訂閱:sleeper.subscribe() 的返回值並沒有和外部的 IDisposable 連接起來,所以取消外部的對象並不能「真正地」取消訂閱(譯者註:調用最外層返回的 IDisposable 的 dispose(),會調用 f2.cancel,能取消 f2 的執行,但這並不會調用 subscribeOn 中返回的 IDisposable 的 dispose(),就更不會調用到 sleeper.subscribe 返回的 IDisposable 的 dispose() 了)。當然我們可以利用一個組合的(composite) IDisposable,把所有的訂閱都添加進去,最後一併取消訂閱。不過在 RxJava 1.x 中,我們無需如此麻煩,像這樣實現這個操作符即可:
這就保證了 unsubscribe() 調用也能取消 schedule() 的執行,以及上遊使用的任何資源。我們使用 unsafeSubscribe() 以避免將原 subscriber 封裝為 SafeSubscriber,但我們無論如何都需要一次封裝,因為 subscribe() 和 unsafeSubscribe() 都會調用 Subscriber 的 onStart(),而它很可能已經被外部的 Observable 調用過了。所以我們需要避免多次調用用戶的 Subscriber.onStart() 方法。
上面的代碼也實現了 backpressure 支持,但我們還沒有完成。
在 RxJava 支持 backpressure 之前,上面的 subscribeOn() 實現會保證所有同步的源都會在同一個線程發射所有的數據:
大家都默認地依賴了這一特性。但是 backpressure 打破了這一特性,因為通常情況下調用 request() 函數的線程將會執行上面的這個循環(可以看一下 range() 的實現),導致可能的線程跳躍(thread-hopping)。所以為了保持這一特性,對 request() 的調用必須和原訂閱時處於同一個 Worker。
所以實際上 subscribeOn() 需要進行更多的操作:
除了轉發 onXXX() 之外,我們還重寫了 setProducer 方法,並且通過調度,保證對原 producer 的調用發生在同一個線程,這樣就能保證如果 request() 調用會導致新的事件發射,它們都會發生在同一個線程。
這裡我們可以進行一個小小的優化,我們可以在 schedule 時獲取當前的線程,如果調用 request() 的線程和這個線程一致,那我們就可以直接調用 p.request(n),省去調度的開銷:
observeOn 的目的是確保所有發出的數據/通知都在指定的線程中被接收。RxJava 默認是同步的,即 onXXX() 是在同一個線程中串行調用的:
在很多場景下,我們需要把 onNext() 的調用(以及其後的所有鏈式調用)轉移到另一個線程中。例如,可能生成 map() 的輸入是很快的,但是 map 時的計算非常耗時,有可能會阻塞 GUI 線程。又例如,我們可能有些在後臺線程中執行的任務(資料庫、網絡訪問,或者耗時的計算),需要把結果在 GUI 中進行展示,很多 GUI 框架只允許在特定線程中修改 GUI 內容。
從概念上來說,observeOn 通過調度一個任務,把源 observable 的 onXXX() 調度到指定的調度器(scheduler)上。這樣,下遊接收(執行)onXXX() 時,就是在指定的調度器上,但接收的是同樣的值:
這種實現方式要求 executor 是單線程的,否則就需要保證 FIFO 以及不會有來自同一個源的多個任務被同時執行。
取消訂閱的處理將更加複雜,因為我們必須保持所有正在執行中的任務,當它們執行結束時移除它們,以及保證每個任務都能被及時取消。
我相信 Rx.NET 實現這樣的要求需要一套複雜的機制,但幸運的是,RxJava 可以很方便地利用 Scheduler.Worker 實現,並達到所有取消訂閱需要的功能:
通過對比 subscribeOn 和 observeOn,我們可以發現 subscribeOn 調度了整個 source.subscribe(...) 的調用,而 observeOn 則是調度每個單獨的 subscriber.onXXX() 調用。
所以你可以看到如果多次使用 observeOn,內部被調度的任務,將會把 subscriber.onNext 的執行調度到另一個調度器中:
所以 observeOn 會重載調用鏈中指定的線程,因此最靠近 subscriber 的 observeOn 指定的線程,將作為最終 onXXX() 執行的線程。從上面展開的等效代碼我們可以看出,worker 被浪費了,因為多餘的調度並沒有任何意義。
上述的實現方式有一個問題,如果源 observable 是 range(0, 1M),訂閱後它會立即發射出所有的數據,所以我們立即會向底層的線程池中提交大量的任務。這會為下遊帶來壓力,同時也會消耗大量的內存。
引入 backpressure 主要就是解決這類問題的:防止內部 buffer 的膨脹,以及由異步執行導致的無限內存佔用。消費者通過 request() 函數來告訴生產者,它們只能消費多少個數據,以確保生產者只會生產這麼多數據(以及調用 onNext())。當消費者準備好之後,它就再次調用 request()。上面的 observeOn() 實現,通過 new Subscriber<T>(subscriber) 包裝,它就能夠處理 backpressure 了,它將把下遊的 request() 調用傳遞給上遊。然而它並不能阻止消費者調用 request(Long.MAX_VALUE),此時我們依然存在同樣的膨脹問題。
不幸的是,backpressure 的這一問題 RxJava 發現得太晚了,強制要求解決這一問題需要很大的改動。所以,backpressure 作為可選行為被引入,並把解決這一問題作為像 observeOn 這樣的操作符的責任,以此來保證有限 buffer 的 Subscriber 與無限 buffer 的 Observer 之間的相同表現(對使用者透明,transparency)。
我們可以通過一個隊列、記錄下遊 Subscriber 的請求、向源發送數量固定的請求以及一個隊列漏來解決這一問題:
現在,我們應該對解決方式很熟悉了。我們把上遊發射過來的數據加入到隊列中,或者保存異常,然後自增 wip,並且準備執行隊列漏循環。這一過程是必要的,因為可能下遊會發起請求,導致上遊瞬間發射大量數據。下遊發送請求的時候,我們也需要執行隊列漏,因為可能此時隊列中已經有數據了。在漏循環中,我們會發射隊列中的數據,同時也會請求上遊 Producer 補充數據,上遊的 Producer 是通過 setProducer() 獲得的。
我們可以繼承(擴展)上面的這個版本,增加額外的安全保護,錯誤延遲,通過參數控制每次請求的數量,甚至是補充數據的數量。上述 trySchedule 的實現具備一個特性:它無需調度器是單線程的。因為 getAndIncrement 保證了只會有一個線程能執行隊列漏循環,而且只有當 wip 遞減到零時,才會讓其他的線程有機會執行隊列漏循環。
總結在本文中,我嘗試通過實現一個簡單地、不考慮眾多複雜情況的版本,來消除對 subscribeOn 和 observeOn 這兩個操作符的疑惑。
我們看到,RxJava 實現中的複雜度,來自於我們需要處理 backpressure,以及對消費者保持透明,無論是否是直接消費序列的消費者。
現在我們理清了這兩者的內部實現,(譯者註:接下來這句話實在難懂,很可能錯誤百出,我將貼出原文,歡迎英語更好的朋友提供更準確的翻譯)接下來我們可以就它們提供的異步邊界,繼續討論有關操作符結合(fusion)的話題了。我將以如何結合使用 subscribeOn 和 observeOn 這兩個操作符為例,來展示宏觀和微觀上的結合能提供怎樣的幫助。
Now that the inner workings and structures have been clarified, let’s continue with the discussion about operator fusion where I can now use subscribeOn and observeOn as an example how macro- and micro-fusion can help around the asynchronous boundaries they provide.
以下是我對這個句子的理解:
let’s continue with the discussion about operator fusion (1) around the asynchronous boundaries they provide.
(1) where I can now use subscribeOn and observeOn as an example how macro- and micro-fusion can help