【Java 8 in Action】Stream
文章目錄
- Stream
- Stream的介紹
- Stream的創建
- Stream的中間操作
- Stream 的終止操作
- 并行流與串行流
Stream
Stream的介紹
Lambda表達式是Stream的基礎,如果你還不懂它,可以參考這篇文章
在Lambda表達式章節也說了Stream 是 Java 8 的一大亮點。它與 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。我們看一下官方文檔
官方文檔表明:
簡單的說流 (Stream) 是數據渠道,用于操作數據源(集合、數組等)所生成的元素序列。“集合講的是數據,流講的是計算!
值得注意的地方:
- Stream 自己不會存儲元素。
- Stream 不會改變源對象。相反,他們會返回一個持有結果的新Stream。
- Stream 操作是延遲執行的。這意味著他們會等到需要結果的時候才執行。
Stream的操作有三個步驟:
一個數據源(如:集合、數組),獲取一個流
一個中間操作鏈,對數據源的數據進行處理
一個終止操作,執行中間操作鏈,并產生結果
類似于這種:
Stream的創建
可以通過多種方式創建流:
1、通過集合的stream()方法或者parallelStream(),比如Arrays.asList(1,2,3).stream()。
2、通過Arrays.stream(Object[])方法, 比如Arrays.stream(new int[]{1,2,3})。
3、使用流的靜態方法,比如Stream.of(Object[]), IntStream.range(int, int) 或者 Stream.iterate(Object, UnaryOperator),如Stream.iterate(0, n -> n * 2),或者generate(Supplier<T> s)如Stream.generate(Math::random)。
4、BufferedReader.lines()從文件中獲得行的流。
5、Files類的操作路徑的方法,如list、find、walk等。
6、隨機數流Random.ints()。
7、其它一些類提供了創建流的方法,如BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), 和 JarFile.stream()。
8、更底層的使用StreamSupport,它提供了將Spliterator轉換成流的方法。
我們這里僅展示常見的
Java8 中的 Collection 接口被擴展,提供了兩個獲取流的方法 :
default Stream<E> stream() : 返回一個順序流default Stream<E> parallelStream() : 返回一個并行流下面我們來看看如何創建流
由數組創建流
Java8 中的 Arrays 的靜態方法 stream() 可以獲取數組流:
由值創建流
可以使用靜態方法 Stream.of(), 通過顯示值創建一個流。它可以接收任意數量的參數。
由函數創建流:創建無限流
可以使用靜態方法 Stream.iterate() 和Stream.generate(), 創建無限流。
演示:
//1. Collection 提供了兩個方法 stream() 與 parallelStream()List<String> list = new ArrayList<>();Stream<String> stream = list.stream(); //獲取一個順序流Stream<String> parallelStream = list.parallelStream(); //獲取一個并行流//2. 通過 Arrays 中的 stream() 獲取一個數組流Integer[] nums = new Integer[10];Stream<Integer> stream1 = Arrays.stream(nums);//3. 通過 Stream 類中靜態方法 of()Stream<Integer> stream2 = Stream.of(1,2,3,4,5,6);//4. 創建無限流//迭代Stream<Integer> stream3 = Stream.iterate(0, (x) -> x + 2).limit(10);stream3.forEach(System.out::println);//生成Stream<Double> stream4 = Stream.generate(Math::random).limit(2);stream4.forEach(System.out::println);Stream的中間操作
多個中間操作可以連接起來形成一個 流水線,除非流水線上觸發終止操作,否則 中間操作不會執行任何的 處理!而在 終止操作時一次性全部處理,稱為“惰性求值”
篩選與切片
| filter(Predicate p) | 接收 Lambda , 從流中排除某些元素。 |
| distinct() | 篩選,通過流所生成元素的 hashCode() 和 equals() 去除重復元素 |
| limit(long maxSize) | 截斷流,使其元素不超過給定數量。 |
| skip(long n) | 跳過元素,返回一個扔掉了前 n 個元素的流。若流中元素不足 n 個,則返回一個空流。與 limit(n) 互補 |
映射
| map(Function f) | 接收一個函數作為參數,該函數會被應用到每個元素上,并將其映射成一個新的元素。 |
| mapToDouble(ToDoubleFunction f) | 接收一個函數作為參數,該函數會被應用到每個元素上,產生一個新的 DoubleStream。 |
| mapToInt(ToIntFunction f) | 接收一個函數作為參數,該函數會被應用到每個元素上,產生一個新的 IntStream。 |
| mapToLong(ToLongFunction f) | 接收一個函數作為參數,該函數會被應用到每個元素上,產生一個新的 LongStream。 |
| flatMap(Function f) | 接收一個函數作為參數,將流中的每個值都換成另一個流,然后把所有流連接成一個流 |
排序
| sorted() | 產生一個新流,其中按自然順序排序 |
| sorted(Comparator comp) | 產生一個新流,其中按比較器順序排序 |
演示:
新建一個實體類Employee :
核心代碼
import org.junit.Test;import java.io.PrintStream; import java.util.*; import java.util.function.*; import java.util.stream.Stream;/** 一、Stream API 的操作步驟:** 1. 創建 Stream** 2. 中間操作** 3. 終止操作(終端操作)*/ public class MyTest {//1. 創建 Stream@Testpublic void test1(){//1. Collection 提供了兩個方法 stream() 與 parallelStream()List<String> list = new ArrayList<>();Stream<String> stream = list.stream(); //獲取一個順序流Stream<String> parallelStream = list.parallelStream(); //獲取一個并行流//2. 通過 Arrays 中的 stream() 獲取一個數組流Integer[] nums = new Integer[10];Stream<Integer> stream1 = Arrays.stream(nums);//3. 通過 Stream 類中靜態方法 of()Stream<Integer> stream2 = Stream.of(1,2,3,4,5,6);//4. 創建無限流//迭代Stream<Integer> stream3 = Stream.iterate(0, (x) -> x + 2).limit(10);stream3.forEach(System.out::println);//生成Stream<Double> stream4 = Stream.generate(Math::random).limit(2);stream4.forEach(System.out::println);}//2. 中間操作List<Employee> emps = Arrays.asList(new Employee(102, "李四", 59, 6666.66),new Employee(101, "張三", 18, 9999.99),new Employee(103, "王五", 28, 3333.33),new Employee(104, "趙六", 8, 7777.77),new Employee(104, "趙六", 8, 7777.77),new Employee(104, "趙六", 8, 7777.77),new Employee(105, "田七", 38, 5555.55));/*篩選與切片filter——接收 Lambda , 從流中排除某些元素。limit——截斷流,使其元素不超過給定數量。skip(n) —— 跳過元素,返回一個扔掉了前 n 個元素的流。若流中元素不足 n 個,則返回一個空流。與 limit(n) 互補distinct——篩選,通過流所生成元素的 hashCode() 和 equals() 去除重復元素 要記得使用前重寫元素的hashCode() 和 equals() 方法*///內部迭代:迭代操作 Stream API 內部完成@Testpublic void test2(){//所有的中間操作不會做任何的處理Stream<Employee> stream = emps.stream().filter((e) -> {System.out.println("測試中間操作");return e.getAge() <= 35;});//只有當做終止操作時,所有的中間操作會一次性的全部執行,稱為“惰性求值”stream.forEach(System.out::println);}//外部迭代@Testpublic void test3(){Iterator<Employee> it = emps.iterator();while(it.hasNext()){System.out.println(it.next());}}@Testpublic void test4(){emps.stream().filter((e) -> {System.out.println("短路!"); // && ||return e.getSalary() >= 5000;}).limit(3).forEach(System.out::println);}@Testpublic void test5(){emps.parallelStream().filter((e) -> e.getSalary() >= 5000).skip(2).forEach(System.out::println);}@Testpublic void test6(){emps.stream().distinct().forEach(System.out::println);} }
代碼太長了,分成了幾段
Stream 的終止操作
終端操作會從流的流水線生成結果。其結果可以是任何不是流的值,例如:List、Integer,甚至是 void 。
查找與匹配
| allMatch(Predicate p) | 檢查是否匹配所有元素 |
| anyMatch( (Predicate p) ) | 檢查是否至少匹配一個元素 |
| noneMatch(Predicate p) | 檢查是否沒有匹配所有元素 |
| findFirst() | 返回第一個元素 |
| findAny() | 返回當前流中的任意元素 |
| count() | 返回流中元素總數 |
| max(Comparator c ) | 返回流中最大值 |
| min(Comparator c ) | 返回流中最小值 |
| forEach(Consumer c ) | 內部迭代( (用 使用 Collection 接口需要用戶去做迭代,稱為 外部迭代 。相反, Stream API 使用內部迭代 —— 它幫你把迭代做了) ) |
演示一下:
更改Employee類,添加枚舉類型
public enum Status{SLEEP,WORK,TRAVEL}代碼:
import org.junit.Test;import java.util.Arrays; import java.util.List; import java.util.Optional;/** 一、Stream API 的操作步驟:** 1. 創建 Stream** 2. 中間操作** 3. 終止操作(終端操作)*/ public class MyTest {//3. 終止操作(終端操作)List<Employee> emps = Arrays.asList(new Employee(102, "李四", 59, 6666.66, Employee.Status.SLEEP),new Employee(101, "張三", 18, 9999.99, Employee.Status.WORK),new Employee(103, "王五", 28, 3333.33, Employee.Status.TRAVEL),new Employee(104, "趙六", 8, 7777.77,Employee.Status.TRAVEL),new Employee(104, "趙六", 8, 7777.77,Employee.Status.SLEEP),new Employee(104, "趙六", 8, 7777.77, Employee.Status.SLEEP),new Employee(105, "田七", 38, 5555.55, Employee.Status.WORK));/*查找與匹配a11Match 檢查是否匹配所有元素anyMatch 檢查是否至少匹配一個元素noneMatch 檢查是否沒有匹配所有元素findfirst 返回第一個元素findAny 返回當前流中的任意元素count 返回流中元素的總個數max—返回流中最大值min—返回流中最小值*/@Testpublic void test1() {boolean b1 = emps.stream().allMatch((e) -> e.getStatus().equals(Employee.Status.WORK));System.out.println("allMatch: "+b1);boolean b2 = emps.stream().anyMatch((e) -> e.getStatus().equals(Employee.Status.WORK));System.out.println("anyMatch: "+b2);boolean b3 = emps.stream().noneMatch((e) -> e.getStatus().equals(Employee.Status.WORK));System.out.println("noneMatch: "+b3);//值可能為空 結果封裝到Optional容器中Optional<Employee> op = emps.stream().sorted((e1, e2) -> Double.compare(e1.getSalary() ,e2.getSalary())).findFirst();System.out.println(op.get());System.out.println("**************************************");Optional<Employee> op2 = emps.stream().filter((e1) -> e1.getStatus().equals(Employee.Status.SLEEP)).findAny();System.out.println(op2.get());System.out.println("**************************************");Long count = emps.stream().count();System.out.println(count);System.out.println("**************************************");Optional<Employee> op3 = emps.stream().max((e1, e2) -> Double.compare(e1.getSalary(),e2.getSalary()));System.out.println(op3.get());emps.stream().map(Employee :: getSalary).min(Double::compare);} }歸約
| reduce(T iden, BinaryOperator b) | 可以將流中元素反復結合起來,得到一個值。返回 T |
| reduce(BinaryOperator b) | 可以將流中元素反復結合起來,得到一個值。返回 Optional<T> |
map 和 reduce 的連接通常稱為 map-reduce 模式,因 Google 用它來進行網絡搜索而出名。
收集
| collect(Collector c) | 將流轉換為其他形式。接收一個 Collector接口實現,用于給Stream中元素做匯總的方法 |
Collector 接口中方法的實現決定了如何對流執行收集操作(如收集到 List、Set、Map)。但是 Collectors 實用類提供了很多靜態
方法,可以方便地創建常見收集器實例,具體方法與實例如下:
| toList | List、 | 把流中元素收集到List |
| toSet | Set<T> | 把流中元素收集到Set |
| toCollection | Collection<T> | 把流中元素收集到創建的集合 |
| counting | Long | 計算流中元素的個數 |
| summingInt | Integer | 對流中元素的整數屬性求和 |
| averagingInt | Double | 計算流中元素Integer屬性的平均值 |
| summarizingInt | IntSummaryStatistics | 收集流中Integer屬性的統計值。如:平均值 |
| joining | String | 連接流中每個字符串 |
| maxBy | Optional<T> | 根據比較器選擇最大值 |
| minBy | Optional<T> | 根據比較器選擇最小值 |
| reducing | 歸約產生的類型 | 從一個作為累加器的初始值開始,利用BinaryOperator與流中元素逐個結合,從而歸約成單個值 |
| collectingAndThen | 轉換函數返回的類型 | 包裹另一個收集器,對其結果轉換函數 |
| groupingBy | Map<K, List<T>> | 根據某屬性值對流分組,屬性為K,結果為V |
| partitioningBy | Map<Boolean, List<T>> | 根據true或false進行分區 |
演示
package com.study;import org.junit.Test; import java.util.*; import java.util.stream.Collectors;/** 一、Stream API 的操作步驟:** 1. 創建 Stream** 2. 中間操作** 3. 終止操作(終端操作)*/ public class MyTest {//3. 終止操作(終端操作)List<Employee> emps = Arrays.asList(new Employee(102, "李四", 59, 6666.66, Employee.Status.SLEEP),new Employee(101, "張三", 18, 9999.99, Employee.Status.WORK),new Employee(103, "王五", 28, 3333.33, Employee.Status.TRAVEL),new Employee(104, "趙六", 8, 7777.77,Employee.Status.TRAVEL),new Employee(104, "趙六", 8, 7777.77,Employee.Status.SLEEP),new Employee(104, "趙六", 8, 7777.77, Employee.Status.SLEEP),new Employee(105, "田七", 38, 5555.55, Employee.Status.WORK));/*歸約reduce( T identity, Binaryoperator)/ reduce( Binaryoperator)- 可以將流中元泰反復結合起來,得到一個值。identity為起始值Binaryoperator為二元運算 繼承BinFunction接口 屬于函數式接口*/@Testpublic void test1() {List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9);Integer sum = list.stream().reduce(0, (x, y) -> x+y );System.out.println(sum);System.out.println("***********************************************");Optional<Double> op = emps.stream().map(Employee::getSalary).reduce(Double::sum);System.out.println(op.get());System.out.println("***********************************************");}/*收集collect—將流轉換為其他形式。接收一個Co1lector接口的實現,用于給 Stream中元素做匯總的方法*/@Testpublic void test2() {List<String> list = emps.stream().map(Employee::getName).collect(Collectors.toList());list.forEach(System.out::println);System.out.println("***********************************************");Set<String> set = emps.stream().map(Employee::getName).collect(Collectors.toSet());set.forEach(System.out::println);System.out.println("***********************************************");// 放在特殊的集合中HashSet<String> hs = emps.stream().map(Employee::getName).collect(Collectors.toCollection(HashSet::new));hs.forEach(System.out::println);System.out.println("***********************************************");// 總數Long count = emps.stream().collect(Collectors.counting());System.out.println(count);System.out.println("***********************************************");// 平均值Double avg = emps.stream().collect(Collectors.averagingDouble(Employee::getSalary));System.out.println(avg);System.out.println("***********************************************");// 總和Double sum = emps.stream().collect(Collectors.summingDouble(Employee::getSalary));System.out.println(sum);System.out.println("***********************************************");// 最大值Optional<Employee> max = emps.stream().collect(Collectors.maxBy((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary())));System.out.println(max.get());System.out.println("***********************************************");// 最小值Optional<Employee> min = emps.stream().collect(Collectors.minBy((e1, e2) -> Double.compare(e1.getSalary(), e2.getSalary())));System.out.println(min.get());}@Testpublic void test3() {// 分組Map<Employee.Status,List<Employee>> map = emps.stream().collect(Collectors.groupingBy(Employee::getStatus));System.out.println(map);System.out.println("***********************************************");// 多級分組Map<Employee.Status, Map<String,List<Employee>>> maps = emps.stream()//groupingBy的條件可以一直加.collect(Collectors.groupingBy(Employee::getStatus, Collectors.groupingBy((e) -> {if(e.getAge() <= 40) {return "青年";} else {return "其他";}})));System.out.println(maps);}@Testpublic void test4() {// 分區Map<Boolean, List<Employee>> map = emps.stream().collect(Collectors.partitioningBy((e) -> e.getSalary() > 80));System.out.println(map);}@Testpublic void test5(){DoubleSummaryStatistics dss = emps.stream().collect(Collectors.summarizingDouble(Employee::getSalary));System.out.println(dss.getSum());System.out.println(dss.getAverage());System.out.println(dss.getMax());}@Testpublic void test6() {// 所有的名字都連成了字符串String str = emps.stream().map(Employee::getName).collect(Collectors.joining());// joining() 里面填寫連接符 比如逗號分割 joining(",")// 如果想要添加兩側的 可以這樣joining(",","【","】")System.out.println(str);}}test1的結果:
test2的結果
test3的結果:
test4的結果
test5的結果
test6的結果
組合
concat用來連接類型一樣的兩個流。
如果細分的話,還有下面的這種分類
轉換
toArray方法將一個流轉換成數組,而如果想轉換成其它集合類型,需要調用collect方法,利用Collectors.toXXX方法進行轉換:
public static <T,C extends Collection<T>> Collector<T,?,C> toCollection(Supplier<C> collectionFactory) public static …… toConcurrentMap(……) public static <T> Collector<T,?,List<T>> toList() public static …… toMap(……) public static <T> Collector<T,?,Set<T>> toSet()并行流與串行流
并行流 就是把一個內容分成多個數據塊,并用不同的線程分別處理每個數據塊的流。
Java 8 中將并行進行了優化,我們可以很容易的對數據進行并行操作。Stream API 可以聲明性地通過 parallel() 與sequential() 在并行流與順序流之間進行切換。
Fork/Join 框架
Fork/Join 框架:就是在必要的情況下,將一個大任務,進行拆分(fork)成若干個小任務(拆到不可再拆時),再將一個個的小任務運算的結果進行 join 匯總.
Fork/Join 框架與傳統線程池的區別:
采用 “工作竊取”模式(work-stealing):當執行新的任務時它可以將其拆分分成更小的任務執行,并將小任務加到線程隊列中,然后再從一個隨機線程的隊列中偷一個并把它放在自己的隊列中。
相對于一般的線程池實現,fork/join框架的優勢體現在對其中包含的任務的處理方式上.在一般的線程池中,如果一個線程正在執行的任務由于某些原因無法繼續運行,那么該線程會處于等待狀態.而在fork/join框架實現中,如果某個子問題由于等待另外一個子問題的完成而無法繼續運行.那么處理該子問題的線程會主動尋找其他尚未運行的子問題來執行.這種方式減少了線程的等待時間,提高了性能
演示:
類ForkJoinCalculate
Test類
import org.junit.Test; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.stream.LongStream;public class MyTest {// Fork Join 框架@Testpublic void test1() {ForkJoinPool pool = new ForkJoinPool();ForkJoinTask<Long> task = new ForkJoinCalculate(0,1000000L);Long sum = pool.invoke(task);System.out.println(sum);}// Java并行流@Testpublic void test2() {LongStream.rangeClosed(0,1000000L).parallel() //并行流 // .sequential() 順序流.reduce(0,Long::sum);}}說點看法吧,這篇文章僅僅起到入門到使用的作用,Stream其實蘊涵著比較深的技術,深入理解還需要大量的實踐、探究與學習。
另外本文是基于《Java 8實戰》這本書的思考與學習的總結筆記,含少量內容的摘錄。
總結
以上是生活随笔為你收集整理的【Java 8 in Action】Stream的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 程序员幽默:当代程序员的主要矛盾是什么?
- 下一篇: java美元兑换,(Java实现) 美元