基础篇:JAVA.Stream函数,优雅的数据流操作
发布日期:2021-05-09 01:45:21 浏览次数:20 分类:博客文章

本文共 15468 字,大约阅读时间需要 51 分钟。

前言

平时操作集合数据,我们一般都是for或者iterator去遍历,不是很好看。java提供了Stream的概念,它可以让我们把集合数据当做一个个元素在处理,并且提供多线程模式

  • 流的创建
  • 流的各种数据操作
  • 流的终止操作
  • 流的聚合处理
  • 并发流和CompletableFuture的配合使用

关注公众号,一起交流,微信搜一搜: 潜行前行

1 stream的构造方式

stream内置的构造方法

public static
Stream
iterate(final T seed, final UnaryOperator
f)public static
Stream
concat(Stream
a, Stream
b)public static
Builder
builder()public static
Stream
of(T t)public static
Stream
empty()public static
Stream
generate(Supplier
s)

Collection声明的stream函数

default Stream
stream()
  • Collection声明了stream转化函数,也就是说,任意Collection子类都存在官方替我们实现的由Collection转为Stream的方法
  • 示例,List转Stream
public static void main(String[] args){    List
demo = Arrays.asList("a","b","c"); long count = demo.stream().peek(System.out::println).count(); System.out.println(count);}-------result--------abc3

2 接口stream对元素的操作方法定义

过滤 filter

Stream
filter(Predicate
predicate)
  • Predicate是函数式接口,可以直接用lambda代替;如果有复杂的过滤逻辑,则用or、and、negate方法组合
  • 示例
List
demo = Arrays.asList("a", "b", "c");Predicate
f1 = item -> item.equals("a");Predicate
f2 = item -> item.equals("b");demo.stream().filter(f1.or(f2)).forEach(System.out::println);-------result--------ab

映射转化 map

Stream
map(Function
mapper)IntStream mapToInt(ToIntFunction
mapper);LongStream mapToLong(ToLongFunction
mapper);DoubleStream mapToDouble(ToDoubleFunction
mapper);
  • 示例
static class User{    public User(Integer id){this.id = id; }    Integer id; public Integer getId() {  return id; }}public static void main(String[] args) {    List
demo = Arrays.asList(new User(1), new User(2), new User(3)); // User 转为 Integer(id) demo.stream().map(User::getId).forEach(System.out::println);}-------result--------123

数据处理 peek

Stream
peek(Consumer
action);
  • 与map的区别是其无返回值
  • 示例
static class User{    public User(Integer id){this.id = id; }    Integer id;    public Integer getId() {  return id; }    public void setId(Integer id) {  this.id = id; }}public static void main(String[] args) {    List
demo = Arrays.asList(new User(1), new User(2), new User(3)); // id平方,User 转为 Integer(id) demo.stream().peek(user -> user.setId(user.id * user.id)).map(User::getId).forEach(System.out::println);}-------result--------149

映射撵平 flatMap

Stream
flatMap(Function
> mapper);IntStream flatMapToInt(Function
mapper);LongStream flatMapToLong(Function
mapper);DoubleStream flatMapToDouble(Function
mapper);
  • flatMap:将元素为Stream<T>类型的流撵平成一个元素类型为T的Stream流
  • 示例
public static void main(String[] args) {    List
> demo = Arrays.asList(Stream.of(5), Stream.of(2), Stream.of(1)); demo.stream().flatMap(Function.identity()).forEach(System.out::println);}-------result--------521

去重 distinct

Stream
distinct();
  • 示例
List
demo = Arrays.asList(1, 1, 2);demo.stream().distinct().forEach(System.out::println);-------result--------12

排序 sorted

Stream
sorted();Stream
sorted(Comparator
comparator);
  • 示例
List
demo = Arrays.asList(5, 1, 2);//默认升序demo.stream().sorted().forEach(System.out::println);//降序Comparator
comparator = Comparator.
comparing(item -> item).reversed();demo.stream().sorted(comparator).forEach(System.out::println);-------默认升序 result--------125-------降序 result--------521

个数限制limit和跳过skip

//截取前maxSize个元素Stream
limit(long maxSize);//跳过前n个流Stream
skip(long n);
  • 示例
List
demo = Arrays.asList(1, 2, 3, 4, 5, 6);//跳过前两个,然后限制截取两个demo.stream().skip(2).limit(2).forEach(System.out::println);-------result--------34

JDK9提供的新操作

  • 和filter的区别,takeWhile是取满足条件的元素,直到不满足为止;dropWhile是丢弃满足条件的元素,直到不满足为止
default Stream
takeWhile(Predicate
predicate);default Stream
dropWhile(Predicate
predicate);

3 stream的终止操作action

遍历消费

//遍历消费void forEach(Consumer
action);//顺序遍历消费,和forEach的区别是forEachOrdered在多线程parallelStream执行,其顺序也不会乱void forEachOrdered(Consumer
action);
  • 示例
List
demo = Arrays.asList(1, 2, 3);demo.parallelStream().forEach(System.out::println);demo.parallelStream().forEachOrdered(System.out::println);-------forEach result--------231-------forEachOrdered result--------123

获取数组结果

//流转成Object数组Object[] toArray();//流转成A[]数组,指定类型A A[] toArray(IntFunction
generator)
  • 示例
List
demo = Arrays.asList("1", "2", "3");//
A[] toArray(IntFunction
generator)String[] data = demo.stream().toArray(String[]::new);

最大最小值

//获取最小值Optional
min(Comparator
comparator)//获取最大值Optional
max(Comparator
comparator)
  • 示例
List
demo = Arrays.asList(1, 2, 3);Optional
min = demo.stream().min(Comparator.comparing(item->item));Optional
max = demo.stream().max(Comparator.comparing(item->item));System.out.println(min.get()+"-"+max.get());-------result--------1-3

查找匹配

//任意一个匹配boolean anyMatch(Predicate
predicate)//全部匹配boolean allMatch(Predicate
predicate)//不匹配 boolean noneMatch(Predicate
predicate)//查找第一个Optional
findFirst();//任意一个Optional
findAny();

归约合并

//两两合并Optional
reduce(BinaryOperator
accumulator)//两两合并,带初始值的T reduce(T identity, BinaryOperator
accumulator)//先转化元素类型再两两合并,带初始值的
U reduce(U identity, BiFunction
accumulator, BinaryOperator
combiner)
  • 示例
List
demo = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);//数字转化为字符串,然后使用“-”拼接起来String data = demo.stream().reduce("0", (u, t) -> u + "-" + t, (s1, s2) -> s1 + "-" + s2);System.out.println(data);-------result--------0-1-2-3-4-5-6-7-8

计算元素个数

long count()
  • 示例
List
demo = Arrays.asList(1, 2, 3, 4, 5, 6);System.out.println(demo.stream().count());-------result--------6

对流的聚合处理

/** * supplier:返回结果类型的生产者 * accumulator:元素消费者(处理并加入R) * combiner: 返回结果 R 怎么组合(多线程执行时,会产生多个返回值R,需要合并) */
R collect(Supplier
supplier, BiConsumer
accumulator, BiConsumer
combiner);/** * collector一般是由 supplier、accumulator、combiner、finisher、characteristics组合成的聚合类 * Collectors 可提供一些内置的聚合类或者方法 */
R collect(Collector
collector);
  • 示例,看下面

4 Collector(聚合类)的工具类集Collectors

接口Collector和实现类CollectorImpl

//返回值类型的生产者Supplier supplier();//流元素消费者BiConsumer
accumulator();//返回值合并器(多个线程操作时,会产生多个返回值,需要合并)BinaryOperator
combiner();//返回值转化器(最后一步处理,实际返回结果,一般原样返回)Function
finisher();//流的特性Set
characteristics();public static
Collector
of(Supplier
supplier, BiConsumer
accumulator, BinaryOperator
combiner, Function
finisher, Characteristics... characteristics)

流聚合转换成List, Set

//流转化成Listpublic static 
Collector
> toList()//流转化成Setpublic static
Collector
> toSet()
  • 示例
List
demo = Arrays.asList(1, 2, 3);List
col = demo.stream().collect(Collectors.toList());Set
set = demo.stream().collect(Collectors.toSet());

流聚合转化成Map

//流转化成Mappublic static 
Collector
> toMap( Function
keyMapper, Function
valueMapper)/** * mergeFunction:相同的key,值怎么合并 */public static
Collector
> toMap( Function
keyMapper, Function
valueMapper, BinaryOperator
mergeFunction)/** * mergeFunction:相同的key,值怎么合并 * mapSupplier:返回值Map的生产者 */public static
> Collector
toMap( Function
keyMapper, Function
valueMapper, BinaryOperator
mergeFunction, Supplier
mapSupplier)
  • 如果存在相同key的元素,会报错;或者使用groupBy
  • 示例
List
demo = Arrays.asList(new User(1), new User(2), new User(3));Map
map = demo.stream().collect(Collectors.toMap(User::getId,item->item));System.out.println(map);-------result-------{1=TestS$User@7b23ec81, 2=TestS$User@6acbcfc0, 3=TestS$User@5f184fc6}

字符串流聚合拼接

//多个字符串拼接成一个字符串public static Collector
joining();//多个字符串拼接成一个字符串(指定分隔符)public static Collector
joining(CharSequence delimiter)
  • 示例
List
demo = Arrays.asList("c", "s", "c","w","潜行前行");String name = demo.stream().collect(Collectors.joining("-"));System.out.println(name);-------result-------c-s-c-w-潜行前行

映射处理再聚合流

  • 相当于先map再collect
/** * mapper:映射处理器 * downstream:映射处理后需要再次聚合处理 */public static 
Collector
mapping(Function
mapper, Collector
downstream);
  • 示例
List
demo = Arrays.asList("1", "2", "3");List
data = demo.stream().collect(Collectors.mapping(Integer::valueOf, Collectors.toList()));System.out.println(data);-------result-------[1, 2, 3]

聚合后再转换结果

/** * downstream:聚合处理 * finisher:结果转换处理 */public static
Collector
collectingAndThen(Collector
downstream, Function
finisher);
  • 示例
List
demo = Arrays.asList(1, 2, 3, 4, 5, 6);//聚合成List,最后提取数组的size作为返回值Integer size = demo.stream().collect(Collectors.collectingAndThen(Collectors.toList(), List::size));System.out.println(size);---------result----------6

流分组(Map是HashMap)

/** * classifier 指定T类型某一属性作为Key值分组 * 分组后,使用List作为每个流的容器 */public static 
Collector
>> groupingBy( Function
classifier); /** * classifier: 流分组器 * downstream: 每组流的聚合处理器 */public static
Collector
> groupingBy( Function
classifier, Collector
downstream)/** * classifier: 流分组器 * mapFactory: 返回值map的工厂(Map的子类) * downstream: 每组流的聚合处理器 */public static
> Collector
groupingBy( Function
classifier, Supplier
mapFactory, Collector
downstream)
  • 示例
public static void main(String[] args) throws Exception {    List
demo = Stream.iterate(0, item -> item + 1) .limit(15) .collect(Collectors.toList()); // 分成三组,并且每组元素转化为String类型 Map
> map = demo.stream() .collect(Collectors.groupingBy(item -> item % 3, HashMap::new, Collectors.mapping(String::valueOf, Collectors.toList()))); System.out.println(map);}---------result---------- {0=[0, 3, 6, 9, 12], 1=[1, 4, 7, 10, 13], 2=[2, 5, 8, 11, 14]}

流分组(分组使用的Map是ConcurrentHashMap)

/** * classifier: 分组器 ; 分组后,使用List作为每个流的容器 */public static 
Collector
>> groupingByConcurrent( Function
classifier);/** * classifier: 分组器 * downstream: 流的聚合处理器 */public static
Collector
> groupingByConcurrent( Function
classifier, Collector
downstream)/** * classifier: 分组器 * mapFactory: 返回值类型map的生产工厂(ConcurrentMap的子类) * downstream: 流的聚合处理器 */public static
> Collector
groupingByConcurrent( Function
classifier, Supplier
mapFactory, Collector
downstream);
  • 用法和groupingBy一样

拆分流,一变二(相当于特殊的groupingBy)

public static 
Collector
>> partitioningBy( Predicate
predicate)/** * predicate: 二分器 * downstream: 流的聚合处理器 */public static
Collector
> partitioningBy( Predicate
predicate, Collector
downstream)
  • 示例
List
demo = Arrays.asList(1, 2,3,4, 5,6);// 奇数偶数分组Map
> map = demo.stream() .collect(Collectors.partitioningBy(item -> item % 2 == 0));System.out.println(map);---------result----------{false=[1, 3, 5], true=[2, 4, 6]}

聚合求平均值

// 返回Double类型public static 
Collector
averagingDouble(ToDoubleFunction
mapper)// 返回Long 类型public static
Collector
averagingLong(ToLongFunction
mapper)//返回Int 类型public static
Collector
averagingInt(ToIntFunction
mapper)
  • 示例
List
demo = Arrays.asList(1, 2, 5);Double data = demo.stream().collect(Collectors.averagingInt(Integer::intValue));System.out.println(data);---------result----------2.6666666666666665

流聚合查找最大最小值

//最小值public static 
Collector
> minBy(Comparator
comparator) //最大值public static
Collector
> maxBy(Comparator
comparator)
  • 示例
List
demo = Arrays.asList(1, 2, 5);Optional
min = demo.stream().collect(Collectors.minBy(Comparator.comparing(item -> item)));Optional
max = demo.stream().collect(Collectors.maxBy(Comparator.comparing(item -> item)));System.out.println(min.get()+"-"+max.get());---------result----------1-5

聚合计算统计结果

  • 可以获得元素总个数,元素累计总和,最小值,最大值,平均值
//返回Int 类型public static 
Collector
summarizingInt( ToIntFunction
mapper)//返回Double 类型public static
Collector
summarizingDouble( ToDoubleFunction
mapper)//返回Long 类型public static
Collector
summarizingLong( ToLongFunction
mapper)
  • 示例
List
demo = Arrays.asList(1, 2, 5);IntSummaryStatistics data = demo.stream().collect(Collectors.summarizingInt(Integer::intValue));System.out.println(data);---------result----------IntSummaryStatistics{count=3, sum=8, min=1, average=2.666667, max=5}

JDK12提供的新聚合方法

//流分别经过downstream1、downstream2聚合处理,再合并两聚合结果public static 
Collector
teeing( Collector
downstream1, Collector
downstream2, BiFunction
merger)

5 并发paralleStream的使用

  • 配合CompletableFuture和线程池的使用
  • 示例
public static void main(String[] args)  throws Exception{    List
demo = Stream.iterate(0, item -> item + 1) .limit(5) .collect(Collectors.toList()); //示例1 Stopwatch stopwatch = Stopwatch.createStarted(Ticker.systemTicker()); demo.stream().forEach(item -> { try { Thread.sleep(500); System.out.println("示例1-"+Thread.currentThread().getName()); } catch (Exception e) { } }); System.out.println("示例1-"+stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); //示例2, 注意需要ForkJoinPool,parallelStream才会使用executor指定的线程,否则还是用默认的 ForkJoinPool.commonPool() ExecutorService executor = new ForkJoinPool(10); stopwatch.reset(); stopwatch.start(); CompletableFuture.runAsync(() -> demo.parallelStream().forEach(item -> { try { Thread.sleep(1000); System.out.println("示例2-" + Thread.currentThread().getName()); } catch (Exception e) { } }), executor).join(); System.out.println("示例2-"+stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); //示例3 stopwatch.reset(); stopwatch.start(); demo.parallelStream().forEach(item -> { try { Thread.sleep(1000); System.out.println("示例3-"+Thread.currentThread().getName()); } catch (Exception e) { } }); System.out.println("示例3-"+stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); executor.shutdown();}
  • -------------------result--------------------------
示例1-main示例1-main示例1-main示例1-main示例1-main示例1-2501示例2-ForkJoinPool-1-worker-19示例2-ForkJoinPool-1-worker-9示例2-ForkJoinPool-1-worker-5示例2-ForkJoinPool-1-worker-27示例2-ForkJoinPool-1-worker-23示例2-1004示例3-main示例3-ForkJoinPool.commonPool-worker-5示例3-ForkJoinPool.commonPool-worker-7示例3-ForkJoinPool.commonPool-worker-9示例3-ForkJoinPool.commonPool-worker-3示例3-1001
  • parallelStream的方法确实会使用多线程去运行,并且可以指定线程池,不过自定义线程必须是ForkJoinPool类型,否则会默认使ForkJoinPool.commonPool()的线程
上一篇:基础篇:JAVA原子组件和同步组件
下一篇:基础篇:异步编程不会?我教你啊!CompletableFuture

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2025年04月18日 00时02分28秒