Youwang Deng

I'm a software developer familiar with C#, Java, and JavaScript, with a focus on full-stack development.

Java Pipelines and Streams

17 Jun 2019 » Java, Data-Structure

Pipelines and Streams

A pipeline is a sequence of aggregate operations applied to the elements of a stream.

Examples

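import java.time.LocalDate;
import java.time.Period;

public class Person {

    public enum Sex {
        MALE, FEMALE
    }

    String name;
    LocalDate birthday;
    Sex gender;
    String emailAddress;

    // ...

    // Age in whole years as of today, derived from birthday.
    public int getAge() {
        return Period.between(birthday, LocalDate.now()).getYears();
    }

    public String getName() {
        return name;
    }

    // Used by the pipeline examples below to filter by gender.
    public Sex getGender() {
        return gender;
    }
}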

The following example prints the names of the male members of the collection roster using a pipeline that consists of the aggregate operations filter and forEach. The equivalent for-each loop follows it for comparison:

// Pipeline
roster
    .stream()
    .filter(e -> e.getGender() == Person.Sex.MALE)
    .forEach(e -> System.out.println(e.getName()));
// Equivalent for-each loop
for (Person p : roster) {
    if (p.getGender() == Person.Sex.MALE) {
        System.out.println(p.getName());
    }
}

Pipeline Components

  • A source: This could be a collection, an array, a generator function, or an I/O channel.
  • Zero or more intermediate operations. An intermediate operation, such as filter, produces a new stream. A stream is a sequence of elements. Unlike a collection, it is not a data structure that stores elements. Instead, a stream carries values from a source through a pipeline. This example creates a stream from the collection roster by invoking the method stream.
  • A terminal operation. A terminal operation, such as forEach, produces a non-stream result.
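
The three components above can be sketched with different kinds of sources. This is a minimal illustration, not code from the roster example: the class name PipelineComponents and its three helper methods are hypothetical, and plain strings and integers stand in for Person objects.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineComponents {

    // Source: a collection. Two intermediate operations, one terminal operation.
    static List<String> shortNamesUpper(List<String> names) {
        return names.stream()                    // source
            .filter(n -> n.length() <= 4)        // intermediate: returns a new stream
            .map(String::toUpperCase)            // intermediate: returns a new stream
            .collect(Collectors.toList());       // terminal: returns a List, not a stream
    }

    // Source: an array. Zero intermediate operations, one terminal operation.
    static long countElements(String[] names) {
        return Arrays.stream(names).count();
    }

    // Source: a generator function. limit is an intermediate operation that
    // keeps the otherwise infinite stream finite.
    static List<Integer> firstPowersOfTwo(int n) {
        return Stream.iterate(1, x -> x * 2)
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(shortNamesUpper(Arrays.asList("Fred", "Jane", "George", "Bob"))); // [FRED, JANE, BOB]
        System.out.println(countElements(new String[] {"a", "b", "c"}));                     // 3
        System.out.println(firstPowersOfTwo(5));                                             // [1, 2, 4, 8, 16]
    }
}
```

Nothing is processed until the terminal operation runs, which is why the generator-based stream can be declared as infinite and then trimmed with limit.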

Another Example

The following example calculates the average age of all male members of roster with a pipeline that consists of filter, mapToInt, and average:

double average = roster
    .stream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .mapToInt(Person::getAge)
    .average()
    .getAsDouble();
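
One thing worth noting about this pipeline: average() returns an OptionalDouble, and getAsDouble() throws NoSuchElementException when the stream is empty (for example, when roster has no male members). The sketch below shows one way to supply a fallback instead. The class AverageAgeSketch, the method averageOrDefault, and the fallback value are hypothetical, and a list of integer ages stands in for roster mapped through getAge.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalDouble;

public class AverageAgeSketch {

    // Returns the average of the given ages, or the fallback if the list is empty.
    static double averageOrDefault(List<Integer> ages, double fallback) {
        OptionalDouble average = ages.stream()
            .mapToInt(Integer::intValue)   // IntStream, like mapToInt(Person::getAge)
            .average();                    // empty OptionalDouble for an empty stream
        return average.orElse(fallback);
    }

    public static void main(String[] args) {
        System.out.println(averageOrDefault(Arrays.asList(20, 30, 40), -1.0));             // 30.0
        System.out.println(averageOrDefault(Collections.<Integer>emptyList(), -1.0));      // -1.0
    }
}
```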

Differences Between Aggregate Operations and Iterators

  • Aggregate operations use internal iteration. With internal iteration, your application determines what collection it iterates, but the JDK determines how to iterate the collection. With external iteration, your application determines both what collection it iterates and how it iterates it. Internal iteration can more easily take advantage of parallel computing, which involves dividing a problem into subproblems, solving those problems simultaneously, and then combining the results of the solutions to the subproblems.
  • Aggregate operations process elements from a stream, not directly from a collection. Consequently, they are also called stream operations.
  • Aggregate operations support behavior as parameters: you can specify lambda expressions as parameters for most aggregate operations.
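
To make the contrast concrete, here is a small sketch that computes the same sum with external and internal iteration. The class InternalIterationSketch and its methods are hypothetical names chosen for illustration, and a list of integers stands in for roster. The predicate is the "behavior as a parameter", and the parallel flag only changes how the library traverses the data, not what is computed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class InternalIterationSketch {

    // External iteration: the caller writes the loop and controls the traversal.
    static int sumExternal(List<Integer> values, Predicate<Integer> keep) {
        int sum = 0;
        for (Integer v : values) {
            if (keep.test(v)) {
                sum += v;
            }
        }
        return sum;
    }

    // Internal iteration: the caller passes behavior (the predicate) and the
    // stream library decides how to traverse, sequentially or in parallel.
    static int sumInternal(List<Integer> values, Predicate<Integer> keep, boolean parallel) {
        return (parallel ? values.parallelStream() : values.stream())
            .filter(keep)
            .mapToInt(Integer::intValue)
            .sum();
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6);
        Predicate<Integer> isEven = v -> v % 2 == 0;
        System.out.println(sumExternal(values, isEven));        // 12
        System.out.println(sumInternal(values, isEven, false)); // 12
        System.out.println(sumInternal(values, isEven, true));  // 12
    }
}
```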

Stream.reduce Method

The following pipeline uses Stream.reduce to calculate the total age of the members of roster:

Integer totalAgeReduce = roster
   .stream()
   .map(Person::getAge)
   .reduce(
       0,
       (a, b) -> a + b);

The reduce operation in this example takes two arguments:

  • identity: The identity element is both the initial value of the reduction and the default result if there are no elements in the stream.
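  • accumulator: The accumulator function takes two parameters: a partial result of the reduction and the next element of the stream. It returns a new partial result. In this example, the accumulator is the lambda expression (a, b) -> a + b, which adds the next age b to the running total a.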
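
The role of the identity is easiest to see with an empty stream. The sketch below is a rough illustration under a few assumptions: the class ReduceSketch and its methods are hypothetical, and a list of integer ages stands in for roster mapped through getAge. It compares the two-argument reduce with the one-argument overload, which has no identity and therefore returns an Optional.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class ReduceSketch {

    // Two-argument reduce: 0 is the identity, (a, b) -> a + b is the accumulator.
    static int total(List<Integer> ages) {
        return ages.stream().reduce(0, (a, b) -> a + b);
    }

    // One-argument reduce has no identity, so it returns an Optional
    // that is empty when the stream has no elements.
    static Optional<Integer> totalWithoutIdentity(List<Integer> ages) {
        return ages.stream().reduce((a, b) -> a + b);
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(20, 30, 40);
        List<Integer> none = Collections.<Integer>emptyList();
        System.out.println(total(ages));                            // 90
        System.out.println(total(none));                            // 0 (the identity)
        System.out.println(totalWithoutIdentity(none).isPresent()); // false
    }
}
```

For a plain sum, mapToInt(...).sum() gives the same result without boxing; reduce is the more general tool when the combining step is something other than a built-in operation.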