Showing posts with label Multi Threading. Show all posts

Thursday, November 19, 2015

Make a BoundedCollection by Using a Synchronizer

Sometimes we need a bounded collection, that is, a collection that can hold only a fixed number of elements at any given time. There are many ways to build one. Third-party libraries such as Apache Commons and Guava provide this functionality.

Our aim here, however, is not to use a third-party API but to implement it with tools that Java already provides. We will use an existing synchronizer, a semaphore, created with a fixed number of permits. Before adding an element to the collection we acquire a permit; if the add fails (the element was already present, or an exception was thrown) we release the permit again. When an element is removed from the collection we also release a permit. To read more about semaphores, please see the series on semaphore and mutex.


Implementation details:


In the implementation below we take a HashSet and make it bounded. Any other collection can be made bounded in the same way.

package com.brainatjava.test;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

public class BoundedCollection<T> {
    private final Set<T> hashSet;
    private final Semaphore semaphore;

    public BoundedCollection(int nosOfPermit) {
        this.hashSet = Collections.synchronizedSet(new HashSet<T>());
        semaphore = new Semaphore(nosOfPermit);
    }

    // Blocks until a permit is available, then tries to add the element.
    public boolean add(T o) throws InterruptedException {
        semaphore.acquire();
        boolean isSomethingAdded = false;
        try {
            isSomethingAdded = hashSet.add(o);
            return isSomethingAdded;
        } finally {
            if (!isSomethingAdded)
                semaphore.release();
        }
    }

    public boolean remove(Object o) {
        boolean isSomethingRemoved = hashSet.remove(o);
        if (isSomethingRemoved)
            semaphore.release();
        return isSomethingRemoved;
    }
}
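
As a usage sketch, the variant below shows the same permit bookkeeping in a form that is easy to try out. It is an assumption made for illustration, not part of the class above: the names BoundedHashSet, tryAdd and freeSlots are made up, and tryAdd uses Semaphore.tryAcquire() so that a full set rejects the element instead of blocking.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

// Hypothetical variant of the bounded set above: tryAdd() never blocks.
class BoundedHashSet<T> {
    private final Set<T> set = Collections.synchronizedSet(new HashSet<T>());
    private final Semaphore permits;

    BoundedHashSet(int bound) {
        this.permits = new Semaphore(bound);
    }

    // Returns false if the set is full or the element is already present.
    boolean tryAdd(T element) {
        if (!permits.tryAcquire()) {
            return false; // no free slot
        }
        boolean added = false;
        try {
            added = set.add(element);
            return added;
        } finally {
            if (!added) {
                permits.release(); // give the slot back if nothing was stored
            }
        }
    }

    boolean remove(Object element) {
        boolean removed = set.remove(element);
        if (removed) {
            permits.release(); // a slot became free
        }
        return removed;
    }

    int freeSlots() {
        return permits.availablePermits();
    }
}

class BoundedHashSetDemo {
    public static void main(String[] args) {
        BoundedHashSet<String> set = new BoundedHashSet<String>(2);
        System.out.println(set.tryAdd("a")); // true
        System.out.println(set.tryAdd("a")); // false: duplicate, permit is returned
        System.out.println(set.tryAdd("b")); // true
        System.out.println(set.tryAdd("c")); // false: the set is full
        System.out.println(set.remove("a")); // true: frees one slot
        System.out.println(set.tryAdd("c")); // true
    }
}
```

The blocking add() in the post is the better fit when producers should wait for space; the non-blocking form is only convenient for demonstrating the bound.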

Friday, November 13, 2015

notify() and notifyAll():Which one to use

Sometimes we face a dilemma when choosing between notify() and notifyAll(). This post explains the use of both with the help of an example. The java.util.concurrent package offers many higher-level APIs for serious multithreaded programming, but it is still necessary to understand the basics. This post is mainly for beginners learning wait(), notify() and notifyAll(), but senior developers can also use it to clear up doubts and refresh their memory.

Let's assume that we have a room with three pieces of equipment inside it. One is a motor that fills a tank with water. Another is an oven that bakes a cake. The third is an ice maker that converts water into ice. All three are connected to a single bell, which rings when any one of them completes its task. Initially the room is locked and the key is available next to it. The condition is that at any instant only one person can take the lock and enter the room to use one piece of equipment. After starting his work he comes out of the room and releases the lock, giving any other person a chance to use another piece of equipment. He waits outside the room until his task completes, and then collects his product by reacquiring the lock and entering the room again.

Now let's have three threads named thread1, thread2 and thread3. Suppose thread1 comes, takes the lock of the room, goes inside, switches on the motor to fill the tank, then comes out of the room, releases the lock and waits outside for the bell to ring. This is exactly what the wait() method does.

Next thread2 comes, takes the lock, goes inside the room and puts a cake in the oven. It then releases the lock, leaves the room and waits outside for the bell to ring.

Similarly thread3 comes, takes the lock, goes inside the room and puts some water in the ice maker. It then releases the lock, leaves the room and waits outside for the bell to ring.

Now all three threads are waiting for the bell. A ring means that one of the tasks has finished.

Assume that we use the notify() method of the object to tell a thread that its wait is over because the condition it was waiting on has changed. The thread can now wake up and check the condition again, and if the condition really has changed, it can proceed with its work.

The issue is this. Suppose a notification is issued for thread3 because the water has turned into ice. Since we used notify() and three threads are in the waiting state, the JVM wakes up just one of them, chosen by its own algorithm. Suppose that, unfortunately, the JVM decides to wake up thread1. thread1 wakes up, gets the lock (it has no competitor) and checks its condition again, that is, whether the tank is full. As the tank is not full yet, it goes back to waiting and releases the lock. Meanwhile thread3, the deserving candidate, has missed the notification and is still waiting for its turn.

If we had used notifyAll() instead, all the threads would wake up, contend for the lock and acquire it one by one. The thread for which the notification was meant would definitely get the lock sooner or later and would not miss the signal.

So in a case like this we should use notifyAll() instead of notify().
But if only one piece of equipment is associated with the bell, we can use notify().
By modelling our real scenario on the example above, we can decide whether notify() or notifyAll() is the better choice.

We might be tempted to think: if choosing between notify() and notifyAll() is so confusing, why not always use notifyAll()? We can do that, but we pay a price for it. Using notifyAll() when only one thread can make progress is horribly inefficient. If more than one thread is waiting on the same lock, calling notifyAll() wakes all of them up to contend for the lock; most of them will find their condition still unsatisfied and go back to sleep. This means many context switches and many contended lock acquisitions, which is not desirable.

Important Points to remember:

  • A thread that calls wait() releases the lock while it waits.
  • A thread that has called wait() blocks until it is notified.
  • Once it is notified, it must reacquire the lock to continue.
  • A thread must hold the same lock (the one released by the waiting thread) to call notify() or notifyAll().
  • If more than one thread is waiting on the same lock, calling notify() causes the JVM to select one of them to wake up.
  • Calling notifyAll() wakes up all the threads waiting on that lock.
  • Since a thread must hold the lock to call notify() or notifyAll(), and waiting threads cannot return from wait() without reacquiring the lock, the notifying thread should release the lock quickly so that the waiting threads are unblocked as soon as possible.
  • If it is unclear whether to call notify() or notifyAll(), it is always safe to call notifyAll().
  • But calling notifyAll() has some overhead: it causes all the threads to wake up and contend for the lock, and most of them will go back to sleep because only one thread can get the lock at a time. This causes a lot of context switches, which is undesirable.

Consider the following code sample, which examines the well-known producer-consumer problem.
We have a sharedBuffer with a fixed size and three threads. All three threads use the same lock, namely the sharedBuffer object itself.


producerThread: It checks the sharedBuffer; if the sharedBuffer is full, the thread calls wait(). After getting a notification from another thread, it wakes up and puts an element in the sharedBuffer. It then calls notify(). This notify() call is meant for the consumerThread: it indicates that the consumer thread can now wake up and start consuming elements from the sharedBuffer.

consumerThread: It checks the sharedBuffer. If the sharedBuffer is empty, the thread calls wait(), which releases the lock and blocks until it receives the notification issued by the producer after putting an element in the sharedBuffer. After consuming an element the consumerThread calls notify(). This notification is meant for the producerThread: it indicates that the producer thread can now wake up and put another element in the sharedBuffer.

sizeCheckerThread: It checks the size of the sharedBuffer. If the size is greater than 0, it calls wait(), releasing the lock. This extra thread is there to demonstrate that a thread can receive a notification that was not meant for it.

Notice that in the produce, consume and doSizeChecking methods we check the condition in a while loop, not with an if. The main reason is that a waiting thread may wake up without its condition being satisfied, either through a spurious wakeup (which the underlying platform may generate) or through a notification meant for another thread. The while loop gives the thread the opportunity to check again and wait again.
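
The wait-in-a-loop idiom can be shown in isolation with a minimal sketch. The Gate and GateDemo classes below are hypothetical names introduced only for this illustration; they are not part of the sample code of this post.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical one-shot gate: threads block in awaitOpen() until open() is called.
class Gate {
    private boolean open = false; // guarded by the monitor of "this"

    synchronized void awaitOpen() throws InterruptedException {
        // Re-check the condition after every wakeup: the wakeup may be
        // spurious or caused by a notification meant for another waiter.
        while (!open) {
            wait();
        }
    }

    synchronized void open() {
        open = true;
        notifyAll(); // every waiter can proceed, so wake them all
    }
}

class GateDemo {
    // Starts the given number of waiting threads, opens the gate and
    // returns how many threads got through.
    static int releaseAll(int waiters) {
        final Gate gate = new Gate();
        final AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.awaitOpen();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        gate.open();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("Threads released: " + releaseAll(3));
    }
}
```

Because the condition is stored in a field and checked inside the loop, a thread that arrives after open() has already been called does not wait at all, so no notification can be lost.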

Analogy between the Example and Sample Code:

Now let's map the example onto the sample code. The notify() method is analogous to the ringing bell. In the example the bell rings on three different conditions: when the tank is filled, when the cake is baked and when the ice is made. In the sample code the threads likewise wait on three different conditions: the producer waits for the sharedBuffer to have free space, the consumer waits for it to contain an element, and the size checker waits for its size to drop to zero. In the example the shared lock is the room lock; in the sample code the shared lock is sharedBuffer.

Missed Signal Issue:

Assume the consumerThread raises a notification saying that it has consumed an element, so the sharedBuffer is no longer full and the producer can put a new element in it. This notification is meant for the producer, but there is no guarantee that the producer thread will receive it. By the choice of the JVM, the sizeCheckerThread may receive the notification instead of the producerThread. In the worst case the producerThread may never receive the signal issued for it, because it is always hijacked by the sizeCheckerThread. We call this a missed signal issue. To avoid it we should use notifyAll() instead of notify() here, so that all the threads get a chance to wake up, contend for the lock and acquire it sooner or later.

When to use notify():

  1. Only one condition is associated with the lock.
  2. Each thread executes the same logic after returning from the wait() call.
  3. A notification enables at most one thread to proceed.
In the example code below, if we remove the sizeCheckerThread, the 3rd rule holds: a notification allows only one thread to proceed, either the consumerThread or the producerThread. But the 1st rule does not hold, because two conditions are associated with the same lock: sharedBuffer full (for the producerThread) and sharedBuffer empty (for the consumerThread). The 2nd rule does not hold either, because each thread executes different logic after returning from wait().

We rarely get such ideal scenarios in a multithreaded environment, so in almost all cases it is advisable to use notifyAll().
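
One way to get closer to the "one condition per wait set" rule is to use the explicit locks from java.util.concurrent, where a single lock can have several Condition objects. The sketch below is an alternative technique, not the approach of the sample code in this post, and the class names TwoConditionBuffer and TwoConditionBufferDemo are made up for illustration. Producers wait only on notFull and consumers only on notEmpty, so a single signal() always reaches a thread of the right kind.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical bounded buffer with one Condition per reason for waiting.
class TwoConditionBuffer {
    private final Queue<Integer> items = new ArrayDeque<Integer>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    TwoConditionBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(int value) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // only producers wait here
            }
            items.add(value);
            notEmpty.signal(); // wakes a consumer, never another producer
        } finally {
            lock.unlock();
        }
    }

    int take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await(); // only consumers wait here
            }
            int value = items.remove();
            notFull.signal(); // wakes a producer, never another consumer
            return value;
        } finally {
            lock.unlock();
        }
    }
}

class TwoConditionBufferDemo {
    // A producer thread puts 1..n, the calling thread takes n values and sums them.
    static long transfer(int n) {
        final TwoConditionBuffer buffer = new TwoConditionBuffer(4);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0;
        try {
            for (int i = 0; i < n; i++) {
                sum += buffer.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("Sum of transferred values: " + transfer(100));
    }
}
```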


package com.brainatjava.test;

import java.util.ArrayList;

public class ProducerConsumerProblem {
    public static void main(String args[]) {
        ArrayList sharedBuffer = new ArrayList();
        int size = 4;
        Thread producerThread = new Thread(new Producer(sharedBuffer, size), "Producer");
        Thread consumerThread = new Thread(new Consumer(sharedBuffer), "Consumer");
        Thread sizeCheckerThread = new Thread(new SizeChecker(sharedBuffer), "sizeChecker");
        producerThread.start();
        consumerThread.start();
        sizeCheckerThread.start();
    }
}

class Producer implements Runnable {
    private final ArrayList sharedBuffer;
    private final int SIZE;

    public Producer(ArrayList sharedBuffer, int size) {
        this.sharedBuffer = sharedBuffer;
        this.SIZE = size;
    }

    @Override
    public void run() {
        int i = 0;
        while (true) {
            System.out.println("Producer Produced: " + i);
            try {
                produce(i);
                i++;
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        }
    }

    private void produce(int i) throws InterruptedException {
        synchronized (sharedBuffer) {
            while (sharedBuffer.size() == SIZE) {
                System.out.println("Queue is full " + Thread.currentThread().getName()
                                    + " is waiting , size: " + sharedBuffer.size());
                sharedBuffer.wait();
            }
            sharedBuffer.add(i);
            sharedBuffer.notify();
        }
    }
}


class Consumer implements Runnable {

    private final ArrayList sharedBuffer;

    public Consumer(ArrayList sharedBuffer ) {
        this.sharedBuffer = sharedBuffer ;
    }

    @Override
    public void run() {
        while (true) {
            try {
                consume();
                Thread.sleep(50);
            } catch (InterruptedException ex) {
               ex.printStackTrace();
            }

        }
    }

    private void consume() throws InterruptedException {
        synchronized (sharedBuffer) {
            while (sharedBuffer.isEmpty()) {
                System.out.println("Queue is empty " + Thread.currentThread().getName()
                                    + " is waiting , size: " + sharedBuffer.size());
                sharedBuffer.wait();
            }
            int value = (Integer) sharedBuffer.remove(0);
            System.out.println("Consumer consumed " + value);
            sharedBuffer.notify();
        }
    }
}


class SizeChecker implements Runnable {
      private final ArrayList sharedBuffer;

        public SizeChecker(ArrayList sharedQueue) {
            this.sharedBuffer = sharedQueue;
        }
        @Override
        public void run() {
            while (true) {
                try {
                    doSizeChecking();
                    Thread.sleep(50);
                } catch (InterruptedException ex) {
                   ex.printStackTrace();
                }

            }
        }
        private void doSizeChecking() throws InterruptedException {
            synchronized (sharedBuffer) {
                while (sharedBuffer.size() > 0) {
                    System.out.println("Going to wait as size > 0 " + Thread.currentThread().getName()
                                         + "  " + sharedBuffer.size());
                    sharedBuffer.wait();
                    System.out.println("Woke up from wait by notification from a thread");
                }

                System.out.println("Queue size is 0 " + sharedBuffer.size());
            }
        }
}
Please feel free to comment if you notice anything wrong. Comments will be used to improve or correct the post if required.

Saturday, October 10, 2015

Volatile keyword in Java

We often discuss how volatile works in Java, but its scope is still not always clear. Recently one of my colleagues asked me about the scope of volatile and its usefulness. I would like to use this space to make it as clear as possible. I described the volatile keyword in one of my posts on compiler reordering, but there is more to discuss.

Some points to know about the JMM (Java Memory Model):
  1. Conceptually, each thread has its own working memory (think of CPU caches and registers), separate from main memory.
  2. We need a special mechanism to enforce communication between different threads.
  3. Sometimes memory writes leak through, so that other threads happen to read the updated value. But this is not a guaranteed means of communication between two threads.

Role Of Volatile in Thread communications :

  • The volatile modifier is a mechanism that guarantees communication between different threads.
  • When a second thread sees the value written to a volatile variable by a first thread, it is guaranteed that the second thread also sees everything the first thread wrote to memory before it wrote to the volatile variable.
  • In the JMM this is called the happens-before principle.
 Let's try to understand it with the help of an example.

Consider a scenario like this. We have an int variable named result, which is not volatile, and a boolean variable named flag, which is volatile. We have two threads, Thread1 and Thread2. Suppose Thread1 starts and sets result to 30 and then flag to true.

Thread1
_______

result = 30;
flag = true;


Thread2
_______

if (flag)
    System.out.println(result);


Then Thread2 comes, reads flag and sees the value written to it by Thread1. Because this communication happens, all of the memory contents seen by Thread1 just before it wrote to flag must be visible to Thread2 after it reads the value true for flag.

So if Thread2 sees flag as true, it prints the value of result as 30. This is guaranteed by the volatile modifier on flag.

If you follow my post on double-checked locking for a singleton, you will see that we used the volatile modifier in line 2.
For convenience, here is the snippet again:
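
The two-thread scenario above can be written out as a small runnable sketch. The class name VolatileFlagDemo and the method readAfterFlag are made up for this illustration; the reader spins on the volatile flag and then reads the non-volatile result.

```java
class VolatileFlagDemo {
    private static int result;            // deliberately not volatile
    private static volatile boolean flag; // the volatile write publishes result

    // Runs one writer and one reader thread and returns the value of
    // result that the reader observed after it saw flag == true.
    static int readAfterFlag() {
        result = 0;
        flag = false;
        final int[] seen = new int[1];
        Thread reader = new Thread(() -> {
            while (!flag) {
                Thread.yield(); // spin until the volatile write is visible
            }
            seen[0] = result;   // happens-before guarantees this reads 30
        });
        Thread writer = new Thread(() -> {
            result = 30;  // ordinary write
            flag = true;  // volatile write, comes after the write to result
        });
        reader.start();
        writer.start();
        try {
            reader.join(); // join() also makes seen[0] visible to this thread
            writer.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("Reader saw result = " + readAfterFlag());
    }
}
```

If flag were not volatile, the reader would have no guarantee of ever seeing the update, and even if it did, it would have no guarantee of seeing result as 30.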

 public class SingletonTest {
     private static volatile SingletonTest instance = null;
     // private constructor
     private SingletonTest() {
     }
     public static SingletonTest getInstance() {
         if (instance == null) {
             synchronized (SingletonTest.class) {
                 // Double check
                 if (instance == null) {
                     instance = new SingletonTest();
                 }
             }
         }
         return instance;
     }
 }
If one thread creates an object, it has to communicate the contents of its memory to the other threads. Otherwise the newly created object may remain visible only in its own memory space. We need to communicate it to the other threads so that our goal of creating a single object is achieved. That's why we used the volatile modifier in line 2.

Some people ask: since a lock in the form of a synchronized block also establishes this happens-before relationship, is the volatile modifier in line no 2 really necessary?

The answer is yes, because only the writing thread goes through the lock, not the reading thread. In line 7 the null check of instance is performed outside the synchronized block, and this is the read done by the reader thread.

Synchronization by itself would be enough if the first check were inside the synchronized block. But we keep it outside to save the synchronization cost once the object has been created, as discussed in my previous blog on double-checked locking.

Without explicit communication through the volatile variable, the reader thread is not guaranteed to see the fully constructed object created by the writer thread.

Immutability in Java

We are often asked questions about immutability in Java. When is a class said to be immutable? Can you tell whether a given class is immutable? There is a lot of discussion. Here I want to summarize my knowledge about immutability with the help of some examples.

Immutable:

Definition: Informally, an object is immutable if its state cannot be modified after construction. That is, the invariants established by the constructor continue to hold for the whole lifetime of the object.

Now let's see some example of immutable classes.

1. String:

As we all know, the String class is immutable, so let's start with it. The String class is final, and in the version shown below it has four instance variables that form the object state: the char array value, offset, count and hash. Three of them are final and hash is not. Now let's check the invariant. The invariant is that at any time after construction of the string, the contents of the character array remain the same.
This invariant holds because the value field is final, so it can never be reassigned after the constructor, and because String never modifies the array or exposes it to callers.

Why hash is not final:

Let's look at the hashCode method defined in String:

public final class String {
    private final char value[];
    private final int offset;
    private final int count;
    private int hash;

public int hashCode() {
         int h = hash;
         if (h == 0) {
             int off = offset;
             char val[] = value;
             int len = count;

             for (int i = 0; i < len; i++) {
                 h = 31*h + val[off++];
             }
             hash = h;
         }
         return h;
     }
}
Now let's think about hash. Think of it as a cache for the value of the hash code. If we never call hashCode(), it is never set. It could have been computed when the string is created, but that would lengthen creation time for a feature we might not need at all. On the other hand, it would be wasteful to recalculate the hash every time it is required. So it is stored in a non-final field. Look inside hashCode(): hash is first copied into h, and if h is not 0, that value is returned immediately, so there is no need to recalculate it again and again.

The presence of a non-final field may give the impression that the invariant does not hold. But it is an internal implementation detail, which has no effect on the invariant we defined above.

As hash is non-final, we might suspect that it could change in some cases.
Let's be specific about it. The hash field holds the hash code of the string. For most strings the hash code is never needed during their lifetime; it is needed only when the string is used in a hash-based collection such as a HashMap or HashSet. So String computes the hash code lazily instead of in the constructor. If we look at the hashCode method, we see that it depends on only three fields, offset, count and value, which are themselves final and cannot change, so every calculation of the hash gives the same result. The hashCode method is not synchronized, so two threads may call it simultaneously and both write to hash, but because both compute the same value this race is harmless. Since the field is assigned after construction, it cannot be final.
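
The same lazy caching idea can be applied to our own immutable classes. The following is a minimal sketch; the class Name and its fields are hypothetical and it assumes both constructor arguments are non-null.

```java
// Hypothetical immutable class that caches its hash code the way String does.
final class Name {
    private final String first;
    private final String last;
    private int hash; // cache only; 0 means "not computed yet"

    Name(String first, String last) {
        this.first = first; // assumed non-null
        this.last = last;   // assumed non-null
    }

    @Override
    public int hashCode() {
        int h = hash;
        if (h == 0) {
            // Depends only on final fields, so every thread computes the same value.
            h = 31 * first.hashCode() + last.hashCode();
            hash = h; // benign race: concurrent writers store an identical value
        }
        return h;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof Name)) {
            return false;
        }
        Name that = (Name) other;
        return first.equals(that.first) && last.equals(that.last);
    }
}

class NameDemo {
    public static void main(String[] args) {
        Name a = new Name("Ada", "Lovelace");
        Name b = new Name("Ada", "Lovelace");
        System.out.println(a.hashCode() == a.hashCode()); // cached value is stable
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```

As with String, an object whose computed hash happens to be 0 simply recomputes it on every call, which is correct but not cached.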

Why the String class is final:

Let's assume the String class is not final and see what mishap can occur.

Let's create a class MutableString which extends String.
           
// Hypothetical: this compiles only if String were not final
public class MutableString extends String {
    private String text;

    public MutableString(String value) {
        super(value);
        text = value;
    }

    public String getText() {
        return text;
    }

    public void setText(String newValue) {
        text = newValue;
    }
}

Now a MutableString can be passed wherever a String is needed, because it is of type String (and such a subclass could override String's methods to answer from its mutable text field). Consider the case below. Here we have a method that verifies a password. If the password does not meet certain criteria the method rejects it; otherwise it forwards it to the next step.

public String verifyPassword(String password) {
    if (!password.contains("some character"))
        throw new SecurityException("The password is not a valid one");
    // At this point another thread can come along and change the password value.
    // The new value has not been verified and may contain invalid characters,
    // but it is still returned by the method.
    return password;
}

Thread1
_______
MutableString password = new MutableString("secret");
verifyPassword(password);
password.setText("secret1");

Here, after the verification has completed and before the password is returned, a thread came along and changed the password, as described in the code snippet above.
If the String class is final, this scenario cannot happen, because MutableString cannot extend String.

From this discussion we conclude that the String class should be final, and that hash being non-final has no impact on the invariant of the String class.

2. ThreeStooges:

Consider the well-known ThreeStooges.java example from Brian Goetz (page no 32).

 public final class ThreeStooges {
    private final Set<String> stooges = new HashSet<String>();
    public ThreeStooges() {
        stooges.add("Moe");
        stooges.add("Larry");
        stooges.add("Curly");
    }
    public boolean isStooge(String name) {
        return stooges.contains(name);
    }
}

Notice that the Set that stores the names is mutable, although the reference to it is final. Still, follow the argument here that the ThreeStooges class is immutable.

Let's consider the invariant for this class.
The invariant is that the contents of the stooges set do not change after construction of the object has finished.

Now let's argue, for contradiction, that we can change it after construction.
The stooges reference is private and final, so once it is assigned and populated in the constructor it cannot be reassigned. The set could change only if there were a method that modified stooges after construction, or if a reference to the object escaped to some other thread before construction was complete. Neither is the case here.

So we conclude that our assumption, that the set can change after object construction, is wrong.

But what about the line
 ThreeStooges ts = new ThreeStooges()
Is it thread safe? Is it possible for one thread to see a partially initialized ThreeStooges object while another thread is still initializing it?
It is thread safe, and no such thing will happen, because the JMM guarantees it for final fields. See my blog on compiler reordering: final and volatile.

Hence the ThreeStooges class is immutable.

For more details please refer to Brian Goetz, Java Concurrency in Practice, page 31.

3. Unmodifiable HashMap:



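public class UnmodifiableHashMap<K, V> implements Map<K, V> {
    private final Map<K, V> map;

    public UnmodifiableHashMap(Map<K, V> map) {
        // defensive copy, so later changes to the caller's map are not visible here
        this.map = new HashMap<K, V>(map);
    }

    @Override
    public V get(Object key) {
        return map.get(key);
    }

    @Override
    public V put(K key, V value) {
        throw new UnsupportedOperationException();
    }

    // the remaining Map methods are omitted here
}

Similarly, we can implement all the other methods: the read methods delegate to the wrapped map, and the mutator methods throw UnsupportedOperationException. (Views such as keySet(), values() and entrySet() must also be returned in unmodifiable form.) In this way we can turn a HashMap into an immutable map, which can be shared safely between multiple threads. This guarantee comes from the final keyword used in line no 2 of the code snippet above, together with the fact that the map is never modified after construction.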
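
For comparison, the JDK already ships a wrapper with this behaviour, Collections.unmodifiableMap. The sketch below combines it with a defensive copy; the class UnmodifiableMapDemo and its helper methods are hypothetical names used only for this illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class UnmodifiableMapDemo {
    // Copies the source map and wraps the copy, so that neither the caller
    // of snapshot() nor the owner of the source can change the result.
    static Map<String, Integer> snapshot(Map<String, Integer> source) {
        return Collections.unmodifiableMap(new HashMap<String, Integer>(source));
    }

    // Returns true if the map refuses a put() call.
    static boolean rejectsPut(Map<String, Integer> map) {
        try {
            map.put("x", 1);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> source = new HashMap<String, Integer>();
        source.put("one", 1);
        Map<String, Integer> frozen = snapshot(source);
        source.put("two", 2); // does not affect the snapshot
        System.out.println(frozen.size());
        System.out.println(rejectsPut(frozen));
    }
}
```

Without the copy, the wrapper would only be a read-only view, and changes made through the original map would still show up in it.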

Conclusion:

 From the above three examples and discussions we conclude that
  •  An object is immutable if its state can't be modified after construction.
  •  An immutable object is thread safe.

 Ways to achieve it:

 1. All fields should be final.
 2. The this reference should not escape during construction to any outside thread or client.
 3. There should not be any mutator method that can change the value of an instance variable after object construction.
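
The three rules can be put together in a small sketch. The class Team below is a hypothetical example, not taken from any of the sources discussed above; it additionally makes a defensive copy of the list it receives, which is needed whenever a field refers to a mutable object.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable class following the three rules listed above.
final class Team {
    private final String name;          // rule 1: all fields are final
    private final List<String> members;

    Team(String name, List<String> members) {
        this.name = name;
        // Copy the caller's list and wrap the copy; "this" is not
        // handed to anyone during construction (rule 2).
        this.members = Collections.unmodifiableList(new ArrayList<String>(members));
    }

    // Rule 3: only read access, no mutator methods.
    String getName() {
        return name;
    }

    List<String> getMembers() {
        return members; // safe to share: the wrapper rejects modification
    }
}

class TeamDemo {
    public static void main(String[] args) {
        List<String> source = new ArrayList<String>();
        source.add("Moe");
        Team team = new Team("Stooges", source);
        source.add("Larry"); // has no effect on the team
        System.out.println(team.getName() + " " + team.getMembers());
    }
}
```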

Thursday, October 1, 2015

Compiler Reordering: final and volatile

Usually when we write a statement like Object o = new Object(); it is roughly a three-step process at the instruction level:
  1. Allocate space for the new object
  2. Store a reference to the unconstructed object in the variable
  3. Initialise the object
These steps are not exact, but something similar happens when an object is created.
Let's see an example.

class MyClass {
  int i;
  MyClass () {
    i = 1;
  }
}

When we write something like MyClass clazz = new MyClass();

As per our assumption, the following steps should ideally happen:

  1. var = Allocate space for MyClass
  2. var.i = 1;
  3. clazz = var;
 But the compiler might choose a different order. For optimization, it can rewrite the line above as in the snippet below.

  1. clazz = Allocate space for MyClass
  2. clazz.i = 1;
 Here the ordering differs from what we assumed: the reference is stored in clazz before the field is initialised. We call this compiler reordering of the statements.

But this reordering affects thread safety. Assume that one thread is in the process of creating the MyClass object and has just completed step 1. Now another thread comes, sees that clazz is not null because thread 1 has completed step 1, reads clazz.i and gets the wrong value, since thread 1 has not completed step 2 yet.

Thread 1:
MyClass clazz = new MyClass ();

Thread 2:
if (clazz != null) {
  System.out.println(clazz .i);
}

So there is no guarantee that thread 2 will print 1.

This is a thread-safety concern.

Prevent Compiler Reordering:

1. final
If we redesign our class like this:

class MyClass {
final  int i;
  MyClass () {
    i = 1;
  }
}

Note that we changed the variable i to final. Now we can say this class is thread safe.
Without the final modifier, the compiler and JVM are allowed to move the write to i so that it occurs after the reference to the new object is written to clazz. The final modifier forbids that kind of reordering.

2. volatile
If you refer to my series on double-checked locking for a singleton, you will see that in line number 2 we used the keyword volatile for our SingletonTest instance. Without the volatile keyword that code is not guaranteed to work in Java. The basic rule is that compiler reordering can change the code so that the code in the SingletonTest constructor executes after the write to the instance variable in line number 11. If this happens, there is a thread-safety issue.

Assume we have two threads, Thread1 and Thread2. Thread1 comes, sees that instance is null in the getInstance method and proceeds to execute line 11. As we know, line 11 is not an atomic operation, so just after the assignment to the instance variable and before the SingletonTest object is completely constructed, Thread2 can come along and, in line number 7 of the getInstance method, read instance before Thread1 has finished the construction.

If we make the instance field volatile in line no 2, the actions that should happen before the write to instance in the code must actually happen before the write to instance. No compiler reordering is allowed across it.

Saturday, August 29, 2015

CyclicBarrier Continued

In our previous series on CyclicBarrier we learned that a CyclicBarrier can be constructed in two ways:
  1. CyclicBarrier cb = new CyclicBarrier(int nosOfParallelThread, Runnable barrierAction);
  2. CyclicBarrier cb = new CyclicBarrier(int nosOfParallelThread);
In the previous series we saw an example of the first constructor, which takes two arguments: the number of parallel threads and a Runnable.
In this series we look at the second constructor, which takes only one integer argument, the number of parallel threads.

package com.brainatjava.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierConstructorWithOneArgument {

    private static int matrix[][] =
    {
        { 1 ,1,1,1},
        { 2, 2 ,2,2},
        { 3, 3, 3 ,3},
        { 4, 4, 4, 4 },
        { 5, 5, 5, 5 } };

    private static int results[];

    private static class Adder extends Thread
    {
        int row;

        CyclicBarrier barrierWhereAllThreadsWillWait;

        Adder(CyclicBarrier cb, int row)
        {
            this.barrierWhereAllThreadsWillWait = cb;
            this.row = row;
        }

        public void run()
        {
            int columns = matrix[row].length;
            int sum = 0;
            for (int i = 0; i < columns; i++)
            {
                sum += matrix[row][i];
            }
            results[row] = sum;
            System.out.println("Results for row " + row + " are : " + sum);
            int arrivalIndex = -1; // stays -1 if await() fails, so a failed thread never computes the total
            try
            {
                arrivalIndex= barrierWhereAllThreadsWillWait.await();
            } catch (InterruptedException ex)
            {
                ex.printStackTrace();
            } catch (BrokenBarrierException ex)
            {
                ex.printStackTrace();
            }
            if(arrivalIndex==0){
                System.out.println(Thread.currentThread().getName()+" is executing the combined result.");
                 int total = 0;
                 for (int i = 0; i < results.length; i++)
                 {
                     total += results[i];
                 }
                 System.out.println("Results are: " + total);
            }
        }
    }

    public static void main(String args[])
    {
        final int rows = matrix.length;
        results = new int[rows];
        CyclicBarrier barrier = new CyclicBarrier(rows);
        for (int i = 0; i < rows; i++)
        {
            new Adder(barrier, i).start();
        }
       
    }
}

In the above code snippet, please note the call to barrierWhereAllThreadsWillWait.await(). Here the await() method of the CyclicBarrier class returns an int value, which is the arrival index of the calling thread at the barrier. The first thread to arrive gets the index getParties() - 1, and an arrival index of zero means that the thread is the last one to arrive at the barrier.

Here we choose the last thread that reaches the barrier to execute the final action, that is, to calculate the total sum. That is why the code checks whether the arrival index is zero.

The reset() Method of CyclicBarrier:

If the reset() method is called by any thread, all the other threads currently waiting at the barrier wake up with a BrokenBarrierException, and the barrier returns to its initial state. So we can say that reset() is used when we want to break the barrier forcibly. Remember that if a thread calls reset() when no other threads are waiting at the barrier, the call has no visible effect.

Let's see the reset() method in action with the help of an example.

package com.brainatjava.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

    class CyclicBarrierResetExample
    {
        public static void main(String args[])
        {
            CyclicBarrier cb=new CyclicBarrier(2,new Master());
            Thread slave1=new Thread(new Slave1(cb));
            slave1.start();
            Thread slave2=new Thread( new Slave2(cb));
            slave2.start();
        }
    }
    class Slave1 implements Runnable
    {
    CyclicBarrier cb;
        public Slave1(CyclicBarrier cb)
        {
            this.cb=cb;
        }
        public void run()
        {
            System.out.println("Slave1 has performed some work.");
            System.out.println("Slave1 is going to wait.");
            try
            {
                cb.await();
            }catch(BrokenBarrierException e){
                System.out.println("Slave1 can't wait as it is getting BrokenBArrier exception "+e.getMessage());
                e.printStackTrace();
            } catch (InterruptedException e) {
                 System.out.println("Slave1 can't wait as it is getting interupted exception  "+e.getMessage());
                e.printStackTrace();
            }
            System.out.println("Anyway !!!! Woo Slave1's waiting is finsihed.Slave1 is going home now.");
        }
    }
    class Slave2 implements Runnable
    {
    CyclicBarrier cb;
        public Slave2(CyclicBarrier cb)
        {
            this.cb=cb;
        }
        public void run()
        {
            System.out.println("Slave2 has performed some work.");
            System.out.println("Slave2 is going to wait.");
            try
            { Thread.sleep(1000);
                cb.reset();
                cb.await();
            }catch(BrokenBarrierException e){
                System.out.println("Slave2 can't wait as it is getting brokenbarrier exception "+e.getMessage());
            } catch (InterruptedException e) {
                 System.out.println("Slave2 can't wait as it is getting interrupted exception "+e.getMessage());
                e.printStackTrace();
            }
           
            System.out.println("Anyway !!!! Woo Slave2's waiting is finsihed.Slave2 is going home now.");
        }
       
    }
    class Master implements Runnable
    {
        public void run()
        {
            System.out.println("Master is going to complete all his work as two slaves have already reached at the barrier.");
            System.out.println("Thank you slaves you completed all your work on time.");
        }
       
    }


Output of the Above Program

Slave1 has performed some work.
Slave1 is going to wait.
Slave2 has performed some work.
Slave2 is going to wait.
Master is going to complete all his work as two slaves have already reached at the barrier.
Thank you slaves you completed all your work on time.
Anyway !!!! Woo Slave2's waiting is finsihed.Slave2 is going home now.
Slave1 can't wait as it is getting BrokenBArrier exception null
java.util.concurrent.BrokenBarrierException
    at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
    at java.util.concurrent.CyclicBarrier.await(Unknown Source)
    at com.brainatjava.test.Slave1.run(CyclicBarrierResetExample.java:30)
    at java.lang.Thread.run(Unknown Source)
Anyway !!!! Woo Slave1's waiting is finsihed.Slave1 is going home now.

Monday, August 24, 2015

CyclicBarrier:Java Concurrency

In the previous posts we learned about different synchronizers such as Semaphore and CountDownLatch.
CyclicBarrier is another synchronizer. We will try to understand it with the help of an example.

Let's assume we have a project. In the initial stage, 3 developers are working on it, and the project manager is there to review the status of the project. After working for some days the developers complete their code and the project manager reviews the status. He is happy, so he decides to move to the next phase. Assume there are 3 QA guys. The code is submitted to the QA team for quality testing. After some days the QA team completes the process. Again the project manager reviews the status of the project and is happy with the progress, so he decides to move to the next step, deployment to the staging server for SIT (System Integration Test). Assume there are 3 support guys working on this, and after a few days they complete their work. The manager reviews the status and is happy with it. He then decides to move to the next step, UAT (User Acceptance Test). Suppose there are 3 guys handling this process, and they complete their work after some effort. The project manager reviews the status of the project once more, is happy with it, and gives the green signal for the next step, taking the project live.

Remember one thing here: when one developer completes his work, he waits for his fellow developers to complete theirs. That is, they wait for each other until all the developers have completed their work. Similarly, all the QA guys wait for each other to complete their work, and the same policy applies to the deployment and UAT guys.

One important thing to note is that after each stage the project manager reviews the work and gives the go-ahead after any necessary corrections. We have 4 stages: development, QA, deployment and UAT. After each stage the job of the project manager is the same, to review the project status and give the go-ahead.

Let's try to understand CyclicBarrier with the help of the above example.

CyclicBarrier:

  • It is a synchronizer that allows a set of threads to all wait for each other to reach a common barrier point.
Here in our example the developers wait for each other until all the developers have finished in stage 1.
Similarly, in stage 2 the QA guys start their work and wait for each other to complete.
Similarly, in stage 3 the deployment guys start their work and wait for each other to complete.
And in stage 4 the UAT guys start their work and wait for each other to complete.
  • It is helpful for multithreaded operations that occur in stages, where intermediate results from the different threads need to be combined between stages.
Here in our example, in stage 1 the developer threads complete their work, and the project manager reviews the work by combining it and gives the go-ahead. In stage 2 the QA threads take the output of stage 1 and start their work. In stage 3 the deployment threads take the output of stage 2 and start their work. And in stage 4 the UAT threads take the output of stage 3 and start their work.
  • It explicitly allows one thread to tell the others to stop waiting at the barrier point: if an interruption or error occurs, the waiting threads leave the barrier with an InterruptedException or a BrokenBarrierException.
Here in our example, if an exception occurs in any developer thread in stage 1, all other developer threads waiting at the barrier also leave the barrier point by throwing an exception.
The Javadoc says: The CyclicBarrier uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time).

  • A single-threaded operation may be required between stages to combine the results from the different threads in each stage.
Here in our example the project manager is there to combine and review the code produced by each thread in each stage. We can think of the project manager as the single operation required to join the results of each thread.
The Javadoc says: A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released.

This Runnable is called the barrier action. It is run by the last thread to arrive, after all the parties have arrived at the barrier point.

CyclicBarrier Construction:

We can construct the CyclicBarrier with the following parameters
  • The number of threads that will take part in parallel processing.
  • Optionally, the barrier action, a Runnable that is run at the end of each stage to combine the results of all the parallel threads.

CyclicBarrier cb=new CyclicBarrier(int nosOfParallelThreads);
CyclicBarrier cb=new CyclicBarrier(int nosOfParallelThreads,Runnable barrierAction);

1.The CyclicBarrier class is used as a barrier for a set of threads, to keep them waiting until all the other threads have reached it. We say a thread reaches the barrier when it calls the await() method of the CyclicBarrier class.

2.The barrier is said to be tripped once the predefined number of threads have reached the barrier.

3.The last thread to reach the barrier (that is, to call the await() method) executes the barrier action Runnable, if one was supplied, before the other waiting threads are released.

4.The threads keep waiting until the number of waiting threads equals the number passed at the time of CyclicBarrier construction, or a waiting thread is interrupted by some other thread, or the barrier is broken or reset.

5.If any thread is interrupted while waiting at the barrier, the barrier is broken and all other waiting threads throw BrokenBarrierException.

6.The Runnable action supplied in the constructor is not executed if the barrier is broken.

7.The getNumberWaiting() method returns the number of threads currently waiting (blocked) at the barrier.

8.The getParties() method returns the number of threads required to trip the barrier.
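Points 7 and 8 can be observed directly. The following is a small sketch, not part of the project example; the class name BarrierInspection and the polling loop are assumptions made for illustration. One worker blocks at a two-party barrier, the main thread inspects the barrier, and then arrives itself to trip it.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical helper class, used only to illustrate getParties() and getNumberWaiting().
class BarrierInspection {

    // Returns {parties, number waiting before the trip, number waiting after the trip}.
    static int[] inspect() {
        final CyclicBarrier barrier = new CyclicBarrier(2);
        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    barrier.await(); // first party arrives and blocks
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        });
        worker.start();
        try {
            // Poll until the worker is actually blocked at the barrier.
            while (barrier.getNumberWaiting() < 1) {
                Thread.sleep(10);
            }
            int waitingBefore = barrier.getNumberWaiting();
            barrier.await();  // second party arrives, so the barrier trips
            worker.join();
            return new int[] { barrier.getParties(), waitingBefore, barrier.getNumberWaiting() };
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = inspect();
        System.out.println("parties=" + r[0] + ", waiting before=" + r[1] + ", waiting after=" + r[2]);
    }
}
```

getNumberWaiting() is a snapshot that can change at any moment, so it is mainly useful for debugging and assertions rather than for control flow.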

Working of CyclicBarrier:

  • Each thread performs its own module.
  • After completing its module, each thread calls the await() method.
  • The await() method returns only when the number of threads given in the constructor have called await().
  • If any of the threads is interrupted or times out while waiting at the barrier, the barrier is broken and all other waiting threads receive a BrokenBarrierException.
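The last bullet can be tried out with the timed variant of await(). The sketch below is an illustration under assumptions (the class name BarrierTimeoutDemo and the 100 ms timeout are arbitrary): a barrier expects two parties but only one ever arrives, so the wait times out and the barrier becomes broken until reset() is called.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: shows a timeout breaking the barrier.
class BarrierTimeoutDemo {

    // Returns {timedOut, brokenAfterTimeout, laterAwaitFailed, brokenAfterReset}.
    static boolean[] run() {
        CyclicBarrier barrier = new CyclicBarrier(2); // needs two parties to trip
        boolean timedOut = false;
        boolean laterAwaitFailed = false;
        try {
            // Only one party ever arrives, so this wait has to time out.
            barrier.await(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            timedOut = true;
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
        boolean brokenAfterTimeout = barrier.isBroken();
        try {
            // Any await() on a broken barrier fails immediately.
            barrier.await();
        } catch (BrokenBarrierException e) {
            laterAwaitFailed = true;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        barrier.reset(); // returns the barrier to its initial, unbroken state
        return new boolean[] { timedOut, brokenAfterTimeout, laterAwaitFailed, barrier.isBroken() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("timedOut=" + r[0] + ", broken=" + r[1]
                + ", laterAwaitFailed=" + r[2] + ", brokenAfterReset=" + r[3]);
    }
}
```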

Code Sample:

 DeveloperThread:

package com.brainatjava.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class DeveloperThread implements Runnable{
    private CyclicBarrier cyclicBarrier;
    public void run() {
    System.out.println(Thread.currentThread().getName()+" has finished his work and waiting on barrier for other developer threads to reach at the barrier.");
    try {
        cyclicBarrier.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }
       
    }
    public void setCyclicBarrier(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }
    public CyclicBarrier getCyclicBarrier() {
        return cyclicBarrier;
    }
   }

QAThread  :


package com.brainatjava.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class QAThread implements Runnable{
    private CyclicBarrier cyclicBarrier;

    public void run() {
        System.out.println(Thread.currentThread().getName()+" has completed his work and waiting for other QA threads to reach at the barrier");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public void setCyclicBarrier(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    public CyclicBarrier getCyclicBarrier() {
        return cyclicBarrier;
    }
}

DelploymentThread :


package com.brainatjava.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class DelploymentThread implements Runnable{
    private CyclicBarrier cyclicBarrier;
    public void run() {
        System.out.println(Thread.currentThread().getName()+" has completed his work and waiting for other deployment thread to reach at the barrier.");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
    public void setCyclicBarrier(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }
    public CyclicBarrier getCyclicBarrier() {
        return cyclicBarrier;
    }
}

UATThread :


package com.brainatjava.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class UATThread implements Runnable{
    private CyclicBarrier cyclicBarrier;

    public void run() {
        System.out.println(Thread.currentThread().getName()+" has completed his work and waiting for other UAT threads to reach at the barrier");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public void setCyclicBarrier(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    public CyclicBarrier getCyclicBarrier() {
        return cyclicBarrier;
    }
}

ManagerBarrierActionThread:


package com.brainatjava.test;

public class ManagerBarrierActionThread implements Runnable{
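    // volatile: written by the thread that runs the barrier action, polled by the main thread
    volatile int stageCount;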
    public int getStageCount() {
        return stageCount;
    }
    public void setStageCount(int stageCount) {
        this.stageCount = stageCount;
    }
    public void run() {
        stageCount++;
        System.out.println("\nProject manager  reviewed the code and gave go ahead.And this completes stage "+stageCount+"\n");
       
    }
}



Main class, which starts all of these threads:

package com.brainatjava.test;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    public static void main(String[] args) {
        final int THREAD_COUNT = 3;
        final int DEVELOPMENT_COMPLETED = 1;
        final int QA_COMPLETED = 2;
        final int DEPLOYMENT_COMPLETED = 3;
        boolean flag = true;
        ManagerBarrierActionThread managerBarrierActionThread = new ManagerBarrierActionThread();
        CyclicBarrier cb = new CyclicBarrier(THREAD_COUNT, managerBarrierActionThread);

        // Stage 1: development
        for (int i = 0; i < THREAD_COUNT; i++) {
            DeveloperThread developers = new DeveloperThread();
            developers.setCyclicBarrier(cb);
            Thread developerThread = new Thread(developers, "DeveloperThread" + (i + 1));
            developerThread.start();
        }

        // Stage 2: QA starts once the manager has completed stage 1
        while (flag) {
            int stageCount = managerBarrierActionThread.getStageCount();
            if (stageCount == DEVELOPMENT_COMPLETED) {
                flag = false;
                for (int i = 0; i < THREAD_COUNT; i++) {
                    QAThread testers = new QAThread();
                    testers.setCyclicBarrier(cb);
                    Thread qaThread = new Thread(testers, "TesterThread" + (i + 1));
                    qaThread.start();
                }
            }
        }

        // Stage 3: deployment starts once the manager has completed stage 2
        flag = true;
        while (flag) {
            int stageCount = managerBarrierActionThread.getStageCount();
            if (stageCount == QA_COMPLETED) {
                flag = false;
                for (int i = 0; i < THREAD_COUNT; i++) {
                    DelploymentThread deployers = new DelploymentThread();
                    deployers.setCyclicBarrier(cb);
                    Thread deploymentThread = new Thread(deployers, "DeploymentThread" + (i + 1));
                    deploymentThread.start();
                }
            }
        }

        // Stage 4: UAT starts once the manager has completed stage 3
        flag = true;
        while (flag) {
            int stageCount = managerBarrierActionThread.getStageCount();
            if (stageCount == DEPLOYMENT_COMPLETED) {
                flag = false;
                for (int i = 0; i < THREAD_COUNT; i++) {
                    UATThread uat = new UATThread();
                    uat.setCyclicBarrier(cb);
                    Thread uatThread = new Thread(uat, "UATThread" + (i + 1));
                    uatThread.start();
                }
            }
        }
    }
}

The output of the above code is :

DeveloperThread1 has finished his work and waiting on barrier for other developer threads to reach at the barrier.
DeveloperThread3 has finished his work and waiting on barrier for other developer threads to reach at the barrier.
DeveloperThread2 has finished his work and waiting on barrier for other developer threads to reach at the barrier.

Project manager  reviewed the code and gave go ahead.And this completes stage 1

TesterThread1 has completed his work and waiting for other QA threads to reach at the barrier
TesterThread2 has completed his work and waiting for other QA threads to reach at the barrier
TesterThread3 has completed his work and waiting for other QA threads to reach at the barrier

Project manager  reviewed the code and gave go ahead.And this completes stage 2

DeploymentThread1 has completed his work and waiting for other deployment thread to reach at the barrier.
DeploymentThread2 has completed his work and waiting for other deployment thread to reach at the barrier.
DeploymentThread3 has completed his work and waiting for other deployment thread to reach at the barrier.

Project manager  reviewed the code and gave go ahead.And this completes stage 3

UATThread1 has completed his work and waiting for other UAT threads to reach at the barrier
UATThread2 has completed his work and waiting for other UAT threads to reach at the barrier
UATThread3 has completed his work and waiting for other UAT threads to reach at the barrier

Project manager  reviewed the code and gave go ahead.And this completes stage 4

If you look at the output, you will see that in stage 1 (iteration 1) the 3 developer threads run in parallel and complete their work; then the manager reviews the progress and gives the go-ahead.
Similarly, in stage 2 (iteration 2) the tester threads do their work.
In stage 3 (iteration 3) the deployment threads do their work.
In stage 4 (iteration 4) the UAT threads do their work.

Note:

One thing to observe here is that we use the same CyclicBarrier again and again for the different stages/iterations without resetting it. That is why the barrier is called cyclic. We will dig deeper into this in the next post, where we will also discuss one more variant of CyclicBarrier and some of its other important properties.
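The cyclic behaviour can also be seen in a more compact form. The sketch below is a variation on the example above, not the same design: instead of starting new threads for each stage, the same three worker threads call await() once per stage, and the barrier action counts the completed stages. The class name StagedBarrierDemo and the use of AtomicInteger are assumptions made for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: one barrier reused for several stages without reset().
class StagedBarrierDemo {

    // Runs the given number of stages and returns how often the barrier action ran.
    static int runStages(final int workers, final int stages) {
        final AtomicInteger completedStages = new AtomicInteger();
        final CyclicBarrier barrier = new CyclicBarrier(workers, new Runnable() {
            public void run() {
                // Runs once per trip, in the last thread to arrive.
                System.out.println("Stage " + completedStages.incrementAndGet() + " reviewed.");
            }
        });
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    try {
                        for (int s = 0; s < stages; s++) {
                            // The work of the stage would go here.
                            barrier.await(); // the same barrier serves every stage
                        }
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return completedStages.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed stages: " + runStages(3, 4));
    }
}
```

Because the workers simply loop over the stages, the main thread does not need to poll a stage counter; it only joins the workers at the end.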

Tuesday, August 18, 2015

CountDownLatch:Java concurrency

In our previous post on Mutex and Semaphore we learned about the different synchronizers. Now we discuss one of them, CountDownLatch. Let's understand CountDownLatch with the help of an example.

Let's take a real-life scenario. Suppose we get a new project from a client. We have one architect and 10 developers. First the architect designs the project and decides which technologies to use. He decides the project flow and creates the different modules. Until then all the developers need to wait for the architect to finish his work. The developers come to the office but cannot do anything useful, because they depend on the architect. Think of them as waiting for a lock to be released by the architect. Once the architect releases it, the developers get the go-ahead and start to work on their assigned modules. Meanwhile, the architect waits for the developers to complete their tasks. Think of him as waiting for a lock to be released by the developers. Remember that he cannot proceed until all the developers have completed their work. If 5 developers have completed their work, he still has to wait for the remaining 5. After that the architect proceeds to review all the pieces of code and integrate them. This scenario is perfectly suited to a CountDownLatch.


Definition:CountDownLatch  is a synchronization aid or a synchronizer  that allows one or more threads to wait for one or more other threads to complete. 

Implementation In Java:
 
To implement this, Java provides a class known as CountDownLatch. We can create a CountDownLatch object with an integer argument, like
CountDownLatch architectLatch = new CountDownLatch(1);
CountDownLatch developerLatch = new CountDownLatch(10);
Let's consider our example: we have 10 developers, all waiting on architectLatch by calling the architectLatch.await() method. The await() method causes the calling threads (i.e. our developer threads) to wait until the latch has counted down to zero. After completing its work, the architect thread calls architectLatch.countDown(). The countDown() method decreases the count of the latch by 1. When the count of the latch reaches zero, all the threads waiting on this CountDownLatch (i.e. architectLatch) resume. The architect thread then waits by calling developerLatch.await().
After completing its work, each developer thread calls developerLatch.countDown().
When all the developer threads have completed, the count of developerLatch is zero and the architect thread resumes. Take a look at the implementation details.

import java.util.concurrent.CountDownLatch;

public class Architect {

        public static void main(String args[]) throws InterruptedException {
            final int totalNosOfLatchCount=10;
             CountDownLatch architectLatch = new CountDownLatch(1);
             CountDownLatch developerLatch = new CountDownLatch(
totalNosOfLatchCount);
             for (int i = 0; i < totalNosOfLatchCount; ++i)
               new Thread(new Developer(architectLatch, developerLatch)).start();

             completeModuleDesign();       
    
             architectLatch.countDown();   
  
             System.out.println("Architect has decreased the count of architectLatch and going to wait on developerLatch.");
             developerLatch.await();

             System.out.println("Now all the developers have completed their work and architect is going to resume.");
           }
        public static void  completeModuleDesign(){
            System.out.println("Architect has completed the design of all his module. ");
         }
}
import java.util.concurrent.CountDownLatch;

public class Developer  implements Runnable {
           private final CountDownLatch architectLatch;
           private final CountDownLatch developerLatch;
           Developer(CountDownLatch startSignal, CountDownLatch doneSignal) {
              this.architectLatch = startSignal;
              this.developerLatch = doneSignal;
           }
           public void run() {
              try {
                  architectLatch.await();
                  completeDevloperModule();
                developerLatch.countDown();
                System.out.println("Each developer has decreased the count on developerLatch by one.\nThe remaining count on developerLatch is "+developerLatch.getCount());
              } catch (InterruptedException ex) {
                  // Restore the interrupt status so that callers can see it, then return.
                  Thread.currentThread().interrupt();
              }
           }

           void completeDevloperModule() {
               System.out.println("Developer is completing his own module");
           }
         }



Reusability of CountDownLatch:

One important thing to note here is that once the count reaches zero, the CountDownLatch cannot be reused. There is no method to increase the count of a CountDownLatch. We can think of this as a benefit, since no one can increase the count by mistake. If we call the await() method again after the count has reached 0, it simply returns immediately and does not throw any exception. If we need a synchronizer whose count can be reset, we can consider using CyclicBarrier.
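A short sketch of this behaviour follows; the class and method names are made up for the illustration. Once the latch is open, further countDown() calls leave the count at zero and await() passes straight through.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class: a CountDownLatch stays open once its count reaches zero.
class LatchReuseDemo {

    // Counts a one-shot latch down twice and returns the final count.
    static long countAfterExtraCountDown() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown(); // count goes from 1 to 0, the latch opens
        latch.countDown(); // no effect: the count never goes below zero
        return latch.getCount();
    }

    // Calls await() repeatedly on an open latch; returns true if every call returned.
    static boolean awaitPassesThrough() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        try {
            latch.await(); // returns immediately
            latch.await(); // still returns immediately, the latch cannot be closed again
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("count=" + countAfterExtraCountDown()
                + ", awaitPassesThrough=" + awaitPassesThrough());
    }
}
```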

Comparison of CountDownLatch with wait(), notify() and join():

At first glance it seems we can achieve the same thing with the wait, notify and join mechanisms. But CountDownLatch is more forgiving in the case of a missed signal. Consider a scenario in which one thread waits and another thread notifies it. Assume that the notifying thread starts first and calls notify() before the thread that is supposed to receive the notification has started waiting. When the second thread later calls wait(), it stalls, because it missed the notification issued by the first thread. In this case CountDownLatch serves perfectly well: if countDown() is called first and await() is invoked later, there is no missed-signal issue, because the latch remembers its count.
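The missed-signal case can be sketched as follows. This is an illustration with assumed names (LateWaiterDemo, lateWaiterProceeds); the one second timeout is only a safety net so that the demo cannot hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: the signal is sent before the waiter even starts.
class LateWaiterDemo {

    // Returns true if a waiter that starts after countDown() still gets through.
    static boolean lateWaiterProceeds() {
        final CountDownLatch signal = new CountDownLatch(1);
        final boolean[] proceeded = new boolean[1];

        signal.countDown(); // the "notification" is issued first, with nobody waiting

        Thread waiter = new Thread(new Runnable() {
            public void run() {
                try {
                    // The latch remembers that it was counted down, so this returns true at once.
                    proceeded[0] = signal.await(1, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        waiter.start();
        try {
            waiter.join(); // join() makes the waiter's write to proceeded[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return proceeded[0];
    }

    public static void main(String[] args) {
        System.out.println("Late waiter proceeded: " + lateWaiterProceeds());
    }
}
```

With a plain notify() issued before the corresponding wait(), the waiter would have no record of the earlier signal unless the program kept its own flag; the latch keeps that record itself.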

Wednesday, July 29, 2015

Difference Between Semaphore and Mutex and How to create a custom semaphore

We know that if we want to guard a critical section (shared resource) so that only a single thread can access it at a time, we use a synchronized block for the critical section, so that in a multithreaded environment we do not face any lost-update problem. There are a number of ways to make a shared resource thread-safe; using the synchronized keyword is one of them. Java also provides constructs and tools to help the developer achieve this goal. Examples are Semaphore, CountDownLatch, CyclicBarrier, ConcurrentHashMap, Hashtable, wait, notify and notifyAll. By using these tools and constructs we can achieve our goal, i.e. guarding a shared resource (critical section) in a multithreaded environment. From now onwards we will call such constructs synchronizers.

Mutex(One Thread at a Time):

Assume a small library that has only one chair to sit and read on, while N students are waiting in a queue. The first student takes the key and enters the room. Until he is done, all the other students wait. When he finishes, he comes out and gives the key to the next student in the queue. Only then does the next student enter the room and start to study. Here the study room is the critical resource and the students are threads. They access the critical section one by one, in a sequential manner.
A mutex is the perfect candidate here, because mutual exclusion is desired.
A mutex is a locking mechanism used to synchronize access to a resource. Only one task (a thread or a process) can hold the mutex at a time. This means there is ownership associated with a mutex, and only the owner can release the lock (mutex). In our example the student holding the key is the owner of the resource (the library room), and after completing his study he releases the room (the shared resource).
A mutex (lock) has the concept of ownership, so it may be reentrant. That means a thread that holds a lock is allowed to enter a critical section that requires the same lock again, because it already holds the lock.
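Reentrancy is easy to observe with java.util.concurrent.locks.ReentrantLock, which is used here as one possible mutex; the text does not prescribe a particular lock class, and the names ReentrancyDemo, outer and inner are assumptions for this sketch.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: the owning thread may take the same lock again.
class ReentrancyDemo {
    private final ReentrantLock lock = new ReentrantLock();

    // Takes the lock, then calls a method that takes it again.
    int outer() {
        lock.lock();
        try {
            return inner();
        } finally {
            lock.unlock();
        }
    }

    // Re-enters the lock already held by the calling thread and reports the hold count.
    private int inner() {
        lock.lock(); // does not block: the current thread is already the owner
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
        }
    }

    // True once every lock() has been matched by an unlock().
    boolean isFree() {
        return !lock.isLocked();
    }

    public static void main(String[] args) {
        ReentrancyDemo demo = new ReentrancyDemo();
        System.out.println("hold count inside inner(): " + demo.outer());
        System.out.println("lock free afterwards: " + demo.isFree());
    }
}
```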

Semaphores(Number Of Specified threads at a time):

As we saw, with a mutex only one thread can access the critical section at a time. In contrast, a semaphore allows more than one thread to access the critical section at the same time. Let's come back to our previous example. Assume we have a large library room with a capacity of 100 students. There are 100 chairs, and 100 students can read together at the same time. Assume the room is the critical section and the students are threads. Initially we define the semaphore with 100 permits. When a student goes inside, he takes one permit, so the number of available permits decreases by one. When a second student goes inside, he takes another permit, and so on. Before a student is allowed to enter the room, the number of available permits is checked. Only if at least one permit is available is the student allowed to go inside. Otherwise he waits until some student comes out and returns a permit, increasing the number of available permits by one.

In Java, semaphores have no notion of ownership, so they cannot be reentrant. One property of semaphores in Java is that release() does not have to be called by the same thread that called acquire(). For example, assume we have code that creates a thread in a pool if and only if semaphore.acquire() succeeds. Those threads then start and call semaphore.release() when they complete. This is a useful property that we do not have with mutexes (locks) in Java. A thread succeeds in semaphore.acquire() if at least one permit is available.
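The lack of ownership can be shown in a few lines. In this sketch (the class name CrossThreadReleaseDemo and the two second timeout are assumptions), the main thread takes the only permit and a different thread gives it back.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a permit acquired by one thread is released by another.
class CrossThreadReleaseDemo {

    // Returns true if a permit released by the worker can be acquired by the caller.
    static boolean releasedByOtherThread() {
        final Semaphore permits = new Semaphore(1);
        try {
            permits.acquire(); // the calling thread takes the only permit

            Thread worker = new Thread(new Runnable() {
                public void run() {
                    // This thread never acquired anything, yet it may release.
                    permits.release();
                }
            });
            worker.start();

            // Succeeds only because the worker returned the permit.
            return permits.tryAcquire(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Permit released by another thread: " + releasedByOtherThread());
    }
}
```

The same freedom means a stray release() can raise the number of permits above the initial value, so code that relies on this property has to pair acquires and releases carefully.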

Now let's try to understand semaphores with the help of an example. The example below is a modified version of the semaphore example given in the Javadoc.


import java.util.concurrent.Semaphore;

class Pool {
   private static final int MAX_AVAILABLE = 10;
       private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
       public Object getItem() throws InterruptedException {
           System.out.println("Going to  acquire lock for thread id "+Thread.currentThread().getId()+" available permit "+available.availablePermits() +" ");
         available.acquire();
       
       //  System.out.println("lock acquired for thread id "+Thread.currentThread().getId()+" available permit "+available.availablePermits() +" ");
         return getNextAvailableItem();
       }
       public void putItem(Object x) {
         if (markAsUnused(x))
           available.release();
         System.out.println("lock released for thread id "+Thread.currentThread().getId());
       }
       // Not a particularly efficient data structure; just for demo
       protected String[] items ={"a","b","c","d","e","f","g","h","i","j"};
       protected boolean[] used = new boolean[MAX_AVAILABLE];
       protected synchronized Object getNextAvailableItem() {
         for (int i = 0; i < MAX_AVAILABLE; ++i) {
           if (!used[i]) {
              used[i] = true;
              return items[i];
           }
         }
         return null; // not reached
       }
       protected synchronized boolean markAsUnused(Object item) {
         for (int i = 0; i < MAX_AVAILABLE; ++i) {
           if (item == items[i]) {
              if (used[i]) {
                used[i] = false;
                return true;
              } else
                return false;
           }
         }
         return false;
       }
     }
}



public class PoolTest {
    public static void main(String[] args) throws InterruptedException {
        final Pool t1 = new Pool();
        for (int i = 0; i < 200; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.currentThread().setName(new Object().toString());
                        System.out.println(t1.getItem().toString() + " for thread id "
                                + Thread.currentThread().getId());
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    // Note: always hands back "a", whichever item was taken
                    t1.putItem("a");
                }
            }).start();
        }
    }
}

Explanation:

Look at the code above. We first define a semaphore with 10 permits. In the getItem() method, each thread first calls acquire(). This is a blocking call: it acquires a permit from the semaphore, blocking until one is available or the thread is interrupted. If a permit is available, acquire() returns immediately and reduces the number of available permits by one.
If no permit is available, the current thread becomes disabled for thread scheduling purposes and lies dormant until one of two things happens:
Some other thread invokes the release() method on the same semaphore and the current thread is next to be assigned a permit; or
Some other thread interrupts the current thread.

Once the permit has been acquired, we call getNextAvailableItem(). It returns an unused item from the items array defined in the Pool class.

The thread then calls the putItem(Object x) method. This method takes the item and marks it as unused, then calls release() on the semaphore.
release() releases a permit, increasing the number of available permits by one. If any threads are trying to acquire a permit, one is selected and given the permit that was just released. That thread is (re)enabled for thread scheduling purposes.
There is no requirement that a thread that releases a permit must have acquired that permit by calling acquire().

 Wrong Uses Of Semaphore:

Note one thing here: in PoolTest we always pass the string "a" to putItem(Object x), although getItem() hands out every item in the array. Eventually all the items a, b, c, d, e, f, g, h, i, j will have been taken from the array, but only "a" is ever returned. From then on at most one permit is ever available. This is a case where permits are taken but never returned: acquire happens without a matching release. This is a misuse of a semaphore.

Also note that we should always call release() in a finally block. Then, even if an exception occurs in the code that runs while the permit is held, release() is still called and the permit is returned.
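
A minimal sketch of the acquire / try / finally pattern follows. GuardedResource and its methods are hypothetical names used only for illustration. Note that acquire() sits outside the try block, so that a failed or interrupted acquire does not lead to a release of a permit that was never taken.

```java
import java.util.concurrent.Semaphore;

class GuardedResource {
    private final Semaphore permits = new Semaphore(2);

    // Runs 'work' while holding one permit; the permit is returned
    // even if 'work' throws.
    void use(Runnable work) throws InterruptedException {
        permits.acquire();          // outside try: nothing to release if this fails
        try {
            work.run();
        } finally {
            permits.release();      // always executed once the permit is held
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    // Uses the resource with work that fails, then reports the permits left.
    static int permitsAfterFailure() {
        GuardedResource resource = new GuardedResource();
        try {
            resource.use(() -> {
                throw new IllegalStateException("simulated failure");
            });
        } catch (IllegalStateException expected) {
            // the permit has already been returned by the finally block
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return resource.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterFailure()); // prints 2
    }
}
```

Without the finally block the failed call would leak a permit, and after two such failures every later caller of use() would block forever.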

Fairness In Semaphore:

Java's built-in concurrency tools (synchronized, wait(), notify()) do not specify which thread gets access to the critical section next when a lock is released. It is up to the JVM implementation to decide.
Fairness gives us more control: when the lock is released, the thread that has waited longest is given the lock (FIFO processing). Without fairness, a thread may wait indefinitely for the lock because other threads keep requesting and obtaining it.

A semaphore can be created with the constructor Semaphore(int permits) or Semaphore(int permits, boolean fair). The second creates a semaphore with the given fairness setting. When fair is true, the semaphore grants permits in the order in which the threads asked for them (FIFO). When fair is false, the semaphore makes no ordering guarantee: a thread that has just called acquire() may be given a permit ahead of a thread that has been waiting longer. To avoid starvation, fair should be set to true.
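
The FIFO behaviour of a fair semaphore can be observed with a sketch like the one below. FairOrderDemo is a hypothetical name, and the busy-wait loops on getQueueLength() and on the result list are there only to make the demo repeatable; they are not something to copy into real code. The threads queue up one after another on a semaphore with zero permits, and permits are then released one at a time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;

class FairOrderDemo {

    // Queues 'waiters' threads on a fair semaphore in a known order and
    // returns the order in which they were granted a permit.
    static List<Integer> grantOrder(int waiters) {
        Semaphore gate = new Semaphore(0, true);   // fair, no permits yet
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        Thread[] threads = new Thread[waiters];

        for (int i = 0; i < waiters; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                gate.acquireUninterruptibly();     // blocks until a permit is released
                order.add(id);
            });
            threads[i].start();
            // Demo only: wait until this thread is queued before starting the next
            while (gate.getQueueLength() < i + 1) {
                Thread.yield();
            }
        }

        for (int i = 0; i < waiters; i++) {
            gate.release();                        // hand out one permit
            // Demo only: wait until the woken thread has recorded itself
            while (order.size() < i + 1) {
                Thread.yield();
            }
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(grantOrder(4)); // prints [0, 1, 2, 3]
    }
}
```

The sketch only shows the ordering that the fair setting promises. It does not prove that a non-fair semaphore would behave differently in this particular run, because here no new thread arrives while others are already waiting.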

How To Break Fairness Policy:

There is a method with the signature public boolean tryAcquire(). It acquires a permit from the semaphore only if one is available at the time of invocation. If a permit is available, the method returns immediately with the value true, reducing the number of available permits by one. If no permit is available, it returns immediately with the value false. Even when the semaphore has been set to use a fair ordering policy, a call to tryAcquire() will immediately acquire a permit if one is available, whether or not other threads are currently waiting. This (rude) "barging" behavior can be useful in certain circumstances, even though it breaks fairness. If we want to honor the fairness setting given in the constructor, we should use tryAcquire(0, TimeUnit.SECONDS) instead, which is almost equivalent (it also detects interruption).
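
The two calls look like this in use. TryAcquireDemo is a hypothetical name, and the sketch is single-threaded on purpose: it shows the return values and the shape of the two calls, not the barging itself, which needs other threads already waiting on the semaphore.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class TryAcquireDemo {

    // Returns the results of three non-blocking attempts on a fair semaphore
    // that starts with a single permit.
    static boolean[] attempts() {
        Semaphore permits = new Semaphore(1, true);

        boolean first = permits.tryAcquire();    // permit available: taken at once
        boolean second = permits.tryAcquire();   // none left: returns without blocking

        permits.release();

        boolean third;
        try {
            // Zero timeout: also returns at once, but respects the fair queue
            third = permits.tryAcquire(0, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            third = false;
        }
        return new boolean[] {first, second, third};
    }

    public static void main(String[] args) {
        boolean[] r = attempts();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints true false true
    }
}
```

Unlike the no-argument version, the timed version declares InterruptedException, so the caller has to handle it.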


Acquire and Release Multiple Permits at the Same Time:

There is a method with the signature public boolean tryAcquire(int permits). When called, it acquires the given number of permits if they are all available and returns immediately with the value true, reducing the number of available permits by the given amount.
If insufficient permits are available, the method returns immediately with the value false and the number of available permits is unchanged.

Similarly, we can release more than one permit in one go with the method below:

public void release(int permits)

Releases the given number of permits, increasing the number of available permits by that amount. If any threads are trying to acquire permits, then one is selected and given the permits that were just released. If the number of available permits satisfies that thread's request then that thread is (re)enabled for thread scheduling purposes; otherwise the thread will wait until sufficient permits are available. If there are still permits available after this thread's request has been satisfied, then those permits are assigned in turn to other threads trying to acquire permits. There is no requirement that a thread that releases a permit must have acquired that permit.
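
A short sketch of the bulk operations, using a hypothetical class MultiPermitDemo and an arbitrary starting count of 5 permits. It records the number of available permits after each step.

```java
import java.util.concurrent.Semaphore;

class MultiPermitDemo {

    // Returns the available permits after: a successful tryAcquire(3),
    // a failed tryAcquire(3), and a release(3).
    static int[] permitTrace() {
        Semaphore permits = new Semaphore(5);

        permits.tryAcquire(3);                        // succeeds: 5 -> 2
        int afterFirst = permits.availablePermits();

        permits.tryAcquire(3);                        // fails: only 2 left, count unchanged
        int afterSecond = permits.availablePermits();

        permits.release(3);                           // 2 -> 5
        int afterRelease = permits.availablePermits();

        return new int[] {afterFirst, afterSecond, afterRelease};
    }

    public static void main(String[] args) {
        int[] t = permitTrace();
        System.out.println(t[0] + " " + t[1] + " " + t[2]); // prints 2 2 5
    }
}
```

The failed attempt is all or nothing: it does not take the two permits that were available.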

Create a custom Semaphore:

Finally, let's take a look at how to create a custom semaphore with minimal settings.
public class MySemaphore {
    // Number of permits currently available
    private int permit;

    public MySemaphore() {
        this(0);
    }

    public MySemaphore(int i) {
        if (i < 0)
            throw new IllegalArgumentException(i + " < 0");
        permit = i;
    }

    public synchronized void release() {
        permit++;
        // Wake up one thread waiting in acquire(), if any
        this.notify();
    }

    public synchronized void acquire() throws InterruptedException {
        // Loop guards against spurious wakeups and lost races
        while (permit == 0) {
            this.wait();
        }
        permit--;
    }
}