Sunday, January 24, 2016

Break down package java.util.stream

Stream processing is significant feature of Java 8. In api level, it falls into package java.util.stream. By breaking down the package, you can get a whole view of how the stream api works.

Steam introduces map-reduce similar functions to Java. The article is based on Java 8.

1. Hierarchy

java_util_stream

The above only has most important components for understanding package java.util.stream.  4 builder interfaces corresponding to different stream type and 1 class for low-level library writers are not included.

There are only 1 class on above diagram, class Collectors, The rests are all interfaces. The most important are the 4 stream interfaces in pink. They can again be divided into 2 groups, streams for numbers and stream for other non-number object.  You can roughly think streams for number, IntStream/LongStream/DoubleStream, are special cases for Stream<T>, by set Generic Type T to Integer/Long/Double and add special methods for number manipulation like average(), sum().

All 4 streams has collect(supplier, accumulator, combiner) method. (supplier,accumlator,combiner will be described below). Method collect() provides the most general way to finally do the reduce. For number streams, in most cases, we don't need to use collect() at all, JDK provides shortcut methods, like average(),sum(),max(),min(), are good enough.

To simplify the usage of collect(supplier, accumulator, combiner) method, JDK extracts supplier, accumulator, combiner, together with finisher to the  interface Collector<T,A,R>. So reasonably there's an overload method collect(Collector<? super T,A,R> collector).

Furthemore, to make  collect(Collector<? super T,A,R> collector) easy to use, JDK provides the helper class Collectors with a bunch of static methods to create instance of Collector as input argument.

From the above diagram, we can see there's also reduce() methods for all stream, you can think them like simplified version of collect().

2. Stream methods

2.1 BaseStream functions

unordered(), make or just mark a stream is unordered. Usually it's used for parallel processing the stream. For example you have a stream from a list, which is ordered, but you only want to calculate the sum which has nothing to do with the order information, you can unordered the stream to make it can be processed in parallel.

sequential()/parallel(), change the stream to sequential or parallel.  Default stream is sequential for most cases, like the stream() return from a java collection.

2.2 Most used stream functions

For all example codes, suppose we have a variable users as list of User and  a stream userStream created from the list.

public class User {
  public static enum Gender {
    MALE, FEMALE
  }
 
  private Gender gender;
  private int age;
  private String username;
  
  // ignore getter,setter
}

List <User>  users;
// ignore initilization of users
Stream<User> userStream = users.stream();

filter(), screen out some element in the stream. For example

userStream.filtere(User u –> u.getAge() >= 18) //return stream with only adult users

will return a new stream with only adult users.

map(), can let you apply a given function on every element of the stream and create a new stream from that. For example

userStream().map(u->u.getAge());       //return Stream<Integer>
userStream().mapToInt(u->u.getAge());  //return IntStream

mapToInt() return a new IntStream which is convinient to process int elements.  There are also methods start with flatMap such as flatMap(),flatMapToInit(). The difference between map and flatMap is: map return only one output element for every one input element(Let's call it One-To-One), flatMap return multiple output for every single input(Let's call it One-To-Many).  for example:  

List&lg;string> lines = Arrays.asList("1 2 3 4", "5 6 7","8 9");
// every input string return multiple numbers
lines.stream().flatMap(line -> Arrays.asList(line.split("\\s+")).stream());

reduce(), will make a stream of type T finally return a single result of type T(let's call it Many-To-One). For example calcalate the sum of all users' age:

Optional<Integer> sum = userStream.map(u->u.getAge()).reduce((x,y)->x+y);
System.out.println("sum = "+sum.get());

When do reduce() on a Stream<Integer> return from map(), logically it will finally return a single Integer. In API level, a java.util.Optional<Integer> is used to contain that Integer.

Number streams has more methods special for number calculations, such as sum() or average(), so the previous example to calculate age sum can also be written like below:

int sum = stream.mapToInt(u->u.getAge()).sum();
System.out.println("sum = "+sum);

Also if we use the most generic method for reduction collect(), the previous age sum calculation can also be written like this:

int sum = userStream.collect(Collectors.summingInt(u -> u.getAge()));
System.out.println("sum = " + sum);

3. Collectors methods

The main purpose of class Collectors is to create Collector instance as input argument of stream's collect() method just like previous example. 

As name indicates, summing**() methods return Collector instance for producing the sum, averaging**() methods return Collector instance for producing arithmetic average. Here "**" can be Int, Long, and Double.

Methods like to**(), will return Collector that will return element in stream to a Java collection. Here "**" can be List, Set, Map,ConcurrentMap. The following example remove duplications from a list.

List<Integer> ages = Arrays.asList(25, 25, 30, 30, 35);  // list with duplication
List<Integer> distinctAges = ages.stream().distinct().collect(Collectors.toList()); // list without dup
System.out.println("distinctAges = " + Arrays.toString(distinctAges.toArray())); // [25,30,35]

Methods start like groupingBy() are very helpful.  They provide functions similar to "group by" in SQL.  There are several overload of groupingBy(), but mandatory parameter is the key to group, which will also be the key of the return Map.  For example, get a map of user grouped by gender, the key in the return Map is gender, the value is a user list of that gender.

// group users by gender
Map<Gender,List<User>> usersByGender = usreStream.collect(Collectors.groupingBy(u->u.getGender()));

If you want to calculate average user age for male and female respectively, see below.

Map<Gender, Double> averageAgeByGender = stream.collect(Collectors.groupingBy(User::getGender,
    Collectors.averagingDouble(User::getAge))); 
// print out  
averageAgeByGender.forEach((k,v)->System.out.printf("Average age of Gender %s is %f\n",k,v));

Methods partitioningBy(), can only group into 2 groups,  and key fixed to TRUE/FALSE,  so it can be thought as a simplified version of groupingBy(). 

4. Understand Collector

JDK provides class Collectors to make create Collector instance easily, so usually you don't play with Collector interface directly. But to better understand the stream process, you need at least know what a collector really does. 

Interface Collector<T,A,R> has 3 generic types:

  • T - the type of input elements to the reduction operation
  • A - the intermedia type of partial reduce result, most of time it's same as T, but as a application developer normally we don't care, so most of time "?" is used.
  • R - the result type of the reduction operation

The type T and R are more important for developers.

A collector has 4 main methods, they will be called internally in the collect() process. These 4 methods are:

  • creation of a new result container (A container = supplier().get())
  • incorporating a new data element into a result container (accumulator().accept(container, everyElement))
  • combining two result containers into one (combiner().apply(partialContainer1, partialContainer2))
  • performing an optional final transform on the container (finisher().apply(container))

The collector interface also provide a static method of(Supplier<A> supplier, BiConsumer<A,T> accumulator, BinaryOperator<A> combiner, Function<A,R> finisher, Collector.Characteristics... characteristics) to let you fully controll how to create a instance of your own collector.  Here's a example of creating Collector instance equivalent to Collectors.summingInt(), pay attention to how the supplier,accumulator,combiner and finisher are implemented.

// Can also defined as Collector<User,int[],Integer>
  Collector<User,?,Integer> myCollector; 
  
//myCollector = Collectors.summingInt(u->u.getAge());
  myCollector = Collector.of(
        () -> new int[1],                           //supplier
        (a, t) -> a[0] += t.getAge(),               //accumulator
        (a1, a2) -> {a1[0] += a2[0];return a1;},    //combiner
        a -> a[0]);                                 //finisher
    
  int sum = userStream.collect(myCollector);
  System.out.println("sum = " + sum);

5. Recap

To understand Java stream processing, we need to understand 4 stream interfaces(Stream<T>,IntStream,LongStream and DoubleStream), 1 Collecor interface and 1 Collectors class. Stream provide operations on every element in the stream, then reduce the element to final result. Besides many out-of-box functions for reduction, you can also use Collector and Collectors for more generic reduction operation.

0 comments:

Post a Comment

Powered by Blogger.

About The Author

My Photo

Has been a senior software developer, project manager for 10+ years. Dedicate himself to Alcatel-Lucent and China Telecom for delivering software solutions.

Pages

Unordered List