Thursday, November 19, 2015

Make a BoundedCollection by Using a Synchronizer

Sometimes we need a bounded collection, that is, a collection that can hold only a fixed number of elements at any instant. There are many ways to build one. Third-party libraries such as Apache Commons and Guava provide this functionality.

But our aim here is not to use any third-party API; instead we will implement it with tools that Java already provides. We are going to use an existing synchronizer, namely a semaphore, created with a fixed number of permits. Before adding an element to the collection we acquire a permit, and if nothing is added (the add fails or throws an exception) we release the permit again. When an element is removed from the collection we also release a permit. To read more about semaphores, please visit the series semaphore and mutex.


Implementation details:


In the implementation below we take a HashSet and make it bounded. Any other collection can be made bounded in the same way.

package com.brainatjava.test;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

public class BoundedCollection<T> {
    private final Set<T> hashSet;
    private final Semaphore semaphore;

    public BoundedCollection(int nosOfPermit) {
        this.hashSet = Collections.synchronizedSet(new HashSet<T>());
        semaphore = new Semaphore(nosOfPermit);
    }

    public boolean add(T o) throws InterruptedException {
        // Block until a permit is available, i.e. until there is room.
        semaphore.acquire();
        boolean isSomethingAdded = false;
        try {
            isSomethingAdded = hashSet.add(o);
            return isSomethingAdded;
        } finally {
            // Give the permit back if nothing was added (duplicate or exception).
            if (!isSomethingAdded)
                semaphore.release();
        }
    }

    public boolean remove(Object o) {
        boolean isSomethingRemoved = hashSet.remove(o);
        if (isSomethingRemoved)
            semaphore.release();
        return isSomethingRemoved;
    }
}
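
To see the bound in action, here is a minimal, self-contained sketch. It repeats the same idea under the hypothetical name BoundedHashSet and adds a hypothetical freeSlots() helper (a thin wrapper around Semaphore.availablePermits()) purely for illustration; neither name is part of the class above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

// Hypothetical demo class that mirrors the BoundedCollection idea.
class BoundedHashSet<T> {
    private final Set<T> set = Collections.synchronizedSet(new HashSet<T>());
    private final Semaphore semaphore;

    BoundedHashSet(int bound) {
        this.semaphore = new Semaphore(bound);
    }

    // Blocks until a permit is free; returns the permit if nothing was added.
    boolean add(T o) throws InterruptedException {
        semaphore.acquire();
        boolean added = false;
        try {
            added = set.add(o);
            return added;
        } finally {
            if (!added) {
                semaphore.release();
            }
        }
    }

    // Removing an element frees one slot.
    boolean remove(Object o) {
        boolean removed = set.remove(o);
        if (removed) {
            semaphore.release();
        }
        return removed;
    }

    // Illustration only: how many more elements fit right now.
    int freeSlots() {
        return semaphore.availablePermits();
    }
}

class BoundedHashSetDemo {
    // Records the number of free slots after each step.
    static int[] run() {
        BoundedHashSet<String> set = new BoundedHashSet<>(2);
        int[] free = new int[4];
        try {
            set.add("a");
            free[0] = set.freeSlots();   // one permit used
            set.add("a");                // duplicate: the permit is given back
            free[1] = set.freeSlots();
            set.add("b");
            free[2] = set.freeSlots();   // the set is now full
            set.remove("a");
            free[3] = set.freeSlots();   // removing frees a slot again
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return free;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [1, 1, 0, 1]
    }
}
```

Note that a third add() on the full set would block until some other thread calls remove(), which is the whole point of the bound.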

Friday, November 13, 2015

notify() and notifyAll(): Which One to Use

Sometimes we face a dilemma when choosing between notify() and notifyAll(). This post aims to explain the use of notify() and notifyAll() with the help of an example. The java.util.concurrent package offers many higher-level APIs for serious multithreaded programming, but it is still necessary to understand the basics. This post is aimed at beginners learning wait(), notify() and notifyAll(), but senior developers can also use it to clear their doubts and refresh their memory.

Let's assume that we have a room containing three pieces of equipment. The first is a motor that fills a tank with water. The second is an oven that bakes a cake. The third is an ice maker that converts water into ice. All three are connected to a bell that rings when any of them completes its task. Initially the room is locked and the key is available next to it. The rule is that at any instant only one person can take the lock and enter the room to use one of the machines. After starting his work he comes out of the room and releases the lock, giving any other person a chance to use another machine. He then waits outside the room until his task completes, and collects his product by reacquiring the lock and entering the room again.

Now let's have three threads named thread1, thread2 and thread3. Suppose thread1 comes, takes the lock of the room, goes inside and switches on the motor to fill the tank. It then comes out of the room, releasing the lock, and waits outside listening for the bell. This is exactly what the wait() method does.

Now thread2 comes, takes the lock, goes inside the room and puts a cake in the oven. Then it releases the lock, leaves the room and waits outside listening for the bell.

Similarly thread3 comes, takes the lock, goes inside the room and puts some water in the ice maker. Then it releases the lock, leaves the room and waits outside listening for the bell.

Now all three threads are waiting for the bell to ring, and a ring of the bell means that one of the tasks is finished.

Assume that we use the notify() method of the lock object to tell a thread that its wait is over and that the condition it was waiting on has changed. The thread can then wake up, check the condition again, and proceed with its work if the condition has really changed.

The issue is this: suppose a notification is issued for thread3 because the water has turned into ice. Since we used notify() and three threads are in the waiting state, the JVM wakes up just one of them, and the choice is arbitrary. Suppose that, unfortunately, the JVM decides to wake up thread1. So thread1 wakes up, gets the lock since it has no contender, and checks its condition again (that is, whether the tank is filled). As the tank is not filled yet, it goes back to the waiting state, releasing the lock. But thread3, which is the deserving candidate, has missed the notification and is still waiting for its turn.

If we had used notifyAll() in this case, all the threads would wake up, contend for the lock and get it one by one. The thread for which the notification was meant would definitely get the lock sooner or later and would not miss the signal.

So in a case like this we should use notifyAll() instead of notify().
But if only one machine were connected to the bell, we could use notify().
If we model our real-life scenario on the above example, we can decide whether notify() or notifyAll() is the better choice.
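
The room can be modelled in a few lines of code. The following is only a sketch under assumptions: the class names Room and RoomDemo, the task constants and the method names are all hypothetical. It uses notifyAll() for the bell, so whichever task finishes, the thread waiting for that task is guaranteed to wake up and find its own condition true.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the room: one lock (the Room object), three conditions.
class Room {
    static final int TANK = 0, CAKE = 1, ICE = 2;
    private final boolean[] done = new boolean[3];

    // Wait outside the room until the bell rings AND our own task is finished.
    synchronized void awaitTask(int task) throws InterruptedException {
        while (!done[task]) {
            wait(); // releases the room lock while waiting
        }
    }

    // The bell: mark the task as finished and wake every waiting thread.
    synchronized void finishTask(int task) {
        done[task] = true;
        notifyAll();
    }
}

class RoomDemo {
    // Returns the products in the order in which they were collected.
    static List<String> run() {
        Room room = new Room();
        String[] names = {"tank", "cake", "ice"};
        List<String> collected = Collections.synchronizedList(new ArrayList<String>());
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            final int task = i;
            threads[i] = new Thread(() -> {
                try {
                    room.awaitTask(task);
                    collected.add(names[task]);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "thread" + (i + 1));
            threads[i].start();
        }
        try {
            // Finish the tasks one at a time, ice first, as in the story.
            int[] order = {Room.ICE, Room.TANK, Room.CAKE};
            for (int task : order) {
                room.finishTask(task);
                threads[task].join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [ice, tank, cake]
    }
}
```

If finishTask() called notify() instead, the bell for the ice could wake thread1 or thread2, and thread3 could keep waiting even though its ice is ready.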

We might be tempted to think: if choosing notify() over notifyAll() is so confusing, why not always use notifyAll()? We can do that, but we pay a price for it. Using notifyAll() when only one thread can make progress is inefficient. If more than one thread is waiting on the same lock, calling notifyAll() causes all of them to wake up and contend for the lock; most of them will then find that their condition is still not satisfied and go back to waiting. This means a lot of context switches and a lot of contended lock acquisitions, which is not desirable.

Important Points to remember:

  • The thread that calls wait() releases the lock while it waits.
  • The thread that calls wait() blocks until it is notified (or interrupted, or woken spuriously).
  • Once it is notified, it needs to reacquire the lock to continue.
  • A thread must hold the same lock that the waiting thread released in order to call notify() or notifyAll().
  • If more than one thread is waiting on the same lock, calling notify() causes the JVM to select one of those threads to wake up.
  • Calling notifyAll() wakes up all the threads waiting on that lock.
  • Since a thread must hold the lock to call notify() or notifyAll(), and waiting threads cannot return from wait() without reacquiring the lock, the notifying thread should release the lock quickly so that the waiting threads are unblocked as soon as possible.
  • If there is any doubt about whether to call notify() or notifyAll(), it is always safe to call notifyAll().
  • But calling notifyAll() has some overhead: it causes all the waiting threads to wake up and contend for the lock, and most or all of them will then go back to sleep, since only one thread can hold the lock at a time. This causes a lot of context switches, which is undesirable.
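
The point about holding the lock is easy to verify. The sketch below (the class name MonitorOwnershipDemo and its method names are hypothetical) calls notify() once without owning the monitor, which throws IllegalMonitorStateException, and once inside a synchronized block, which succeeds even when no thread is waiting.

```java
// Hypothetical demo of the rule that notify()/notifyAll() require the lock.
class MonitorOwnershipDemo {
    // Returns true if calling notify() without holding the lock fails.
    static boolean notifyWithoutLockFails() {
        Object lock = new Object();
        try {
            lock.notify(); // the current thread does not own lock's monitor
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // Returns true if calling notifyAll() while holding the lock succeeds.
    static boolean notifyWithLockSucceeds() {
        Object lock = new Object();
        synchronized (lock) {
            lock.notifyAll(); // allowed; with no waiters it simply does nothing
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("without lock fails: " + notifyWithoutLockFails());
        System.out.println("with lock succeeds: " + notifyWithLockSucceeds());
    }
}
```

The same rule applies to wait(): it must be called from inside a synchronized block on the same object.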

Consider the code sample below, which examines the famous producer-consumer problem.
We have a sharedBuffer with a fixed size and three threads. All three threads use the same lock, namely the sharedBuffer object itself.


producerThread: It checks the sharedBuffer; if the sharedBuffer is full, the thread calls wait(). After being notified by another thread, it wakes up and puts an element in the sharedBuffer. Then it calls notify(). This notify() call is meant for the consumerThread: it indicates that the consumer thread can now wake up and start consuming elements from the sharedBuffer.

consumerThread: It checks the sharedBuffer. If the sharedBuffer is empty, the thread calls wait(), which releases the lock and blocks until a notification arrives, issued by the producer after it puts an element in the sharedBuffer. After consuming an element, the consumerThread calls notify(). This notification is meant for the producerThread: it indicates that the producerThread can now wake up and put a new element in the sharedBuffer.

sizeCheckerThread: It checks the size of the sharedBuffer. If the size is greater than 0, it calls wait(), releasing the lock. This extra thread exists to demonstrate that a thread can receive a notification that is not meant for it.

Notice that in the produce, consume and doSizeChecking methods we check the condition in a while loop rather than with an if. The main reason is that a waiting thread may wake up spuriously (without any notification) or because of a notification meant for another thread. If the condition it is waiting for is still not satisfied, the loop gives the thread the opportunity to wait again.
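
Here is a small sketch of why the while loop matters. All names in it (GuardedFlag, signalWithoutChange, GuardedWaitDemo and so on) are hypothetical. The main thread first sends notifications that do not change the condition, standing in for a spurious wake-up or a signal meant for someone else, and only afterwards makes the condition true.

```java
// Hypothetical one-shot flag guarded by a while loop.
class GuardedFlag {
    private boolean ready;
    private int wakeUps;

    synchronized void awaitReady() throws InterruptedException {
        while (!ready) {   // re-check the condition after every wake-up
            wait();
            wakeUps++;     // count how often we were woken
        }
    }

    // A notification that does NOT change the condition.
    synchronized void signalWithoutChange() {
        notifyAll();
    }

    // The real signal: change the condition, then notify.
    synchronized void makeReady() {
        ready = true;
        notifyAll();
    }

    synchronized int wakeUps() {
        return wakeUps;
    }
}

class GuardedWaitDemo {
    // Returns true if the waiter was still waiting after being woken
    // by a signal that did not change its condition.
    static boolean run() {
        GuardedFlag flag = new GuardedFlag();
        Thread waiter = new Thread(() -> {
            try {
                flag.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "waiter");
        waiter.start();
        try {
            // Keep signalling until the waiter has woken up at least once.
            while (flag.wakeUps() == 0) {
                flag.signalWithoutChange();
                Thread.sleep(1);
            }
            // The loop sent the waiter back to wait(), so it is still alive.
            boolean stillWaiting = waiter.isAlive();
            flag.makeReady();
            waiter.join();
            return stillWaiting;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("waiter ignored the unrelated signal: " + run());
    }
}
```

With an if in place of the while, the waiter would return from awaitReady() on the first unrelated signal, even though ready is still false.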

Analogy between the Example and Sample Code:

Now let's draw the analogy between the example and the sample code. The notify() method is analogous to the ringing bell. The bell rings under three different conditions: when the tank is filled, when the cake is baked and when the ice is made. In the sample code three conditions are likewise associated with the one lock: the buffer is not empty (after the producer adds an element), the buffer is not full (after the consumer removes an element) and the buffer is empty (its size is not greater than zero, which is what the sizeCheckerThread waits for). In the example the shared lock is the room lock; in the sample code the shared lock is sharedBuffer.

Missed Signal Issue:

Suppose the consumerThread issues a notification that it has consumed an element, so the sharedBuffer is no longer full and the producer can put a new element in it. This notification is meant for the producer, but there is no guarantee that the producer thread will receive it. At the JVM's choice, the sizeCheckerThread may receive the notification instead of the producerThread. In the worst case, the producerThread may never receive the notifications issued for it because they are always hijacked by the sizeCheckerThread. We call this a missed signal. To avoid it we should use notifyAll() instead of notify() here, so that every waiting thread wakes up, contends for the lock and gets it sooner or later.
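
As a rough sketch of that fix, here is a bounded buffer that uses notifyAll() everywhere. It is not the sample code of this post: the names NotifyAllBuffer, awaitEmpty and NotifyAllBufferDemo are hypothetical, and the run is finite so that it terminates. The sizeChecker thread waits on the same lock, but it can no longer swallow a signal, because every waiter is woken and re-checks its own condition.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical bounded buffer: one lock, three conditions, notifyAll() only.
class NotifyAllBuffer {
    private final Queue<Integer> items = new ArrayDeque<>();
    private final int capacity;

    NotifyAllBuffer(int capacity) {
        this.capacity = capacity;
    }

    synchronized void put(int value) throws InterruptedException {
        while (items.size() == capacity) {
            wait();
        }
        items.add(value);
        notifyAll(); // wake everyone; each waiter re-checks its own condition
    }

    synchronized int take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        int value = items.remove();
        notifyAll();
        return value;
    }

    // What the size checker waits for: the buffer being empty.
    synchronized void awaitEmpty() throws InterruptedException {
        while (!items.isEmpty()) {
            wait();
        }
    }
}

class NotifyAllBufferDemo {
    // Produces 1..count, consumes them all and returns the sum consumed.
    static long run(int count) {
        NotifyAllBuffer buffer = new NotifyAllBuffer(4);
        long[] sum = new long[1];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += buffer.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        Thread sizeChecker = new Thread(() -> {
            try {
                while (true) {
                    buffer.awaitEmpty();
                    Thread.sleep(1);
                }
            } catch (InterruptedException e) {
                // interrupted by run() once the work is done
            }
        }, "sizeChecker");
        sizeChecker.start();
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
            sizeChecker.interrupt();
            sizeChecker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(run(100)); // prints 5050
    }
}
```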

When to use notify():

  1. Only one condition is associated with the lock.
  2. Each thread executes the same logic on returning from the wait() call.
  3. A notification enables at most one thread to proceed.

In the example code below, if we remove the sizeCheckerThread, the third rule is satisfied: a notification allows only one thread to proceed, either the consumerThread or the producerThread. But the first rule is not satisfied, because two conditions are associated with the same lock: sharedBuffer is full (for the producerThread) and sharedBuffer is empty (for the consumerThread). The second rule is not satisfied either, because the threads execute different logic after returning from the wait() call.

Such ideal scenarios are rare in real multithreaded code, so in almost all cases notifyAll() is the one to use.
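
For contrast, here is a sketch of one of the rare cases where all three rules hold and notify() is enough. It is a hand-rolled permit pool, and the names SimplePermitPool and SimplePermitPoolDemo are hypothetical. Every waiter waits for the same condition (a permit is available), every waiter runs the same logic after wait() returns, and each released permit lets at most one waiter proceed.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical permit pool: one condition, uniform waiters, so notify() suffices.
class SimplePermitPool {
    private int permits; // starts at zero

    synchronized void acquire() throws InterruptedException {
        while (permits == 0) { // the only condition tied to this lock
            wait();
        }
        permits--;
    }

    synchronized void release() {
        permits++;
        notify(); // one new permit can serve exactly one waiter
    }
}

class SimplePermitPoolDemo {
    // Starts the given number of workers, releases one permit per worker
    // and returns how many workers managed to finish.
    static int run(int workers) {
        SimplePermitPool pool = new SimplePermitPool();
        AtomicInteger finished = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    pool.acquire();
                    finished.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "worker" + i);
            threads[i].start();
        }
        for (int i = 0; i < workers; i++) {
            pool.release();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints 3
    }
}
```

In production code java.util.concurrent.Semaphore already does this job; the sketch only illustrates the three rules.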


package com.brainatjava.test;

import java.util.ArrayList;

public class ProducerConsumerProblem {
    public static void main(String args[]) {
        ArrayList sharedBuffer = new ArrayList();
        int size = 4;
        Thread producerThread = new Thread(new Producer(sharedBuffer, size), "Producer");
        Thread consumerThread = new Thread(new Consumer(sharedBuffer), "Consumer");
        Thread sizeCheckerThread = new Thread(new SizeChecker(sharedBuffer), "sizeChecker");
        producerThread.start();
        consumerThread.start();
        sizeCheckerThread.start();
    }
}

class Producer implements Runnable {

    private final ArrayList sharedBuffer;
    private final int SIZE;

    public Producer(ArrayList sharedBuffer, int size) {
        this.sharedBuffer = sharedBuffer;
        this.SIZE = size;
    }

    @Override
    public void run() {
        int i = 0;
        while (true) {
            System.out.println("Producer Produced: " + i);
            try {
                produce(i);
                i++;
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    private void produce(int i) throws InterruptedException {
        synchronized (sharedBuffer) {
            while (sharedBuffer.size() == SIZE) {
                System.out.println("Queue is full " + Thread.currentThread().getName()
                                    + " is waiting , size: " + sharedBuffer.size());
                sharedBuffer.wait();
            }
            sharedBuffer.add(i);
            sharedBuffer.notify();
        }
    }
}


class Consumer implements Runnable {

    private final ArrayList sharedBuffer;

    public Consumer(ArrayList sharedBuffer ) {
        this.sharedBuffer = sharedBuffer ;
    }

    @Override
    public void run() {
        while (true) {
            try {
                consume();
                Thread.sleep(50);
            } catch (InterruptedException ex) {
               ex.printStackTrace();
            }

        }
    }

    private void consume() throws InterruptedException {
        synchronized (sharedBuffer) {
            while (sharedBuffer.isEmpty()) {
                System.out.println("Queue is empty " + Thread.currentThread().getName()
                                    + " is waiting , size: " + sharedBuffer.size());
                sharedBuffer.wait();
            }
            int value = (Integer) sharedBuffer.remove(0);
            System.out.println("Consumer consumed " + value);
            sharedBuffer.notify();
        }
    }
}


class SizeChecker implements Runnable {

    private final ArrayList sharedBuffer;

    public SizeChecker(ArrayList sharedQueue) {
        this.sharedBuffer = sharedQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                doSizeChecking();
                Thread.sleep(50);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    private void doSizeChecking() throws InterruptedException {
        synchronized (sharedBuffer) {
            // Wait for as long as the buffer is not empty.
            while (sharedBuffer.size() > 0) {
                System.out.println("Going to wait as size > 0 " + Thread.currentThread().getName()
                                    + "  " + sharedBuffer.size());
                sharedBuffer.wait();
                System.out.println("woke up from wait by notification from a thread");
            }
            System.out.println("Queue size is 0 " + sharedBuffer.size());
        }
    }
}
Please feel free to comment if you notice anything wrong. Comments will be used to improve or correct the post where required.