Pipelines and Streams
A pipeline is a sequence of aggregate operations
Examples
public class Person {
public enum Sex {
MALE, FEMALE
}
String name;
LocalDate birthday;
Sex gender;
String emailAddress;
// ...
public int getAge() {
// ...
}
public String getName() {
// ...
}
}
The following example prints the male members contained in the collection roster with a pipeline that consists of the aggregate operations filter and forEach, compared with for-each code
//pipeline
roster
.stream()
.filter(e -> e.getGender() == Person.Sex.MALE)
.forEach(e -> System.out.println(e.getName()));
//for-each loop
for (Person p : roster) {
if (p.getGender() == Person.Sex.MALE) {
System.out.println(p.getName());
}
}
pipeline components
- A source: This could be a
collection
,an array
, agenerator function
, or anI/O channel
. - Zero or more intermediate operations. An intermediate operation, such as filter, produces a new stream. A
stream
is a sequence of elements. Unlike a collection, it is not a data structure that stores elements. Instead, a stream carries values from a source through apipeline
. This example creates a stream from the collection roster by invoking the methodstream
. - A terminal operation. A terminal operation, such as forEach, produces a
non-stream
result.
Another Example
double average = roster
.stream()
.filter(p -> p.getGender() == Person.Sex.MALE)
.mapToInt(Person::getAge)
.average()
.getAsDouble();
Differences Between Aggregate Operations and Iterators
- Aggregate operations use internal iteration. With internal iteration, your application determines what collection it iterates, but the JDK determines how to iterate the collection. With external iteration, your application determines both what collection it iterates and how it iterates it. Internal iteration can more easily take advantage of
parallel computing
, which involves dividing a problem into subproblems, solving those problems simultaneously, and then combining the results of the solutions to the subproblems. - Aggregate operations process elements from a stream, it process elements from a stream, not directly from a collection.
- Aggregate operations support behavior as parameters, you can specify
lambda expressions
as parameters for most aggregate operations.
Stream.reduce Method
Integer totalAgeReduce = roster
.stream()
.map(Person::getAge)
.reduce(
0,
(a, b) -> a + b);
The reduce operation in this example takes two arguments:
identity
: The identity element is both the initial value of the reduction and the default result if there are no elements in the stream.accumulator
: The accumulator function takes two parameters: a partial result of the reduction and the next element of the stream, such as(a, b) -> a + b