hello !我是小小,今天是本周的第二篇,本篇將會著重的講解關於Java並行流的知識
前言
在之前如果需要處理集合需要先手動分成幾部分,然後為每部分創建線程,最後在合適的時候合併,這是手動處理並行集合的方法,在java8中,有了新功能,可以一下開啟並行模式。
並行流
認識開啟並行流
並行流是什麼?是把一個流內容分成多個數據塊,並用不同線程分別處理每個不同數據塊的流。例如,有下面一個例子,在List中,需要對List數據進行分別計算,其代碼如下所示:
List<Apple> appleList = new ArrayList<>(); // 假裝數據是從庫裡查出來的for (Apple apple : appleList) { apple.setPrice(5.0 * apple.getWeight() / 1000);}在這裡,時間複雜度為O(list.size),隨著list的增加,耗時也在增加。並行流可以解決這個問題,代碼如下所示:
appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));這裡通過調parallelStream()說明當前流為並行流,然後進行並行執行。並行流內部使用了默認的ForkJoinPool線程池,默認線程數為處理器的核心數。
測試並行流
普通代碼如下所示:
public static void main(String[] args) throws InterruptedException { List<Apple> appleList = initAppleList(); Date begin = new Date(); for (Apple apple : appleList) { apple.setPrice(5.0 * apple.getWeight() / 1000); Thread.sleep(1000); } Date end = new Date(); log.info("蘋果數量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);}輸出的內容為耗時4s。
並行代碼如下所示:
List<Apple> appleList = initAppleList();Date begin = new Date();appleList.parallelStream().forEach(apple -> { apple.setPrice(5.0 * apple.getWeight() / 1000); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } );Date end = new Date();log.info("蘋果數量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);輸出結果為耗時1s。可以看到耗時大大提升了3s。
並行流拆分會影響流的速度
對於並行流來說需要注意以下幾點:
對於 iterate 方法來處理的前 n 個數字來說,不管並行與否,它總是慢於循環的,而對於 LongStream.rangeClosed() 方法來說,就不存在 iterate 的第兩個痛點了。它生成的是基本類型的值,不用拆裝箱操作,另外它可以直接將要生成的數字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 這樣四部分。因此並行狀態下的 rangeClosed() 是快於 for 循環外部迭代的代碼如下所示:
package lambdasinaction.chap7;import java.util.stream.*;public class ParallelStreams { public static long iterativeSum(long n) { long result = 0; for (long i = 0; i <= n; i++) { result += i; } return result; } public static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get(); } public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get(); } public static long rangedSum(long n) { return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong(); } public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong(); }}package lambdasinaction.chap7;import java.util.concurrent.*;import java.util.function.*;public class ParallelStreamsHarness { public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool(); public static void main(String[] args) { System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs"); System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs"); System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" ); System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs"); System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" ); } public static <T, R> long measurePerf(Function<T, R> f, T input) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); R result = f.apply(input); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Result: " + result); if (duration < fastest) fastest = duration; } return fastest; }}共享變量會造成數據出現問題
public static long sideEffectSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).forEach(accumulator::add); return accumulator.total;}public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total;}public static class Accumulator { private long total = 0; public void add(long value) { total += value; }}並行流的注意
儘量使用 LongStream / IntStream / DoubleStream 等原始數據流代替 Stream 來處理數字,以避免頻繁拆裝箱帶來的額外開銷要考慮流的操作流水線的總計算成本,假設 N 是要操作的任務總數,Q 是每次操作的時間。N * Q 就是操作的總時間,Q 值越大就意味著使用並行流帶來收益的可能性越大對於較少的數據量,不建議使用並行流容易拆分成塊的流數據,建議使用並行流關於作者
我是小小,一枚一線程序猿,我們下期再見~bye
END