Monday, June 27, 2016

operations on Java Streams -- continued

Filter Operation:

We can apply the filter operation on an input stream to produce another, filtered stream. Suppose we have a finite stream of natural numbers and we want only the even numbers; we can apply the filter operation here. Please note that, unlike the map operation, the elements in the filtered stream are of the same type as the elements in the input stream.

The filter operation takes the functional interface Predicate as its argument. Since the Predicate interface has a public method test which returns a boolean value, we can pass a lambda expression that evaluates to a boolean as the argument to the filter operation.


The size of the output stream is less than or equal to the size of the input stream. Please refer to the example below.


Stream.of(1, 2, 3, 4, 5, 6).filter(n -> n % 2 == 0).forEach(System.out::println);

Reduce Operation:

This combines all elements of a stream to generate a single result by applying a combining function repeatedly. Computing the sum, maximum, average, count, etc. are examples of the reduce operation.

The reduce operation takes two parameters: an initial value and an accumulator. The accumulator is the combining function. If the stream is empty, the initial value is the result.


The initial value and an element are passed to the accumulator, which returns a partial result. The partial result and the next element are then passed to the accumulator again, and this repeats until all elements in the stream are consumed. The last value returned from the accumulator is the result of the reduce operation.
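The repetition described above can be written out as a plain loop (a sketch of the semantics, not of the actual library implementation):

```java
import java.util.Arrays;
import java.util.List;

List<Integer> elements = Arrays.asList(1, 2, 3, 4, 5);

// the semantics of reduce(identity, accumulator) as an explicit loop
int partialResult = 0;                                    // the initial value
for (int element : elements) {
    partialResult = Integer.sum(partialResult, element);  // accumulator(partialResult, element)
}

// the stream version returns the same value
int streamResult = elements.stream().reduce(0, Integer::sum);
```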

 The Stream interface contains a reduce() method to perform the reduce operation. The method has three overloaded versions:  

1. Let's take an example of the first one:

T reduce(T identity, BinaryOperator<T> accumulator)


List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
int sum = numbers.stream()
        .reduce(0, Integer::sum);
System.out.println(sum);

Here notice that 0 is the initial value and Integer::sum is the accumulator, i.e. the combining function.

2. Let's take an example of the second one:

<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)

Note that the second argument, the accumulator, takes an argument whose type may be different from the type of the stream elements; it is used for accumulating the partial results. The third argument is used for combining the partial results when the reduce operation is performed in parallel: the results of the different threads are combined with it. If we are not running in parallel, the combiner is not used.


List<Employee> employees = ...; // some list of Employee objects
int result = employees
        .stream()
        .reduce(0, (intermediateSum, employee) -> intermediateSum + employee.getSalary(), Integer::sum);
System.out.println(result);


The above code shows how to calculate the sum of the salaries of all the employees by using the reduce operation. Here 0 is the initial value, the second argument is the accumulator, and Integer::sum is the combiner.

3. Let's take an example of the third one:

Optional<T> reduce(BinaryOperator<T> accumulator)

Sometimes we cannot specify an initial value for a reduce operation. Let's assume we get a list of numbers on the fly. We have no idea whether the list is empty or has some elements, and we want to get the maximum integer value from the list. If the underlying stream is empty, we cannot initialize the maximum value; in such a case the result is not defined. This version of the reduce method therefore returns an Optional object that contains the result. If the stream contains only one element, that element is the result.

The following snippet of code computes the maximum of integers in a stream:


Optional<Integer> maxValue = Stream.of(1, 2, 3, 4, 5)
        .reduce(Integer::max);
if (maxValue.isPresent()) {
    System.out.println("max = " + maxValue.get());
} else {
    System.out.println("max is not available.");
}

Collect Operation:

We saw that in the case of the reduce operation we get a single value as the result. But sometimes we want to collect a set of values as the result of the stream pipeline operations.

Let's take an example. We have a map of users with the user name as key and the user's account number as value. This map contains all users, both active and inactive. We also have a Set of names containing only the active users. Our requirement is to get all the active account numbers, so the result here will itself be a List of active account numbers.


Set<String> activeUserList = new HashSet<>();
Map<String, String> completeUserMap = new HashMap<>();

List<String> activeAccountNumbers = completeUserMap.entrySet().stream()
        .filter(e -> activeUserList.contains(e.getKey()))
        .map(Map.Entry::getValue)
        .collect(Collectors.toList());


Here notice that first we filter completeUserMap with activeUserList, then use the map operation to get the user's account number from the map entry, and then collect the result in a List.

Now let's collect the same result in a map, i.e. we will collect a map of the active users and their account numbers.


Map<String, String> activeUserMap = completeUserMap.entrySet().stream()
        .filter(e -> activeUserList.contains(e.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));


We will learn about Collectors, parallel streams, and operation reordering in detail in the next series.

Wednesday, June 15, 2016

Usage of Java streams

In part 1 of this series we saw the basics of Java streams. Now we discuss how to use streams, along with some important operations on them. Let's first see the different ways to create streams.

Create streams from existing values:

There are two overloaded of() methods in the Stream interface to create a stream from a single value and from multiple values.

Stream<String> singleElementStream = Stream.of("test");
Stream<String> multiElementStream = Stream.of("test1", "test2", "test3", "test4");

Create empty stream :


Stream<String> emptyStream = Stream.empty();

Create Stream from function:

We can generate an infinite stream from a function that can produce an infinite number of elements if required. There are two static methods, iterate and generate, in the Stream interface to produce an infinite stream.

static <T> Stream<T> iterate(T seed, UnaryOperator<T> f)
static <T> Stream<T> generate(Supplier<T> s)

The iterate() method takes two arguments: a seed and a function. The seed is the first element of the stream. The second element is generated by applying the function to the first element, the third element is generated by applying the function to the second element, and so on.

The below example creates an infinite stream of natural numbers starting with 1.


Stream<Integer> naturalNumbers = Stream.iterate(1, n -> n + 1);
The generate(Supplier<T> s) method uses the specified Supplier to generate an infinite sequential unordered stream. Here Supplier is a functional interface, so we can use lambda expressions. Let's see the example below, which generates an infinite stream of random numbers. Here we use a method reference to generate the random numbers. Please follow the series on method references (the double colon operator) if you are not familiar with them.

Stream.generate(Math::random).limit(5).forEach(System.out::println);

Create Stream from Collections:

A Collection is the data source we usually use for creating streams. The Collection interface contains the stream() and parallelStream() methods that create sequential and parallel streams from a Collection.

Example

Set<String> nameSet = new HashSet<>();

// add some elements to the set
nameSet.add("name1");
nameSet.add("name2");

// create a sequential stream from the nameSet
Stream<String> sequentialStream = nameSet.stream();

// create a parallel stream from the nameSet
Stream<String> parallelStream = nameSet.parallelStream();

Create Streams from Files:

Many methods were added to classes in the java.io and java.nio.file packages in Java 8 to facilitate I/O operations using streams. Let's see an example that reads the content of a file using a stream.

Path path = Paths.get(filePath);
Stream<String> lines = Files.lines(path);
lines.forEach(System.out::println);

The lines() method was added to the Files class in Java 1.8. It reads all the lines from a file as a Stream.
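One detail worth noting: the stream returned by Files.lines holds an open file handle, so it should be closed, typically with try-with-resources. A minimal sketch (the temp file is created here only to make the example self-contained):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.stream.Stream;

// create a small file just so this sketch can run on its own
Path path = Files.createTempFile("sample", ".txt");
Files.write(path, Arrays.asList("first line", "", "third line"));

// try-with-resources closes the underlying file handle once the stream is consumed
long nonEmpty = 0;
try (Stream<String> lines = Files.lines(path)) {
    nonEmpty = lines.filter(line -> !line.isEmpty()).count();
}
System.out.println(nonEmpty + " non-empty lines");
```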

Stream Operations:

Now we will go through some commonly used stream operations and their usage.
  1. distinct
  2. filter
  3. flatMap
  4. limit
  5. map
  6. skip
  7. peek
  8. sorted
  9. allMatch
  10. anyMatch
  11. findAny
  12. findFirst
  13. noneMatch
  14. forEach
  15. reduce
Operations 1 to 8 are intermediate operations and 9 to 15 are terminal operations. As some of the operations are self-explanatory, we discuss only those that are not trivial.
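For the self-explanatory ones, here is a quick sketch of distinct, sorted, skip, limit, and the match operations (the input values and variable names are my own):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

List<Integer> input = Arrays.asList(5, 3, 3, 1, 4, 2);

// distinct, sorted, skip and limit chained together
List<Integer> picked = input.stream()
        .distinct()   // 5, 3, 1, 4, 2
        .sorted()     // 1, 2, 3, 4, 5
        .skip(1)      // 2, 3, 4, 5
        .limit(2)     // 2, 3
        .collect(Collectors.toList());

// the match operations are terminal and return a boolean
boolean anyEven = input.stream().anyMatch(n -> n % 2 == 0);   // true: 4 and 2 are even
boolean allEven = input.stream().allMatch(n -> n % 2 == 0);   // false: 5 is odd
boolean noneBig = input.stream().noneMatch(n -> n > 100);     // true: nothing exceeds 100
```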

 

Map Operation:


A map operation applies a function to each element of the input stream to produce another stream (the output stream). The number of elements in the input and output streams is the same, so this is a one-to-one mapping. The figure above shows the mapping: the operation takes an element e1 and applies the function f to it to get f(e1), and so on. But the type of the elements in the output stream may be different from the type of the elements in the input stream. Let's take an example.


Suppose we have 1000 keys with values in a Redis data store, and we want to fetch the values of all those keys and then perform some operation on them. We want to do it with Future objects, so how will we do it in parallel with a Java stream? We will use a thread pool service to fetch the data from Redis. Suppose our uniqueItemIds list contains the list of keys.

HashOperations<String, Long, String> redisHash = redisTemplate.opsForHash();

ExecutorService threadPoolService = Executors.newFixedThreadPool(10);

uniqueItemIds
        .stream()
        .parallel()
        .map(itemId -> threadPoolService.submit(
                // the Callable's call() fetches the value for this item id from Redis;
                // "itemHash" stands for whatever hash the values are stored under
                (Callable<String>) () -> redisHash.get("itemHash", itemId)))
        .forEach(future -> {
            try {
                System.out.println(future.get());
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
Here the code in the Callable's call method fetches the data from Redis for the specified item id. As we know, submit returns a Future object, so the map operation here takes an itemId, which is of type Long, and returns an object of type Future. I am emphasizing the point that "the type of elements in the output stream returned by the map operation may be different from the type of elements in the input stream".
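As a smaller, self-contained sketch of the same point, here each String element maps to one Integer, its length, so the element type changes while the element count stays the same:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

List<String> words = Arrays.asList("map", "filter", "reduce");

// one-to-one mapping: three input elements, three output elements,
// but the element type changes from String to Integer
List<Integer> lengths = words.stream()
        .map(String::length)
        .collect(Collectors.toList());
```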

flatMap Operation:     

Unlike the map operation, the Streams API supports one-to-many mapping through flatMap. The mapping function takes an element from the input stream and maps it to a stream. The type of the input element and of the elements in the mapped stream may be different. This step produces a stream of streams: if the input stream is a Stream<T>, then the mapped stream will be a Stream<Stream<R>>, which is not what we want. Assume we have a map with the structure below.

Map<Integer, Map<Long, List<String>>> itemsMap = new ConcurrentHashMap<>();

// Now let's fill the map with some values.
itemsMap.put(2, new ConcurrentHashMap<>());
itemsMap.put(3, new ConcurrentHashMap<>());
itemsMap.get(2).put(1L, Arrays.asList("abc", "cde", "def", "rty"));
itemsMap.get(2).put(2L, Arrays.asList("2abc", "2cde", "2def", "2rty"));
itemsMap.get(2).put(3L, Arrays.asList("3abc", "3cde", "3def", "3rty"));
itemsMap.get(3).put(1L, Arrays.asList("abc3", "cde3", "def3", "rty3"));

Now our aim is to get all the lists of strings in a single stream. How can we achieve it?

An immediate solution that comes to mind is to write the following.

itemsMap.values().stream().parallel().map(m -> m.values().stream()).forEach(System.out::println);

Now we get output like the following (the object identities will vary):

java.util.stream.ReferencePipeline$Head@4eec7777
java.util.stream.ReferencePipeline$Head@3b07d329

We expected to see the lists of strings in the output, but we don't. This is because inside the map a stream is produced for each inner map, so we pass a stream of streams to forEach, which simply prints the inner stream objects.

Our next attempt looks like this:


itemsMap.values()
        .stream()
        .parallel()
        .map(m -> m.values().stream())
        .forEach(e -> e.forEach(System.out::println));

And the output is:

[abc, cde, def, rty]
[2abc, 2cde, 2def, 2rty]
[3abc, 3cde, 3def, 3rty]
[abc3, cde3, def3, rty3]

We manage to print all our strings together, but observe that the pipeline still produces a stream of streams; we just iterated it differently with the nested forEach.

The correct approach to our problem is


itemsMap.values()
        .stream()
        .parallel()
        .flatMap(m -> m.values().stream())
        .forEach(System.out::println);
So here flatMap comes to the rescue. It flattens the Stream<Stream<String>> and converts it into a Stream<String>. So make sure to use flatMap whenever you end up with a Stream<Stream<T>>.

We will discuss some other important operations in series 3.

Monday, June 13, 2016

Java URL Connection Timeout (http,ftp,scp etc) setting in system level

Sometimes we face issues where a thread trying to connect to a URL hangs for an infinite time. The connection may be over the http, ftp, or scp protocol, and it is really painful to debug the issue. But there are some system-level configuration properties provided by Java with which we can solve this problem.

So let's start with some simple definitions.

ConnectionTimeOut:


The timeout (in milliseconds) to establish the connection to the host. For example, for http connections it is the timeout when establishing the connection to the http server; for ftp connections it is the timeout when establishing the connection to the ftp server; for scp connections it is the timeout for establishing the scp connection.

The property provided by sun for connectionTimeOut is

sun.net.client.defaultConnectTimeout (default: -1)

Note that here -1 means infinite timeout.

ReadTimeOut:


The timeout (in milliseconds) when reading from the input stream once a connection is established to a resource. It is the timeout between two consecutive packets from the socket.

The property provided by sun for readTimeOut is

sun.net.client.defaultReadTimeout (default: -1)


Retry If Post Fails:


It determines whether an unsuccessful HTTP POST request will be automatically resent to the server. Unsuccessful here means the server did not send a valid HTTP response or an IOException occurred. It defaults to true at the system level.

The property provided by Sun for retrying a failed POST is
sun.net.http.retryPost (default: true)
 
 
We can use these at the system level to configure a global connect timeout or read timeout setting. We can also set the timeout in the client used to make the http call, for example the Apache HTTP client. But it is important to note that these are Sun-implementation-specific properties and may not be supported in future releases.

We can set the properties like this:

-Dsun.net.client.defaultConnectTimeout=timeinmilliseconds
-Dsun.net.client.defaultReadTimeout=timeinmilliseconds
-Dsun.net.http.retryPost=false
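These -D flags apply JVM-wide. If you only need a timeout for one particular connection, URLConnection also exposes per-connection setters; a small sketch (the URL here is just a placeholder):

```java
import java.net.HttpURLConnection;
import java.net.URL;

URL url = new URL("http://example.com/");  // placeholder endpoint
HttpURLConnection conn = (HttpURLConnection) url.openConnection();  // no network I/O yet
conn.setConnectTimeout(5000);   // ms allowed to establish the TCP connection
conn.setReadTimeout(10000);     // ms allowed between two consecutive packets while reading
```

Note that openConnection() only creates the connection object; no network traffic happens until the connection is actually used.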

For more details please follow the Oracle documentation.

Sunday, June 12, 2016

An introduction to Java 8 Streams

Nowadays CPU cores are getting cheaper and cheaper because of the huge amount of development on the hardware front. Java 8 exploits the features of multi-core CPUs and is continuously introducing more and more support for parallelism. As a result we see new features like the fork-join framework and Java streams. Here we discuss what a Java stream is and its usefulness in parallel processing.

There are 3 posts about Java streams in this series on my blog. We will cover Java streams in detail with real-world examples; I have also written some of the code segments the same way as I used them in one of my projects. When I started to study Java streams I confused them with Java InputStream and OutputStream, but they are completely different.

In this series you will learn what a Java stream is, where to use it, and the different kinds of stream operations. We will learn about sequential and parallel streams and important operations on them such as map, flatMap, reduce, and collect.

If you are not familiar with Java 8 lambda expressions, functional interfaces, and method references, I request you to visit my series on Java 8 lambda expressions first.
Let's start our discussion with java8 streams.

Aggregate operations:

But first let's define what an aggregate operation is.
An aggregate operation computes a single value from a set of values. Common aggregate functions are:
  • Average() (i.e., arithmetic mean)
  • Count()
  • Maximum()
  • Median()
  • Minimum()
  • Mode()
  • Sum()
Here observe that these functions are applied on a set of values and give us a single value. The result of an aggregate function may be an object, a primitive type, or empty.
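Several of these aggregates are built into the Streams API; for instance, a primitive IntStream can compute count, sum, max, and average in a single pass via summaryStatistics() (a small sketch):

```java
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

// count, sum, max and average computed as aggregate operations on a stream
IntSummaryStatistics stats = IntStream.of(2, 4, 6, 8).summaryStatistics();
long count = stats.getCount();        // 4 values
long sum = stats.getSum();            // 2 + 4 + 6 + 8 = 20
int max = stats.getMax();             // 8
double average = stats.getAverage();  // 20 / 4 = 5.0
```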

Stream:


A stream is a sequence of elements which supports sequential and parallel aggregate operations. Now let's discuss some of the features of streams. We use lambda expressions extensively here, so if you have no idea about lambda expressions, please visit the Lambda Expression series.

Features Of Stream:


  • 1. A stream has no storage; it does not store elements. A stream pulls elements from a data source on demand (lazily) and passes them to an aggregate operation for processing. If you are wondering what the source of the stream is, the answer is that it can be a collection, but not always: it can be any data source, such as a generator function, an I/O channel, or a data structure.

  • 2. A stream can represent a group of infinite elements. A stream pulls its elements from a data source that can be a collection, a function that generates data, an I/O channel, etc. Because a function can generate an infinite number of elements and a stream can pull data from it on demand, it is possible to have a stream representing a sequence of infinitely many data elements.

  • 3. A stream supports internal iteration, so there is no need to use a for-each loop or an iterator for accessing elements. External iteration, like a for-each loop or an iterator, usually gives the elements in sequential or insertion order; that is, only a single thread can consume the elements, which is not always what we want. Streams are there to help us: they are designed to process their elements in parallel without our notice. This does not mean streams automatically decide on our behalf when to process elements sequentially or in parallel; we have to tell a stream that we want parallel processing and the stream will take care of it. In the background a stream uses the Fork/Join framework to achieve parallel processing. Although streams support internal iteration, they still provide an iterator() method that returns an Iterator for external iteration of the elements if necessary, but it is rarely used.

  • 4. Two types of operations are supported by streams. We call them lazy (intermediate) operations and eager (terminal) operations. Only when an eager operation is called on the stream do the lazy operations process the elements of the stream. Let's take a problem to solve using a Java stream. Suppose we have a list of 10 integers and we want the sum of the squares of the even integers. Let's see the diagram below; we write the code following it.

Datasource----------Stream---------Filter-----Stream -----map------Stream-------reduce

List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int sum = numberList.stream()
        .filter(n -> n % 2 == 0)
        .map(n -> n * n)
        .reduce(0, Integer::sum);


Here numberList is our data source. We call stream() on it, which gives us a stream. Then we apply filter on it to keep only the even numbers; the filter operation gives us another stream, of even numbers. Then we apply map on that stream, which gives us a stream of the squares of those numbers. Finally we apply reduce on this stream and find the sum of the numbers in it.
Here reduce is the terminal operation, and filter and map are intermediate operations. This process, in which one stream provides another stream, which provides the next stream, and so on, is called stream pipelining.

  • 5. A stream does not remove the elements from the data source; it only reads them.

  • 6.A sequential stream can be transformed into a parallel stream by calling the parallel() method on the created stream.

  • 7. The stream-related interfaces and classes are in the java.util.stream package. Please follow the diagram of the stream-related interfaces.

                     AutoCloseable
                           |
          BaseStream<T, S extends BaseStream<T, S>>
            |           |             |           |
       IntStream   LongStream   DoubleStream   Stream<T>

To work with elements of reference types, the Stream interface is there; to work with elements of primitive types, the IntStream, LongStream, and DoubleStream interfaces are available.

Methods common to all types of streams, like sequential, parallel, isParallel, and unordered, are declared in the BaseStream interface. Methods related to intermediate and terminal operations are declared in the Stream interface.
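The practical benefit of the primitive stream interfaces is that they avoid boxing every element; a small sketch comparing the two (both compute the same sum):

```java
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Stream<Integer> boxes each value; mapToInt unboxes back to primitives
int boxedSum = Stream.of(1, 2, 3, 4, 5).mapToInt(Integer::intValue).sum();

// IntStream works on primitive ints throughout, with no boxing at all
int primitiveSum = IntStream.rangeClosed(1, 5).sum();
```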

In the second part of this series we will see the use of the stream operations with example.










Saturday, June 4, 2016

NIO.2 Asynchronous file I/O with Future and CompletionHandler

Recently I had a requirement to do a lot of I/O-type work and at the same time a lot of computation. A solution that immediately comes to mind is to start a configurable number of threads, where the job of each thread is to do the I/O independently and then start the computation. But one thing to observe here is that my computation has very little to do with the I/O. In this design a thread is not doing anything useful while it is doing the I/O, and only after the completion of the I/O does the thread go on to the computation. It would be great if my thread could be free once it starts the I/O and, without waiting for the I/O to complete, jump to the computation part, with someone informing my thread once the I/O completes. Until then my thread stays busy doing some useful calculation.

So here we get two benefits: my thread is not waiting for the I/O to complete, and at the same time it is doing some useful calculation. Note that the job of my thread is both I/O bound and CPU bound.

Asynchronous  I/O :


NIO.2 provides support for asynchronous I/O (connecting, reading, and writing). In synchronous I/O, the thread that requests the I/O operation waits until the I/O operation completes. In asynchronous I/O, the application asks the system for an I/O operation and the operation is performed by the system asynchronously. While the system is performing the I/O operation, the application continues doing some other useful computation work. When the system finishes the I/O, it notifies the application about the completion of the I/O operation.


Three asynchronous channels were added in NIO.2 (Java 7) to the java.nio.channels package:

  •     AsynchronousSocketChannel
  •     AsynchronousServerSocketChannel
  •     AsynchronousFileChannel

(An AsynchronousDatagramChannel appeared in early drafts of NIO.2 but was removed before the final Java 7 release.)
       
Here we take AsynchronousFileChannel as our example and try to understand asynchronous I/O.

The AsynchronousFileChannel class provides two different ways of monitoring and controlling the initiated asynchronous operations.

The first is by returning a java.util.concurrent.Future object, which can be used to enquire about the state of the operation and obtain its result. It follows a poll-type approach.

The second is by passing to the I/O operation an object of a new class, java.nio.channels.CompletionHandler, which defines handler methods that are executed after the operation completes. It follows a push-type approach.

Each method of the AsynchronousFileChannel class that supports an asynchronous file I/O operation has two versions: one for the Future object and another for the CompletionHandler object.

Example of poll approach using Future object:



package com.brainatjava.test;

import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.Future;

public class AsynchronousIOWithFuture {

    static String str = "write some meaningful text to the file, as desired for your application.";

    public static void main(String[] args) {
        long startPosition = 0;
        Path path = Paths.get("/home/brainatjava/mytest");
        try (AsynchronousFileChannel asyncFileChannel =
                AsynchronousFileChannel.open(path, WRITE, CREATE)) {
            ByteBuffer dataBuffer = ByteBuffer.wrap(str.getBytes());
            Future<Integer> result = asyncFileChannel.write(dataBuffer, startPosition);
            while (!result.isDone()) {
                try {
                    // remember, in a real-life scenario the initiating thread will not sleep;
                    // it will do some useful work between polls
                    System.out.println("Sleeping for one second before the next poll.");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("Now the I/O operation is complete and we are going to get the result.");
            try {
                int bytesWritten = result.get();
                System.out.format("%s bytes written to %s%n",
                        bytesWritten, path.toAbsolutePath());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

In the example above, we first create an AsynchronousFileChannel for writing. Then we use the write method to write some data, which returns a Future object. Once we have the Future object, we use a polling approach to handle the result of the asynchronous file I/O: we keep calling the isDone() method of the Future object to check whether the I/O operation is finished. The rest of the code is self-explanatory, but note that while checking the result of the Future object we take a one-second sleep; in real life we would do some useful calculation there instead.

Example of push approach using CompletionHandler object:


This version of the write method of the AsynchronousFileChannel class allows us to pass a CompletionHandler object whose methods are called when the requested asynchronous I/O operation completes or fails.

CompletionHandler interface is defined in the java.nio.channels package.

The type parameters:

    V – The result type of the I/O operation
    A – The type of the object attached to the I/O operation

The CompletionHandler interface has two methods: completed() and failed(). The completed() method is called when the requested I/O operation completes successfully; the failed() method is called when the requested I/O operation fails. The API allows us to pass an object of any type to the completed() and failed() methods; such an object is called an attachment. We may want to pass an attachment such as the ByteBuffer, a reference to the channel, or a reference to the I/O source to these methods so that we can perform additional actions inside them. For example, we may want to close the AsynchronousFileChannel once the async I/O operation completes successfully or fails for any reason. We can also pass null as the attachment if we don't want to do anything useful with it.
Let's create an Attachment object first:

public class Attachment {

    private Path path;
    private AsynchronousFileChannel asyncChannel;

    // getters and setters go here.
}

Now let's define the CompletionHandler

private static class MyWriteCompletionHandler
        implements CompletionHandler<Integer, Attachment> {

    @Override
    public void completed(Integer result, Attachment attachment) {
        System.out.format("%s bytes written to %s%n",
                result, attachment.getPath().toAbsolutePath());
        try {
            attachment.getAsyncChannel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void failed(Throwable e, Attachment attachment) {
        System.out.format("I/O operation on file %s failed " +
                "with error: %s", attachment.getPath(), e.getMessage());
        try {
            attachment.getAsyncChannel().close();
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }
}


public class ASyncIOWithCompletionHandler {

    static String str = "write some meaningful text to the file, as desired for your application.";

    public static void main(String[] args) {
        Path path = Paths.get("/home/brainatjava/mytest");
        try {
            AsynchronousFileChannel asyncFileChannel =
                    AsynchronousFileChannel.open(path, WRITE, CREATE);
            MyWriteCompletionHandler handler = new MyWriteCompletionHandler();
            ByteBuffer dataBuffer = ByteBuffer.wrap(str.getBytes());
            Attachment attachment = new Attachment();
            attachment.setAsyncChannel(asyncFileChannel);
            attachment.setPath(path);
            asyncFileChannel.write(dataBuffer, 0, attachment, handler);

            try {
                System.out.println("Sleeping for 10 seconds...");
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Completed");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Here the main thread is sleeping for 10 seconds, but in a real-life scenario the main thread would do some useful calculation rather than sleeping.