Saturday, April 23, 2016

Redis Pub Sub with Jedis

Within these days we were working on a project which heavily relies on Redis for its data sync process.To give  more clarity here we are briefing  the scenario.

There are two parts on the system.One is a producer which is making some cache in Redis and another is a consumer which is subscribed to the same Redis.The subscription is through a channel.The consumer is subscribed to the channel.When there is some change in the channel ,it is published to the consumer and the consumer  updates itself accordingly.Here we used Jedis as Redis  client.



In Redis, we can subscribe to multiple channels and when someone publishes messages on those channels, Redis notifies us  with published messages. Jedis provides this functionality with JedisPubSub abstract class. To handle pub / sub events, we need to extend JedisPubSub class and implement the abstract methods.



package com.brainatjava.test;
public class xyzListener extends JedisPubSub {   @Override
    public void onMessage(String channel, String message) {
    System.out.println(message);
}

Now we wrote the code for registering listener in a different class.

public class TestProgram {
private void registerListeners() {
        if(!xyzListener.isSubscribed()){
                Runnable task=()->{       
                try{
                    Jedis jdeisConnection=    ((Jedis) redisTemplate.getConnectionFactory().getConnection().getNativeConnection());
                    jedisConnectionExceptionFlag=false;
                    jdeisConnection.subscribe(lineItemDeliveryListener, "channelName");
                      
                }
                catch(JedisConnectionException jce){
                    logger.error("got jedis connection excpetion "+jce.getMessage(),jce);
                    jedisConnectionExceptionFlag=true;
                }
                catch(RedisConnectionFailureException rce){
                    logger.error("got jedis RedisConnectionFailureException excpetion "+rce.getMessage(),rce);
                    jedisConnectionExceptionFlag=true;
                }
                catch(Exception e){
                logger.error("error in registerListeners "+xyzListener.isSubscribed(),e);
            }};
           
            Thread xyzUpdater = new Thread(task);
            xyzUpdater.setName("xyzUpdater");
            xyzUpdater.start();
        }
    }

}
Here if we notice the above code ,we found that  first we are checking is xyzListener is subscribed to the required channel ,if not we are doing it.But Observe that we are doing it in a hacky way.That is we are getting the native connection first and then we are subscribing to the channel.And the subscribe is a blocking call.It act in wait and watch mode.

So when there is a change in the channel the listener listens it and update its cache accordingly.But There is always should have a fail safe mechanism in place.We have also done that.If somehow Jedis pubsub is not working,then we have a mechanisim in place to do it manually.

The mechanism is like we have a cron scheduler running in every 15 minutes and    checking the cache timestamp and if the cache timestamp of the latest cache and the timestamp of the consumer varies, then we assume there is issue with pubsub and we will update the consumer cache manually.

With this design everything was fine and  the 15 minutes cron was their without any use.After the smooth working of some days we got an alert that the 15 minute cron is running and manually updating the cache.So it has no impact on our service as the cache is getting updated manually with the help of cron scheduler.

But why this happened?After investigetting sometimes we found before some  days the redis was restarted.And this created the whole issue.It is behaving like it is subscribed.Now you know the solution.

Friday, March 11, 2016

The try-with-resources Statement

It is always required to close the resources like database connections, file handles like BufferedReader,BufferedWriter etc after it's use.Otherwise we will face resource leak issue.And sometimes we forget to close the resources after the use.But in Java 7 a functional interface namely AutoCloseable is introduced.And resoureces like Connection ,BufferedReader and BufferedWriter etc extends AutoCloseable.
The try-with-resources statement is a try statement that declares one or more resources. The try-with-resources statement ensures that each resource is closed at the end of the try statement. Any object that implements java.lang.AutoCloseable,  can be used as a resource inside the try-with-resources statement.
The following example writes a line in a file. It uses an instance of BufferedWriter to write data in the file. BufferedWriter is a resource that must be closed after the program is finished with it:
static String writeALineToFile(String path) throws IOException {
    try (BufferedWriter bw =
                   new BufferedWriter(new FileWriter(new File("path")))) {
        return bw.write();
    }
}
In this example, the resource declared in the try-with-resources statement is a BufferedWriter. The declaration statement appears within parentheses immediately after the try keyword. The class BufferedWriter, in Java SE 7 and later, implements the interface java.lang.AutoCloseable. Because the BufferedWriter instance is declared in a try-with-resource statement, it will be closed regardless of whether the try statement completes normally or abruptly (as a result of the method BufferedWriter.write throwing an IOException).

Let's make it clear that here try-with-resource statement and try block are two different things.

Prior to Java SE 7, we can use a finally block to ensure that a resource is closed regardless of whether the try statement completes normally or abruptly. The following example uses a finally block instead of a try-with-resources statement:

static String writeALineToFileWithFinally(String path)
                                                     throws IOException {
 try {
       BufferedWriter bw =
                   new BufferedWriter(new FileWriter(new File("path")))) ;
         bw.write();
    }
}
    } finally {
        if (bw != null) bw.close();
    }
}
However, in this example, if the methods write and close both throw exceptions, then the method  
writeALineToFileWithFinally   throws the exception thrown from the finally block; the exception thrown from the try block is suppressed.

But In contrast, in the example readFirstLineFromFile, if exceptions are thrown from both the try block and the try-with-resources statement, then the method readFirstLineFromFile throws the exception thrown from the try block; the exception thrown from the try-with-resources block is suppressed.

We  can retrieve the suppressed exceptions by calling the Throwable.getSuppressed method from the exception thrown by the try block.


Note: A try-with-resources statement can have catch and finally blocks just like an ordinary try statement. In a try-with-resources statement, any catch or finally block is run after the resources declared have been closed.

Saturday, March 5, 2016

Java 8 Lambda Expressions-Continued

As we see in the previous series Lambda expression,we used an Functional interface and created a Lambda expression by removing the anonymous inner class code.

Here we will see that we can do the same job without using our custom functional interface.But here we will take the help of some of the inbuilt functional  interfaces which is provided in java8 in package java.util.function.



@FunctionalInterface
public interface Predicate <T>{
  boolean test(T t);
.
.
.
.
}

Here parameter T is the type of the input to the predicate
Here the Functional Interface Predicate given in package  java.util.function represents a boolean-valued function(predicate) of a single argument.It has one abstract method test(Object obj).

Let's see the example below


package com.brainatjava;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
public class Test {
    static List wordList = Arrays.asList(new String[]{"a","b","Lambda","d"});
    public static void findString(List list, Predicate predicate) {
            for (String p : list) {
                if (predicate.test(p)) {
                   System.out.println("we found Lambda anonymously.");
                }
            }
        }
    public static void main(String[] args) {
        findString(wordList,x->x.equalsIgnoreCase("Lambda"));
    }
}


Now let's see some other built in functional interfaces and it's use case.

@FunctionalInterface
public interface Function {
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @return the function result
     */
    R apply(T t); ....
....
//along with some other methods

}

It has an abstract method namely apply having argument T and return type R.
Suppose we have a requirement to take some strings and convert those into doubles.
So we will take the help of the apply() method of the above  functional interface.
So we wrote a method namely changeFormat like below.

public static List changeFormat(Function function, List source) {
        List sink = new ArrayList<>();
        for (T item : source) {
        R value = function.apply(item);
        sink.add(value);
        }
        return sink;
        }


package com.brainatjava;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class Test {     public static void main(String[] args) {
        List digits = Arrays.asList("1","2","9","7","5");         List numbers = changeFormat(n->new Double(n), digits);
    }

I request you to visit the official documentation from oracle to get  the knowledge of all available functional interfaces in java.util.fuction package those provide target types for lambda expressions and method references.

Friday, March 4, 2016

Method Refernce(double colon operator) vs Lambda Expression in Java 8

In our previous post we saw that we can use lambda expression to implement function interface.But it is not the only way to implement functional interface. A new feature introduced in Java 8 known as Method Reference.It has the ability to replace an instance  of functional interface with a method having same argument as the method of the functional interface.Usually in Java :: operator is used for method reference.The method in the functional interface and the passing method reference should match for the argument and return type.
Let's take an example

package com.brainatjava;

import java.util.Arrays;

import java.util.List;

import java.util.function.Predicate;

public class Test {

    static List wordList = Arrays.asList(new String[]{"a","b","Lambda","d"});
    public static void findString(List list, Predicate predicate) {
            for (String p : list) {
                if (predicate.test(p)) {
                   System.out.println("we found Lambda by method expression.");
                }             }         }     public static void main(String[] args) {         Predicate predicate=Test::check;         findString(wordList,predicate);     }     public static boolean check(String value){         return value.equalsIgnoreCase("Lambda");     } }

@FunctionalInterface
public interface Predicate {
    boolean test(T t); }
Here if we  look at the findString() method, we see that a functional interface named Predicate is used.And we used the abstract method test() of the interface Predicate  to verify the result.

In our above example  we have defined method named check() in our class Test having same signature as method test() of funcational interface Predicate.So we can use method expression here.So in main method we call findString as findString(wordList,Test::check).By using method expression we utilized the existing method check().As we stated above that we can replace an instance of the functional interface with a method expression.So we could write  the line  Predicate predicate=Test::check.Here check is a static method of our class Test so we use it directly along with the class name.And used the instance of Predicate in findString method as findString(wordList,predicate).Just note the use of ::(double colon) operator , we are not calling the check() method,but we are only getting a reference to it's name.

Method reference are three types
1.Static Method References
2.Instance Method Reference
3.Constructor Method References

Now let's consider another functional interface namely Function provided in Java 8.

@FunctionalInterface
public interface Function <T,R>{

    /**
     * Applies this function to the given argument.
     *
     * @param t the function argument
     * @return the function result
     */
    R apply(T t);

....
....
//along with some other methods

}


It has an abstract method namely apply having argument T and return type R.
Suppose we have a requirement to take some strings and convert those into doubles.
So we will take the help of the apply() method of the above  functional interface.
So we wrote a method namely changeFormat like below.

public static  List changeFormat(Function function, List source) {
        List sink = new ArrayList<>();
        for (T item : source) {
        R value = function.apply(item);
        sink.add(value);
        }
        return sink;
        }

As we know that the job of a method refernece is to replace an instance of the functional interface.Now here we will do the same.But we know very well that Java has a constructor of Double class like below.

public Double(String s) throws NumberFormatException {
        value = parseDouble(s);
    }
If we observe here we find that the signature of the above Double constructor is similar with the apply method of the functional interface.
So we can replace the instance of the functional interface with the method reference.But this time we call it as Constructor Method References, as we are using a constructor here.So we can write the same as

Function function=Double::new;
By adding the above code segments in a main method ,our complete code look like below.
package com.brainatjava;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class Test {
  
    public static void main(String[] args) {
        List digits = Arrays.asList("1","2","9","7","5");
        Function function=Double::new;
        List numbers = changeFormat(function, digits);
    }
public static List changeFormat(Function function, List source) {
        List sink = new ArrayList<>();
        for (T item : source) {
        R value = function.apply(item);
        sink.add(value);
        }
        return sink;
        }
  
}

Please refer the series java streams which always used with lambda expression for parallel processing.

Wednesday, February 24, 2016

Java 8 Lambda Expressions

In this blog we are going to discuss what is Lambda expression.And why it is needed.How to use it in Java.What are the benefits of using Lambda Expression.Has it some disadvantages of using it?Also we will learn about functional interface and default methods.We will move step by step from basics of Lambda Expression to its usage by simple code samples But before this we need to learn some of the backgrounds of Lambdas.

So we will learn about the followings first.
1.Functional interface
2.Anonymous inner class
3.Internal VS External iteration

1. Functional interface:

A Interface with only one abstract method are referred as Functional     Interface.But A functional interface can have more than one static or default methods .

The  below given example is a functional interface


public interface TestFunctional {

       void  doSomething(int x);

        static void staticMethod() {

              System.out.println("static method  in Functional Interface.");

           }

Some well known functional interfaces in Java are
1.java.lang.Runnable having only one abstract method  public abstract void run().
2 . java.util.concurrent.Callable<V> having only one abstract method  V call() throws Exception.
3.java.util.Comparator<T>  having only one abstract method   int compare(T o1, T o2) along with other default and static methods.

Note:We can write @FunctionalInterface to annotate the functional interfaces.
@FunctionalInterface
@FunctionalInterface

2.Anonymous inner class:

Let's take an example.Suppose we have a list of strings.And the requirement in hand is to check whether the list contains a specific string say "Lambda".
So what will we do here?There are many approaches to solve this problem.
Here we will consider the Anonymous inner class solution.

First let's declare an interface like 

package com.brainatjava;

public interface CheckWords {

    boolean test(String p);

}

package com.brainatjava;

import java.util.Arrays;
import java.util.List;

public class Test {
    static List wordList = Arrays.asList(new String[]{"a","b","Lambda","d"}); 
    public static void findString(List list, CheckWords checkWord) {
            for (String p : list) {
                if (checkWord.test(p)) {
                   System.out.println("we found Lambda anonymously.");
                }
            }
        }
    public static void main(String[] args) {
        findString(wordList,new CheckWords(){
            public boolean test(String x){
                return x.equalsIgnoreCase("Lambda");
            }
});
}
}

Here notice that, we have no class which is implementing the CheckWords interface and overrides the test method.One of the arguments of the findString method is an anonymous class, which filters the strings with some conditions ie. which is equals to Lambda.This approach reduces the amount of code required because we don't have to create a new class for each condition that we want to test.Like if we want to find the string "closure" in the given list, we can write the anonymous class in the argument of findString method to check it.However, the syntax of anonymous classes is bulky

3.Internal VS External iteration:

External iteration is  the way by which we access the elements of the collection sequentially either by using a for each loop or by using an iterator.It restricts  the opportunity to manage the elements  of the collection by not  using reordering of the data, parallelism  etc.

Example:

1:By using for each

List charcters = Arrays.asList(new String[]{"a","b","c","d"});
 for(String charcter: charcters){
            System.out.println(charcter);
        }
 2:By using iterator

 List charcters = Arrays.asList(new String[]{"a","b","c","d"}); 
 Iterator iterator = charcters.listIterator();
        while(iterator.hasNext()){
            System.out.println(iterator.next());
        }
Sometimes it is required to consume the data of a collection in parallel or in random order which is suitable for the purpose of speed and efficiency.In this case we can avoid the sequential/insertion order  traversal of a collection .So  where order of the elements of a collection is not important  internal iteration help us to do the task in a parallel or random order.So we only have to tell what we want to do, but we don't need to specify in which order the elements will be processed.We leave this to the API to manage the order of the elements.

Introduction to Lambda Expressions:

Lambda expressions are anonymous methods that can replace the bulky code of anonymous inner class  that we described above.Simply we can say, it is a method without declaration.That is it has no name,no access modifier,no return type.And the method arguments have no type.As it will be clear from the context.We can say it is like a method we write in the same place where it will be in use.
We can say  Lambda expressions are implementation of only abstract method of a functional interface that is implemented by anonymous inner class.
It is useful where the method is used only once and the method definition is very short.It saves our effort of declaring a method with return type, name,access specifier and argument data types etc.
Lambda expressions are implementation
Lambda expressions are implementation
Lambda expressions are implementation of only abstract method of functional interface that is being implemented or instantiated anonymously. - See more at: http://java8.in/java-8-lambda-expression/#sthash.4TIhTUTQ.dpuf
Lambda expressions are implementation of only abstract method of functional interface that is being implemented or instantiated anonymously. - See more at: http://java8.in/java-8-lambda-expression/#sthash.4TIhTUTQ.dpuf

Lambda Expression structure: 

A Lambda expression has three parts
1.An arrow (->) token
2.argument list
3.body

(argument) -> (body) , is the structure of a valid Lambda expression.


argument list can have zero or more arguments.We can just think it like method arguments.

body of a Lambda expression can have zero or more statements.it can be a single statement  or a block of statements.No need to write return statements at the end of the body.The complete expression will be evaluated and returned.if the expression or statement evaluates to void ,then nothing will be returned.If the body contains a single statement no need to enclose it with curly braces.

Some Examples of Lambda Expressions:

 1.()->System.out.println("write something"); having zero argument and single statement in the body.
2.()->80 is also valid Lambda expression
3.(int x,int y)->{return x+y;} having two arguments and one statement

4.In section 2 Anonymous inner class section , We use Anonymous inner classes to instantiate objects of functional interface.Just  look at the line where we called findString() method in section 2, we used anonymous inner class to instantiate the functional  interface CheckWords.

The same can be done by Lambda expression.

public class Test {
    static List wordList = Arrays.asList(new String[]{"a","b","Lambda","d"});
    public static void findString(List list, CheckWords checkWord) {
            for (String p : list) {
                if (checkWord.test(p)) {
                   System.out.println("we found Lambda anonymously.");
                }
            }
        }
    public static void main(String[] args) {
        findString(wordList,x->
                 x.equalsIgnoreCase("Lambda")
            );
    }
}
Here we replaced the anonymous  innercalss with Lambda Expression.
What we have done here is   
CheckWords checkWord=x->x.equalsIgnoreCase("Lambda")           

That is we can write the above code as

 public  static void main(String[] args) {
 CheckWords checkWords=x->x.equalsIgnoreCase("Lambda")       
 findString(wordList,checkWord);
    }
5.As we discussed above that java.lang.Runnable is a functional interface,we can also use Lambda expression at the time of thread creation.

Let's first see the traditional  way of creating a thread.       

new Thread(new Runnable() {
    @Override
    public void run() {
        System.out.println("implementing run method in an anonymous function.");
    }
}).start();
Now let's see the Lambda way of doing it.

new Thread(
    () -> System.out.println("implementing run method by Lambda expression")
).start();
The same can be written as 

Runnable runnable=  () -> System.out.println("implementing run method by Lambda   expression");   
  new Thread(runnable).start();
Lambda Expression Part2--->

Thursday, November 19, 2015

Make a BoundedCollection by Using a Synchronizer

Sometimes it is required to create bounded collection.The main aim of the question is to create a collection so that it can contain only a fixed number of elements at a particular instant of time.There are many approaches to do it.There are some third party apis like apache.commons,Guava  which provides such functionalities.

But our aim here is to not to use any third party apis, rather we can implement it using some provided tools in Java.We are going to implement it by using an existing synchronizers ie. semaphore.We will take a semaphore with fixed number of permit.Before adding something in our collection we will acquire a permit and and if in case of any exception of adding will release the permit.And also when removing something from the collection we will release a permit.To read more about semaphore please visit the series semaphore and mutex


Implementation details:


In our below implementation we will take a hashSet  and make it bounded.Similarly we can make any collection  a bounded collection.

package com.brainatjava.test;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

public class BoundedCollection {
    private final Set hashSet;
    private final Semaphore semaphore;

    public BoundedCollection(int nosOfPermit) {
        this.hashSet = Collections.synchronizedSet(new HashSet());
        semaphore = new Semaphore(nosOfPermit);
    }

    public boolean add(T o) throws InterruptedException {
        semaphore.acquire();
        boolean isSomethingAdded = false;
        try {
            isSomethingAdded = hashSet.add(o);
            return isSomethingAdded;
        } finally {
            if (!isSomethingAdded)
                semaphore.release();
        }
    }

    public boolean remove(Object o) {
        boolean isSomethingRemoved = hashSet.remove(o);
        if (isSomethingRemoved)
            semaphore.release();
        return isSomethingRemoved;
    }
}

Friday, November 13, 2015

notify() and notifyAll():Which one to use

Sometimes we face dilemma to choose one out of notify() and notifyAll().This post aims to explain  the use of notify() and notifyAll() with the help of an example.Although there are many more apis in java.util.concurrent package to use for more serious multi threaded programming.But still it is required to understand the basics.Although this post is for the beginners to learn about the use of wait(), notify() and notifyAll(),but still the senior developers can clear their doubts and refresh their memory about it.

 Let's assume that we have a room and inside it there are three equipment.One is motor which is filling water in a tank.Other one is an  oven which is baking a cake.And the third one is an ice maker which converts water into ice.But all these are associated with a bell which rings after the completion of the specific task.And initially the room is locked and the key is available next to it.But the condition is at any instant of time only one person can take the lock and enter inside the room to use one of the equipment.After initiating his work
he can come out of the room by releasing the lock giving a chance to any other person to use any other equipment.And he waits outside of the room till the completion of his task and can collect his product from the equipment by entering the room again by reacquiring the lock.

And let's  have three threads named thread1 , thread2 and thread3.Let's assume thread1 came and take the lock of the room and go inside and switch on the motor to fill the tank and came out of the room by releasing the lock and wait outside to listen the ringing of the bell.This is exactly what wait method does.

Now thread2 came and took the the lock , went inside the room and put a cake in the oven.Then release the lock and went out of the room and wait outside to listen the ringing of the bell.

Similarly the thread3 came , took the lock and went inside the room and keep some water in the ice maker.Then release the lock and  went out of the room and wait outside to listen the ringing of the bell.

Here all threads are waiting  to listen the bell ringing sound.And bell ring means one of the task is finished.

Assume here that we have used notify() method of the object to notify a thread that it's waiting is over and the condition on which it was waiting is already changed,now it can woke up check for the condition again and if the condition is really changed then the thread can proceed whatever it wants to do.

But the issue here is that suppose a notification is issued for thread3 that water is already converted to ice.But since we use notify here.And we have three threads in waiting state , now the JVM will decide to wake up any one thread by using it's own algorithm.Suppose unfortunately JVM decided to wake up thread1.So now thread1 woke up and got the lock as it has no contestant checks the condition again (that is whether the tank is filled).
As tank is not filled yet again go to the waiting state by releasing the lock.But here the thread3 which actually is the deserving candidate missed the notification and still waiting to get it's turn.

Now in this case if we have used notifyall, then all the threads will  wake up and contest for lock and get the lock one by one.And the thread for which notification was meant for will  definitely  get the lock sooner or later and will not miss the signal at all.

So if this the case then we should use notifyall instead of notify.
But assume if we have only one equipment associated with the bell which is ringing upon completion of the task.Then we can use notify.
So if we model our real life scenario with the above example.We can decide whether it is best to use notify or notifyAll.

But we might tempted to think if there is so much confusion to use notify over notifyall,why not always use  notifyall.Yes we can do that,but we have to pay the price for it. Using notifyAll when only one thread can make progress is horribly inefficient. If more than one  threads are waiting on same  lock, calling notifyAll causes all of them to wake up and contest for the lock; then most or all of them will not get the lock and  will go to sleep again. This means a lot of context switches and a lot of contested lock acquisitions, which is not desirable.

Important Points to remember:

  • The thread whicch has called wait() method releases the lock as it waits.
  • The thread which has called wait method blocks until it is notified. 
  • Once it is notified ,it needs to reacquire the lock to  continue.
  • The thread must hold the lock released by the waiting thread to  call notify() or notifyall().
  • Suppose there are more than one thread  waiting on the same lock , calling notify causes the JVM to select one thread waiting on that lock  to wake up.
  • Calling notifyAll wakes up all the threads waiting on that lock.
  • Since the thread must hold the lock to call notify() or notifyall() method and waiting threads cannot return from wait without reacquiring the lock, the notifying thread should release the lock quickly to ensure that the waiting threads are unblocked as soon as possible.
  • If there is confusion which method to call notify() or notifyAll(),it is always safe to call notifyAll().
  • But calling notifyAll() has some overhead.
calling notifyAll() causes all the threads to wake up and contest for the lock; then most or all of them will go back to sleep, as any one thread will get the lock.Which causes a lot of context switch which is very much undesirable.  

 Consider the following code sample.Here we have examined the famous Consumer Producer  problem.
We have a sharedBuffer having a fixed size.And there are three threads.And all the three threads are using the same lock  i.e  sharedBuffer as the lock.


producerThread:It checks the sharedBuffer,if  the sharedBuffer is full , the thread calls wait() method.After getting notification from some another thread, it wakes up and an element in sharedBuffer.And then it calls notify().This notify call is meant for the consumerThread.That is it is an indication to the consumer thread that ,now the consumer thread can wake up and start consuming the element from the sharedBuffer.

consumerThread:It checks the sharedBuffer.If the sharedBuffer is empty,then the thread calls wait.Release the lock  and blocks till the notification received which is issued  by the producer after putting an element in the sharedBuffer.After consuming an element the consumerThread calls notify().And this notification is meant for the producerThread.That is it is an indication to the producerthread that ,now the producerthread can wake up and start producing an element in the sharedBuffer.

sizeCheckerThread:It checks the size of the sharedBuffer.If it is greater than 0,then it calls the wait method by releasing the lock.This thread is an extra thread to demonstrate the fact that it is receiving the notification which is not meant for it.

Notice that in produce method , consume method   and  doSizeChecking method we are checking the condition by using a while loop  but not using  if.The main reason for this is if the waiting  thread  wake up by spurious wake up(which is generated by OS) but the condition on which it is waiting is still not satisfied , then the thread will have an opportunity to wait again.

Analogy between the Example and Sample Code:

Now let's compare the analogy between the example and the sample code.notify() method is analogous to the bell which rings.But the bell rings in three different conditions i.e when tank is filled,cake is baked and ice is made.Here also notify is called in three different conditions.When consumer consumes an element,producer produce an element and size of the sharedBuffer is not greater than zero.In the example the shared lock is the room lock and here in the sample code the shared lock is sharedBuffer.

Missed Signal Issue:

Assume that if the notification is raised by  consumerThread that it has consumed an element and the sharedBuffer is not full and the producer can put new element in it.Actually this notification is meant for the producer, but there is no guarantee that the producer thread will receive the notification.By the choice of the JVM  the sizeCheckerThread may receive the notification instead of  producerThread.The case may be so unfortunate that, the producerThread may never receive the notification signal issued for him and it is always hijacked by the  sizeCheckerThread.Which we can say as a missed signal issue.So in this case we should  use notifyAll instead of notify, to avoid such missed signal issue.So that all the threads will get equal chance to wake up and contest for lock and will get the lock sooner or later.

When to use notify():

  1.  Only one condition is associated with the lock.
  2.  Each thread executes the same logic by returning from wait() method call.
  3. A notification  enables at most one thread to proceed.
In our below example code if we remove the sizeCheckerThread,then it follows the 3rd rule of the above three rules, that is a notification  will allow only one thread to proceed  ie. either consumerThread or producerThread.But it does not follow the 1st rule.It has two conditions associated   with the same lock ie. if sharedBuffer is full  for producerThread and sharedBuffer is empty for consumerThread.Also it does not follow the 2nd rule ie. each thread executes the different logic after returning from wait() method call.

Usually we rarely get such ideal scenarios to implement  in our multithreaded environment , so in almost all cases it is required to use notifyAll().


package com.brainatjava.test; import java.util.ArrayList; public class ProducerConsumerProblem { public static void main(String args[]) { ArrayList sharedBuffer = new ArrayList(); int size = 4; Thread producerThread = new Thread(new Producer(sharedBuffer, size), "Producer"); Thread consumerThread = new Thread(new Consumer(sharedBuffer), "Consumer"); Thread sizeCheckerThread = new Thread(new SizeChecker(sharedBuffer), "sizeChecker"); producerThread.start(); consumerThread.start(); sizeCheckerThread.start(); } } class Producer implements Runnable { private final ArrayList sharedBuffer; private final int SIZE; public Producer(ArrayList sharedBuffer , int size) { this.sharedBuffer = sharedBuffer ; this.SIZE = size; } @Override public void run() { int i=0; while (true) { System.out.println("Producer Produced: " + i); try { produce(i); i++; } catch (InterruptedException ex) { ex.printStackTrace(); } } } private void produce(int i) throws InterruptedException { synchronized (sharedBuffer) { while (sharedBuffer.size() == SIZE) { System.out.println("Queue is full " + Thread.currentThread().getName() + " is waiting , size: " + sharedBuffer.size()); sharedBuffer.wait(); } sharedBuffer.add(i); sharedBuffer.notify(); } } }


class Consumer implements Runnable {

    private final ArrayList sharedBuffer;

    public Consumer(ArrayList sharedBuffer ) {
        this.sharedBuffer = sharedBuffer ;
    }

    @Override
    public void run() {
        while (true) {
            try {
                consume();
                Thread.sleep(50);
            } catch (InterruptedException ex) {
               ex.printStackTrace();
            }

        }
    }

    private void consume() throws InterruptedException {
         synchronized (sharedBuffer) {
        while (sharedBuffer.isEmpty()) {
                System.out.println("Queue is empty " + Thread.currentThread().getName()
                                    + " is waiting , size: " + sharedBuffer.size());
                sharedBuffer.wait();
            }
            int value=(Integer)sharedBuffer.remove(0);
            System.out.println("Consumer consumed "+value);
            sharedBuffer.notify();
        }
    }
}


class SizeChecker implements Runnable {
      private final ArrayList sharedBuffer;

        public SizeChecker(ArrayList sharedQueue) {
            this.sharedBuffer = sharedQueue;
        }
        @Override
        public void run() {
            while (true) {
                try {
                    doSizeChecking();
                    Thread.sleep(50);
                } catch (InterruptedException ex) {
                   ex.printStackTrace();
                }

            }
        }
        private void doSizeChecking() throws InterruptedException {
             synchronized (sharedBuffer) {
            while (sharedBuffer.size()>0) {
                    System.out.println("Going to wait as size>=0 " + Thread.currentThread().getName()
                                         +"  "+ sharedBuffer.size());
                    sharedBuffer.wait();
                    System.out.println("wake up from wait by notification form a thread");
                }
               
                System.out.println("Quesize is 0 "+sharedBuffer.size());
               
            }
        }
}
Please feel free to comment for anything wrong is observed.Comments will be used to improve or correct the post if required.