Showing posts with label Functional programming. Show all posts

Wednesday, June 15, 2016

Usage of Java streams

In part 1 of this series we covered the basics of Java streams. Now we discuss how to use streams, along with some important operations on them. Let's start with the different ways to create a stream.

Create streams from existing values:

The Stream interface has two static of() methods: one creates a stream from a single value, the other from multiple values.

Stream<String> stream = Stream.of("test");
Stream<String> stream = Stream.of("test1", "test2", "test3", "test4");

Create an empty stream:

Stream<String> stream = Stream.empty();
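
As a minimal sketch, the three factory calls above can be tried out by collecting each stream into a list. The class name StreamFactories and the toList helper are illustrative names, not part of the Stream API.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamFactories {

    // Collects a stream into a list so that its elements can be inspected.
    static <T> List<T> toList(Stream<T> stream) {
        return stream.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(toList(Stream.of("test")));                              // [test]
        System.out.println(toList(Stream.of("test1", "test2", "test3", "test4"))); // [test1, test2, test3, test4]
        System.out.println(toList(Stream.<String>empty()));                         // []
    }
}
```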

Create Stream from function:

We can generate an infinite stream from a function that produces elements on demand. The Stream interface has two static methods, iterate and generate, for this purpose.

 static <T> Stream<T> iterate(T seed, UnaryOperator<T> f)
 static <T> Stream<T> generate(Supplier<T> s)
The iterate() method takes two arguments: a seed and a function. The seed is the first element of the stream. The second element is generated by applying the function to the first element, the third by applying the function to the second element, and so on.

The example below creates an infinite stream of natural numbers starting with 1.


Stream<Integer> naturalNumbers = Stream.iterate(1, n -> n + 1);
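
An infinite stream is only useful once it is bounded, for example with limit. The sketch below shows this for iterate. The class and method names are made up for illustration, and the powers-of-two variant is an extra example, not something from the original project.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class IterateExample {

    // Returns the first `count` natural numbers produced by Stream.iterate.
    static List<Integer> firstNaturals(int count) {
        return Stream.iterate(1, n -> n + 1)
                .limit(count) // without limit the stream would never end
                .collect(Collectors.toList());
    }

    // Same seed, different function: each element is twice the previous one.
    static List<Integer> firstPowersOfTwo(int count) {
        return Stream.iterate(1, n -> n * 2)
                .limit(count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstNaturals(5));    // [1, 2, 3, 4, 5]
        System.out.println(firstPowersOfTwo(5)); // [1, 2, 4, 8, 16]
    }
}
```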
The generate(Supplier<T> s) method uses the specified Supplier to generate an infinite, sequential, unordered stream. Supplier is a functional interface, so we can pass a lambda expression or a method reference. The example below generates a stream of random numbers with a method reference and limits it to five elements. Please read the series on method references (the double colon operator) if you are not familiar with them.

Stream.generate(Math::random).limit(5).forEach(System.out::println);
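
Here is a small sketch of the same idea with the supplier passed in as a parameter, which makes it easy to swap the random source for something predictable. The take helper and the class name are hypothetical.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class GenerateExample {

    // Pulls `count` elements from an infinite stream backed by the supplier.
    static <T> List<T> take(Supplier<T> supplier, int count) {
        return Stream.generate(supplier)
                .limit(count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // A constant supplier gives the same element every time.
        System.out.println(take(() -> "x", 3)); // [x, x, x]

        // Math::random gives five values in the range [0.0, 1.0).
        take(Math::random, 5).forEach(System.out::println);
    }
}
```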

Create Stream from Collections:

A Collection is the data source we most often use for creating streams. The Collection interface contains the stream() and parallelStream() methods, which create sequential and parallel streams from a Collection.

Example:

Set<String> nameSet = new HashSet<>();

// add some elements to the set
nameSet.add("name1");
nameSet.add("tes2");

// create a sequential stream from nameSet
Stream<String> sequentialStream = nameSet.stream();

// create a parallel stream from nameSet
Stream<String> parallelStream = nameSet.parallelStream();
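
The following sketch runs the same transformation through both kinds of stream. It uses a List rather than the HashSet above so that the result order is predictable; the class and method names are only illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class CollectionStreams {

    // Upper-cases every name with a sequential stream.
    static List<String> upperSequential(List<String> names) {
        return names.stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // Same pipeline on a parallel stream. Collecting into a list keeps the
    // encounter order of the source list even when the work runs in parallel.
    static List<String> upperParallel(List<String> names) {
        return names.parallelStream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("name1", "name2", "name3");
        System.out.println(upperSequential(names)); // [NAME1, NAME2, NAME3]
        System.out.println(upperParallel(names));   // [NAME1, NAME2, NAME3]
    }
}
```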

Create Streams from Files:

Java 8 added many methods to classes in the java.io and java.nio.file packages to support I/O operations with streams. Let's see an example that reads the content of a file using a stream.

Path path = Paths.get(filePath);
Stream<String> lines = Files.lines(path);
lines.forEach(System.out::println);
The lines() method was added to the Files class in Java 8. It reads all lines from a file as a Stream<String>. The lines are read lazily, and the stream should be closed after use to release the file.
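
Because the stream returned by Files.lines holds an open file, a common pattern is to wrap it in try-with-resources. The sketch below assumes a hypothetical helper that counts the non-blank lines of a file, and a demo method that writes a temporary file so the example is self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

class FileLines {

    // Counts the non-blank lines of a file. try-with-resources closes the
    // stream, and with it the underlying file, even if an exception is thrown.
    static long countNonBlankLines(Path path) {
        try (Stream<String> lines = Files.lines(path)) {
            return lines.filter(line -> !line.trim().isEmpty()).count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes the given lines to a temporary file and counts them back.
    static long demo(List<String> content) {
        try {
            Path temp = Files.createTempFile("stream-demo", ".txt");
            try {
                Files.write(temp, content, StandardCharsets.UTF_8);
                return countNonBlankLines(temp);
            } finally {
                Files.delete(temp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("first", "", "second"))); // 2
    }
}
```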

Stream Operations:

Now we will go through some commonly used stream operations and their usage.
  1. distinct
  2. filter
  3. flatMap
  4. limit
  5. map
  6. skip
  7. peek
  8. sorted
  9. allMatch
  10. anyMatch
  11. findAny
  12. findFirst
  13. noneMatch
  14. forEach
  15. reduce
Operations 1 to 8 are intermediate operations and 9 to 15 are terminal operations. Some of these operations are self-explanatory, so we discuss only the non-trivial ones.
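
For the simpler operations in the list, a short sketch may be enough. The class name, method names and sample data below are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class BasicOperations {

    // distinct removes duplicates, sorted orders the remaining elements.
    static List<Integer> distinctSorted(List<Integer> values) {
        return values.stream().distinct().sorted().collect(Collectors.toList());
    }

    // skip drops the first elements, limit caps how many are kept.
    static List<Integer> page(List<Integer> values, int skip, int limit) {
        return values.stream().skip(skip).limit(limit).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(5, 3, 5, 1, 3, 2);

        System.out.println(distinctSorted(values)); // [1, 2, 3, 5]
        System.out.println(page(values, 1, 3));     // [3, 5, 1]

        // The match operations are terminal and return a boolean.
        System.out.println(values.stream().anyMatch(n -> n > 4));  // true
        System.out.println(values.stream().allMatch(n -> n > 0));  // true
        System.out.println(values.stream().noneMatch(n -> n > 5)); // true

        // findFirst returns an Optional because the stream may be empty.
        System.out.println(values.stream().findFirst()); // Optional[5]
    }
}
```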

 

Map Operation:

                                                              

A map operation applies a function to each element of the input stream to produce another stream (the output stream). The number of elements in the input and output streams is the same, so this is a one-to-one mapping: it takes the element e1 and applies the function f to it to get f(e1), and so on. However, the type of the elements in the output stream may differ from the type of the elements in the input stream. Let's take an example.
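
Before the larger example, here is a minimal sketch of a one-to-one mapping that changes the element type, from String to Integer. The class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MapExample {

    // Maps each word to its length: Stream<String> becomes Stream<Integer>,
    // and the output has exactly as many elements as the input.
    static List<Integer> lengths(List<String> words) {
        return words.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lengths(Arrays.asList("a", "bb", "ccc"))); // [1, 2, 3]
    }
}
```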


Suppose we have 1000 keys with values in a Redis data store, and we want to fetch the values of all those keys and then perform some operation on them. We want to do it with Future objects, so how do we do it in parallel with a Java stream? We use a thread pool here to fetch the data from Redis. Suppose our uniqueItemIds list contains the keys.

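HashOperations redisHash = redisTemplate.opsForHash();
ExecutorService threadPoolService = Executors.newFixedThreadPool(10);

uniqueItemIds.stream()
        .parallel()
        // map: item id -> Future
        .map(itemId -> threadPoolService.submit(new Callable<Object>() {
            @Override
            public Object call() {
                // fetch the data for itemId from Redis through redisHash here
                return null; // placeholder: the lookup itself is not shown in this post
            }
        }))
        .forEach(future -> {
            try {
                Object value = future.get(); // blocks until the lookup has finished
                // process value here
            } catch (Exception e) {
                e.printStackTrace();
            }
        });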
Here the code in the Callable's call method fetches the data from Redis for the specified item id. As we know, submit returns a Future object, so the map operation here takes an item id of type Long and returns an object of type Future. This emphasizes the point that the type of the elements in the output stream returned by the map operation may be different from the type of the elements in the input stream.
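
The same Long-to-Future mapping can be sketched without Redis. The version below is an assumption-laden stand-in: an in-memory Map plays the role of the data store, and the class name, method names and pool size are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

class FutureMapping {

    // Looks up every key on a thread pool and returns the values in key order.
    // `store` stands in for the remote data store.
    static List<String> fetchAll(List<Long> keys, Map<Long, String> store) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // First pass: map each Long key to a Future<String>.
            List<Future<String>> futures = keys.stream()
                    .map(key -> {
                        Callable<String> task = () -> store.get(key);
                        return pool.submit(task);
                    })
                    .collect(Collectors.toList());

            // Second pass: map each Future<String> to its String result.
            return futures.stream()
                    .map(FutureMapping::resultOf)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    // Waits for one future and converts checked exceptions to unchecked ones.
    private static String resultOf(Future<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<Long, String> store = new HashMap<>();
        store.put(1L, "one");
        store.put(2L, "two");
        System.out.println(fetchAll(Arrays.asList(2L, 1L), store)); // [two, one]
    }
}
```

Collecting the futures into a list before waiting on them is a deliberate choice in this sketch: a sequential stream pushes one element through the whole pipeline at a time, so submitting and waiting in the same pipeline would run the lookups one after another.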

flatMap Operation:     

Unlike the map operation, flatMap supports one-to-many mapping. The mapping function takes an element from the input stream and maps it to a stream; the type of the input element and the type of the elements in the mapped stream may be different. On its own, this mapping step produces a stream of streams: if the input is a Stream<T>, the result is a Stream<Stream<R>>, which is not what we want, so flatMap flattens it into a single Stream<R>. Assume we have a map with the structure below.

Map<Integer, Map<Long, List<String>>> itemsMap = new ConcurrentHashMap<>();

// Now let's fill the map with some values.
itemsMap.put(2, new ConcurrentHashMap<>());
itemsMap.put(3, new ConcurrentHashMap<>());

itemsMap.get(2).put(1L, Arrays.asList("abc", "cde", "def", "rty"));
itemsMap.get(2).put(2L, Arrays.asList("2abc", "2cde", "2def", "2rty"));
itemsMap.get(2).put(3L, Arrays.asList("3abc", "3cde", "3def", "3rty"));
itemsMap.get(3).put(1L, Arrays.asList("abc3", "cde3", "def3", "rty3"));

Now our aim is to get all the lists of strings in one stream. How can we achieve it?

An immediate solution that comes to mind is the following.

itemsMap.values().stream().parallel().map(m->m.values().stream()).forEach(System.out::println);

We get output like the following:

java.util.stream.ReferencePipeline$Head@4eec7777
java.util.stream.ReferencePipeline$Head@3b07d329

We expected to see the lists of strings in the output, but we don't. This is because the function passed to map returns a Stream<List<String>> for each inner map, so forEach receives a Stream<Stream<List<String>>> and prints each inner stream object rather than its contents.

Our next attempt looks like this:

itemsMap.values().stream()
        .parallel()
        .map(m -> m.values().stream())
        .forEach(e -> e.forEach(System.out::println));
And the output is (the order may vary because the stream is parallel):

[abc, cde, def, rty]
[2abc, 2cde, 2def, 2rty]
[3abc, 3cde, 3def, 3rty]
[abc3, cde3, def3, rty3]

We can now see all our lists of strings, but observe that we still have a Stream<Stream<List<String>>>. We only managed to work around it with a nested forEach.

The correct approach to our problem is


itemsMap.values().stream()
        .parallel()
        .flatMap(m -> m.values().stream())
        .forEach(System.out::println);
So here flatMap comes to the rescue. It flattens the Stream<Stream<List<String>>> into a Stream<List<String>>. So make sure to use flatMap whenever a mapping would give you a Stream<Stream<T>>.
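
As a runnable sketch, the same nested map can be flattened one level further, down to the individual strings, by applying flatMap twice. The class and method names are illustrative, and the result is sorted only to make the output predictable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class FlatMapExample {

    // Flattens Map<Integer, Map<Long, List<String>>> into one sorted list.
    static List<String> allStrings(Map<Integer, Map<Long, List<String>>> itemsMap) {
        return itemsMap.values().stream()
                .flatMap(inner -> inner.values().stream()) // Stream<List<String>>
                .flatMap(List::stream)                     // Stream<String>
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Integer, Map<Long, List<String>>> itemsMap = new ConcurrentHashMap<>();
        itemsMap.put(2, new ConcurrentHashMap<>());
        itemsMap.put(3, new ConcurrentHashMap<>());
        itemsMap.get(2).put(1L, Arrays.asList("b", "a"));
        itemsMap.get(2).put(2L, Arrays.asList("d"));
        itemsMap.get(3).put(1L, Arrays.asList("c"));

        System.out.println(allStrings(itemsMap)); // [a, b, c, d]
    }
}
```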

We will discuss some other important operations in part 3 of this series.

Sunday, June 12, 2016

An introduction to Java 8 Streams

Multi-core CPUs keep getting cheaper thanks to rapid progress in hardware, and Java keeps adding support for parallelism to exploit them. As a result we have features like the fork/join framework (added in Java 7) and Java 8 streams. Here we discuss what a Java stream is and how it is useful for parallel processing.

There is a series of 3 posts about Java streams on my blog. We will cover them in detail with real-world examples. I have also included some code segments in the same form as I used them in one of my projects. When I started to study Java streams I confused them with Java's InputStream and OutputStream, but they are completely different.

In this series you will learn what a Java stream is, where to use it, and the different kinds of stream operations. We will learn about sequential and parallel streams and important operations on them such as map, flatMap, reduce and collect.

If you are not familiar with Java 8 lambda expressions, functional interfaces and method references, please visit my series on Java 8 lambda expressions first.
Let's start our discussion of Java 8 streams.

Aggregate operations:

But first let's define what an aggregate operation is.
An aggregate operation computes a single value from a set of values. Common aggregate functions are:
  • Average() (i.e., arithmetic mean)
  • Count()
  • Maximum()
  • Median()
  • Minimum()
  • Mode()
  • Sum()
 Observe that these functions are applied to a set of values and give us a single value. The result of an aggregate function may be an object, a primitive value, or empty.
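
Several of these aggregates are available directly on primitive streams. The sketch below uses IntStream.summaryStatistics() for count, sum, minimum, maximum and average, and shows an aggregate with an empty result. The class and method names are made up for illustration.

```java
import java.util.IntSummaryStatistics;
import java.util.OptionalInt;
import java.util.stream.IntStream;

class AggregateExample {

    // Computes count, sum, min, max and average in a single pass.
    static IntSummaryStatistics summarize(int... values) {
        return IntStream.of(values).summaryStatistics();
    }

    // The maximum of no values does not exist, so the result is empty.
    static OptionalInt maxOfNothing() {
        return IntStream.empty().max();
    }

    public static void main(String[] args) {
        IntSummaryStatistics stats = summarize(4, 8, 15, 16, 23, 42);
        System.out.println(stats.getCount());   // 6
        System.out.println(stats.getSum());     // 108
        System.out.println(stats.getMin());     // 4
        System.out.println(stats.getMax());     // 42
        System.out.println(stats.getAverage()); // 18.0

        System.out.println(maxOfNothing().isPresent()); // false
    }
}
```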

Stream:


A stream is a sequence of elements that supports sequential and parallel aggregate operations. Now let's discuss some of the features of streams. We use lambda expressions extensively here, so if you are not familiar with them, please visit the Lambda Expression series.

Features Of Stream:


  • 1. A stream has no storage; it does not store elements. A stream pulls elements from a data source on demand (lazily) and passes them to an aggregate operation for processing. What is the source of a stream? It can be a collection, but not always. It can be any data source, such as a generator function, an I/O channel or a data structure.

  • 2. A stream can represent an infinite sequence of elements. Because a function can generate an unlimited number of elements and a stream pulls data from it only on demand, it is possible to have a stream that represents an infinite sequence of data elements.

  • 3. A stream supports internal iteration, so there is no need to use a for-each loop or an iterator to access its elements. External iteration with a for-each loop or an iterator usually yields the elements sequentially, in the collection's iteration order, so only a single thread consumes the elements. That is not always what we want, and streams help here: they are designed to process their elements in parallel without us writing the threading code. This does not mean that a stream decides on our behalf whether to process elements sequentially or in parallel. We have to tell the stream that we want parallel processing, and the stream takes care of the rest. Behind the scenes, streams use the fork/join framework for parallel processing. Although streams support internal iteration, they still provide an iterator() method that returns an Iterator for external iteration when necessary, but it is rarely used.

  • 4. Streams support two types of operations: lazy (intermediate) operations and eager (terminal) operations. The lazy operations process the elements of the stream only when an eager operation is called on it. Let's solve a problem using a Java stream. Suppose we have a list of 10 integers and we want the sum of the squares of the even integers. The diagram below shows the pipeline, and the code follows the diagram.

Data source ---> Stream ---> filter ---> Stream ---> map ---> Stream ---> reduce

List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = numberList.stream()
        .filter(n -> n % 2 == 0)
        .map(n -> n * n)
        .reduce(0, Integer::sum);


Here numberList is our data source. Calling stream() on it gives us a stream. We then apply filter to keep only the even numbers, which gives us another stream of even numbers. We apply map to that stream, which gives us a stream of the squares of those numbers. Finally we apply reduce to this stream to find the sum of its elements.
Here reduce is the terminal operation, and filter and map are intermediate operations. This process, in which one stream feeds another stream, which feeds the next one, and so on, is called stream pipelining.
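
The pipeline above, and the lazy behaviour described in point 4, can be checked with a small sketch. It uses peek to record which elements the pipeline has touched; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPipeline {

    // The pipeline from the text: keep even numbers, square them, add them up.
    static int sumOfEvenSquares(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 0)
                .map(n -> n * n)
                .reduce(0, Integer::sum);
    }

    // Returns how many elements were touched before and after the terminal
    // operation ran.
    static int[] touchedBeforeAndAfter(List<Integer> numbers) {
        List<Integer> touched = new ArrayList<>();
        Stream<Integer> evens = numbers.stream()
                .peek(touched::add)        // records every element that flows by
                .filter(n -> n % 2 == 0);
        int before = touched.size();        // intermediate operations have not run yet
        evens.collect(Collectors.toList()); // the terminal operation drives the pipeline
        int after = touched.size();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(sumOfEvenSquares(numberList)); // 220

        int[] counts = touchedBeforeAndAfter(numberList);
        System.out.println(counts[0]); // 0
        System.out.println(counts[1]); // 10
    }
}
```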

  • 5. A stream does not remove elements from the data source; it only reads them.

  • 6. A sequential stream can be transformed into a parallel stream by calling the parallel() method on it.

  • 7. The stream-related interfaces and classes are in the java.util.stream package. The diagram below shows the stream-related interfaces.

                      AutoCloseable
                            |
                            |
       BaseStream<T, S extends BaseStream<T, S>>
        |              |               |             |
        |              |               |             |
   IntStream      LongStream     DoubleStream     Stream<T>

The Stream interface works with elements of reference types, while the IntStream, LongStream and DoubleStream interfaces work with elements of primitive types.

Methods common to all types of streams, such as sequential, parallel, isParallel and unordered, are declared in the BaseStream interface. Methods related to intermediate and terminal operations are declared in the Stream interface.

In the second part of this series we will see the use of the stream operations with examples.
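
As a brief sketch of the primitive variants, the sum-of-even-squares problem can also be written with IntStream, which avoids boxing and offers sum() directly. The class and method names here are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PrimitiveStreams {

    // IntStream works on int values directly and has a built-in sum().
    static int sumOfEvenSquares(int from, int to) {
        return IntStream.rangeClosed(from, to)
                .filter(n -> n % 2 == 0)
                .map(n -> n * n)
                .sum();
    }

    // mapToInt converts a Stream<String> into an IntStream.
    static int totalLength(List<String> words) {
        return words.stream().mapToInt(String::length).sum();
    }

    // boxed converts an IntStream back into a Stream<Integer>.
    static List<Integer> boxedRange(int from, int to) {
        return IntStream.rangeClosed(from, to).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sumOfEvenSquares(1, 10));                    // 220
        System.out.println(totalLength(Arrays.asList("abc", "de")));   // 5
        System.out.println(boxedRange(1, 3));                           // [1, 2, 3]
    }
}
```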
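
The BaseStream methods mentioned above behave the same on reference and primitive streams. The following sketch only toggles and inspects the parallel flag; the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class BaseStreamMethods {

    // Returns the value of isParallel() initially, after parallel(),
    // and after switching back with sequential().
    static boolean[] parallelFlags() {
        Stream<String> stream = Stream.of("a", "b", "c");
        boolean initially = stream.isParallel();
        Stream<String> parallel = stream.parallel();
        boolean afterParallel = parallel.isParallel();
        boolean afterSequential = parallel.sequential().isParallel();
        return new boolean[] {initially, afterParallel, afterSequential};
    }

    // The same methods are available on the primitive stream types.
    static boolean primitiveIsParallel() {
        return IntStream.range(0, 3).parallel().isParallel();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parallelFlags())); // [false, true, false]
        System.out.println(primitiveIsParallel());            // true
    }
}
```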