Reactor詳解之:異常處理

2020-11-14 flydean程序那些事

簡介

不管是在響應式編程還是普通的程序設計中,異常處理都是一個非常重要的方面。今天將會給大家介紹Reactor中異常的處理流程。

Reactor的異常一般處理方法

先舉一個例子,我們創建一個Flux,在這個Flux中,我們產生一個異常,看看是什麼情況:

Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)); flux2.subscribe(System.out::println);

我們會得到一個異常ErrorCallbackNotImplemented:

100 / 1 = 100100 / 2 = 50reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.ArithmeticException: / by zero

那怎麼處理這個異常呢?

有兩種方式,第一種方式就是我們之前文章講過的,在subscribe的時候指定onError方法:

Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)); flux2.subscribe(System.out::println, error -> System.err.println("Error: " + error));

還是剛才的代碼,但是這次我們在subscribe的時候,添加了onError處理器,看下運行結果:

Divided by zero :(100 / 1 = 100100 / 2 = 50Error: java.lang.ArithmeticException: / by zero

可以看到異常已經被我們捕獲了,並且進行了合適的處理。

除了在subscribe中進行處理,我們還可以在publish的時候,就指定異常的處理模式,這就是我們要介紹的第二種方法:

Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorReturn("Divided by zero :("); flux.subscribe(System.out::println);

上面的例子中,在創建Flux的時候,手動指定了其onErrorReturn方法,我們看下輸出結果:

100 / 1 = 100100 / 2 = 50Divided by zero :(

注意,對於Flux或者Mono來說,所有的異常都是一個終止的操作,即使你使用了異常處理,原生成序列也不會繼續。

但是如果你對異常進行了處理,那麼它會將oneError信號轉換成為新的序列的開始,並將替換掉之前上遊產生的序列。

各種異常處理方式詳解

在一般的程序中,我們的異常應該怎麼處理呢?大家很容易想到的是try catch。而Reactor中subscribe的onError方法,就是try catch的一個具體應用:

Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)); flux2.subscribe(System.out::println, error -> System.err.println("Error: " + error));

還是上的例子,我們在onError方法中,對異常進行了處理。

如果轉換成為常規代碼,應該是下面的樣子:

public void normalErrorHandle(){ try{ Arrays.asList(1,2,0).stream().map(i -> "100 / " + i + " = " + (100 / i)).forEach(System.out::println); }catch (Exception e){ System.err.println("Error: " + e); } }

除了這種最基本的異常處理方法之外,Reactor還提供了很多種不同的異常處理方法,下面我們來一一介紹一下。

Static Fallback Value

Static Fallback Value的意思是,在遇到異常的時候會fallback到一個靜態的默認值。比如我們之前講到的onErrorReturn。

Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorReturn("Divided by zero :(");

當然onErrorReturn還支持一個Predicate參數,用來判斷要falback的異常是否滿足條件。

public final Flux<T> onErrorReturn(Predicate<? super Throwable> predicate, T fallbackValue)

Fallback Method

除了fallback Value之外,還支持Fallback Method。也就是說如果你想在捕獲異常之後調用其他的方法,就可以使用Fallback Method。

這裡Fallback Method是用onErrorResume來表示的。

public void useFallbackMethod(){ Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(e -> System.out::println); flux.subscribe(System.out::println); }

Dynamic Fallback Value

所謂的動態Fallback Value就是根據你拋出的異常進行判斷,通過定位不同的Error從而fallback到不同的值:

public void useDynamicFallback(){ Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(error -> Mono.just( MyWrapper.fromError(error))); } public static class MyWrapper{ public static String fromError(Throwable error){ return "That is a new Error"; } }

Catch and Rethrow

同樣的,我們可以在捕獲異常之後進行rethrow:

Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorResume(error -> Flux.error( new RuntimeException("oops, ArithmeticException!", error))); Flux flux2= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .onErrorMap(error -> new RuntimeException("oops, ArithmeticException!", error));

有兩種方式,第一種就是在onErrorResume中使用Flux.error構建一個新的Flux,另外一種就是直接在onErrorMap中進行處理。

Log or React on the Side

有時候你只是想記錄一下異常信息,並不想破壞原來的React結構,那麼可以試著使用doOnError。

public void useDoOnError(){ Flux flux= Flux.just(1, 2, 0) .map(i -> "100 / " + i + " = " + (100 / i)) .doOnError(error -> System.out.println("we got the error: "+ error)); }

Finally Block

如果我們在代碼中使用了某些資源,一般情況下我們需要在finally中對其進行關閉,或者使用JDK7中引入的 try-with-resource 。

舉個例子,下面的是使用finally的方式:

Stats stats = new Stats();stats.startTimer();try { doSomethingDangerous();}finally { stats.stopTimerAndRecordTiming();}

下面是使用try-with-resource的方式:

try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) { return disposableInstance.toString();}

那麼在Reactor中,我們也有兩種方式和其對應。

第一種就是doFinally方法:

Stats stats = new Stats();LongAdder statsCancel = new LongAdder();Flux<String> flux =Flux.just("foo", "bar") .doOnSubscribe(s -> stats.startTimer()) .doFinally(type -> { stats.stopTimerAndRecordTiming(); if (type == SignalType.CANCEL) statsCancel.increment(); }) .take(1);

上面的例子中,doFinally實際上做的就是finally block做的事情。

第二種是使用using,我們先看一個using的定義:

public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup)

可以看到using支持三個參數,resourceSupplier是一個生成器,用來在subscribe的時候生成要發送的resource對象。

sourceSupplier是一個生成Publisher的工廠,接收resourceSupplier傳過來的resource,然後生成Publisher對象。

resourceCleanup用來對resource進行收尾操作。

那麼我們怎麼用呢?

舉個例子:

public void useUsing(){ AtomicBoolean isDisposed = new AtomicBoolean(); Disposable disposableInstance = new Disposable() { @Override public void dispose() { isDisposed.set(true); } @Override public String toString() { return "DISPOSABLE"; } }; Flux<String> flux = Flux.using( () -> disposableInstance, disposable -> Flux.just(disposable.toString()), Disposable::dispose); }

上面的例子中,我們創建了一個Disposable對象,作為resource,然後對這個resource進行加工,返回一個Flux對象,最後通過調用Disposable::dispose方法,對resource進行銷毀。

Retrying

有時候我們遇到了異常,可能需要重試幾次,Reactor為我們提供了retry方法,先看一個例子:

public void testRetry(){ Flux.interval(Duration.ofMillis(250)) .map(input -> { if (input < 3){ return "tick " + input; } throw new RuntimeException("boom"); }) .retry(1) .elapsed() .subscribe(System.out::println, System.err::println); try { Thread.sleep(2100); } catch (InterruptedException e) { e.printStackTrace(); } }

看下輸出結果:

[264,tick 0][255,tick 1][241,tick 2][506,tick 0][252,tick 1][253,tick 2]java.lang.RuntimeException: boom

retry的作用就是當遇到異常的時候,重啟一個新的序列。

elapsed是用來展示產生的value時間之間的duration。

從結果我們可以看到,retry之前是不會產生異常信息的。

本文的例子learn-reactive

本文作者:flydean程序那些事

本文連結:http://www.flydean.com/reactor-handle-errors/

本文來源:flydean的博客

歡迎關注我的公眾號:「程序那些事」最通俗的解讀,最深刻的乾貨,最簡潔的教程,眾多你不知道的小技巧等你來發現!

相關焦點

  • 開發者資料 | Reactor詳解之:異常處理
    今天將會給大家介紹Reactor中異常的處理流程。簡介不管是在響應式編程還是普通的程序設計中,異常處理都是一個非常重要的方面。今天將會給大家介紹Reactor中異常的處理流程。: java.lang.ArithmeticException: / by zero那怎麼處理這個異常呢?
  • Linux兩種處理模式reactor模式proactor模式
    事件驅動機制Reactor的回調函數:和普通函數調用的不同之處在於事件分發器:將多路復用器中返回的就緒事件分到對應的處理函數中事件處理器:負責處理特定事件的處理函數具體流程如下:註冊讀就緒事件和相應的事件處理器事件分離器等待事件事件到來,激活分離器,分離器調用事件對應的處理器事件處理器完成實際的讀操作
  • 響應式編程簡介之:Reactor
    並且還可以和reactor-netty相結合,作為一些異步框架的底層服務,比如我們非常熟悉的Spring MVC 5中引入的WebFlux。我們知道WebFlux的底層使用的是reactor-netty,而reactor-netty又引用了Reactor。
  • Netty運用Reactor模式到極致
    常見的reactor模式有以下三種單線程reactor多線程reactor主從reactor1、單線程reactorractor 單線程模式是指所有的I/O操作都在一個NIO線程完成,該線程的職責:1.作為NIO
  • Java基礎之異常處理機制
    Java基礎之異常處理機制什麼是異常從事Java開發的小夥伴對於「異常」應該不陌生,因為每天都會遇到不少異常,或捕獲,或拋出。異常處理機制可以讓程序有更好的容錯性,使代碼更加健壯;但C語言卻沒有異常處理機制,C程式設計師一般都是利用方法的返回值來實現異常處理,使用 if + 條件 來判斷正常和異常情況,使用特定返回值來表示異常情況。
  • reactor ---- 反應堆模型
    , ev->buffer); nty_event_del(reactor->epfd, ev); nty_event_set(ev, fd, recv_cb, reactor); nty_event_add(reactor->epfd, EPOLLIN, ev); } else {
  • Java異常之異常處理機制
    異常處理機制 拉勾IT課小編為大家分解 1、拋出異常2、捕獲異常3、異常處理五個關鍵字:try、catch、finally、throw、throws注意:假設要捕獲多個異常:需要按照層級關係(異常體系結構) 從小到大!
  • Java之異常處理
    對於這類異常,可以不作處理,因為這類異常很普遍,若全處理可能會對程序的可讀性和運行效率產生影響運行時異常:執行java.exe命名時,出現的異常 是指編譯器要求必須處置的異常。即程序在運行時由於外界因素造成的一般性異常。編譯器要求Java程序必須捕獲或聲明所有編譯時異常對於這類異常,如果程序不處理,可能會帶來意想不到的結果。
  • 協程中的取消和異常 | 異常處理詳解
    : defaultCancellationException())}一旦拋出了 CancellationException 異常,您便可以使用這一機制來處理協程的取消。有關如何執行此操作的更多信息,請參考下面的處理取消的副作用一節。在底層實現中,子協程會通過拋出異常的方式將取消的情況通知到它的父級。
  • Python「守護者」之異常處理(上)
    大家好,今天我們一起聊聊Python語言的異常處理機制,為什麼說異常處理是Python的「守護者」呢?因為它時時刻刻維護著Python世界的和平與穩定,任何一個角落發生了異常、報錯,都會觸發異常處理機制,及時將異常處理,以此守護著整個Python世界。
  • smart-socket 1.4.1 發布,Proactor 與 Reactor 的組合通信
    採用proactor為主,reactor為輔的線程模型,性能得到顯著提升。 bugfix:修復AioQuickServer執行shutdown後Worker線程沒有停止的bug。(感謝richard.wu的反饋) bugfix:修復TLS/SSL關閉連接時狀態處理異常。
  • 零基礎學習 Python 之處理異常
    本文字數:2135 字閱讀本文大概需要:6 分鐘寫在之前在昨天的文章(零基礎學習 Python 之錯誤 & 異常)中我介紹了 Python 中「錯誤 & 異常」的概念,如果在程序運行過程中拋出了異常,程序就會中止運行。
  • Git實戰003:VScode使用git詳解(含異常處理)
    ​推送Gitlab異常解決在執行命令:git以上內容是小編給大家分享的【Git實戰003:VScode使用git詳解(含異常處理)】,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。
  • ...dump water to cool down Japan's overheating reactors - news
    Helicopters were sent by Japan's self-defense force on Friday to cool down the reactors of Fukushima No. 1 nuclear power station.
  • 《無盡之劍3》爛武器處理方法詳解 怎麼處理爛武器
    導 讀 菜鳥遊戲廳小編今天為大家帶來的是無盡之劍3爛武器處理方式方法詳解攻略,希望可以幫助到廣大玩家朋友。
  • python面向對象之異常處理
    異常的概念 程序在運行時,如果 `Python 解釋器` 遇到 到一個錯誤,會停止程序的執行,並且提示一些錯誤信息,這就是 異常 程序停止執行並且提示錯誤信息 這個動作,我們通常稱之為:拋出(raise)異常> 程序開發時,很難將 所有的特殊情況 都處理的面面俱到,通過 異常捕獲 可以針對突發事件做集中的處理,從而保證程序的 穩定性和健壯性
  • C語言的那些小秘密之異常處理
    第一行代碼定義了一個函數指針(註:如果有對函數指針知識點不熟悉的讀者可以去閱讀我之前寫的那篇文章《C語言的那些小秘密之函數指針》),其類型為含有一個int型參數,無返回值;  第二行代碼中,signal函數的返回值是一個函數指針,與第一行我們定義的類型相同,第二個參數也為一個函數指針,其實signal的返回值就是第二個函數指針指向的函數地址。
  • ARM Linux異常處理之data abort
    1 異常向量與程序跳轉data abort是ARM體系定義的異常之一。異常發生時,ARM會自動跳轉到異常向量表中,通過向量表中的跳轉命令跳轉到相應的異常處理中去。實際上,進入異常向量前Linux只能處於usr或者svc兩種模式之一。這時因為irq等異常在跳轉表中都要經過vector_stub宏,而不管之前是哪種狀態,這個宏都會將CPU狀態改為svc模式。usr模式即Linux中的用戶態模式,svc即內核模式。下面看一下在不同模式下進入data abort時的處理過程。
  • 眾包翻譯文檔分享 ——《Reactor 指南中文版》
    《Reactor 指南中文版》日前在開源中國眾包平臺翻譯完成,現發布在社區與各位 OSCer 共享:http://projectreactor.mydoc.io/Reactor
  • Python異常處理
    解決方案首先我們要了解異常才能處理異常那我們來就說一說異常的定義:程序運行過程中出現的錯誤或遇到的意外情況其次是錯誤的類型分別有:語法錯誤、運行錯誤、邏輯錯誤而我們通常出錯的理由無非是這些:輸入錯誤、下標越界、類型錯誤、操作不當等等接著我們來看一看報錯的構成吧接下來我們的重點來了,在python異常處理會用到