Monday, August 24, 2015

CyclicBarrier: Java Concurrency

In our previous series we learned about different synchronizers such as Semaphore and CountDownLatch.
CyclicBarrier is another synchronizer. We will try to understand it with the help of an example.

Let's assume we have a project. In the initial stage, 3 developers are working on it, and a project manager reviews the status of the project. After working for some days, the developers complete their code, the project manager reviews the status, and he is happy with it. All is well, so he decides to move to the next phase.

The code is now submitted to a QA team of 3 testers for quality testing. After some days the QA team completes the process. Again the project manager reviews the status of the project and is happy with the progress, so he decides to move to the next step, i.e. deployment on the staging server for SIT (System Integration Test).

Assume there are 3 support guys working on this, and after a few days they complete their work. The manager reviews the status and is happy with it, so he decides to move to the next step, i.e. UAT (User Acceptance Test).

Suppose there are 3 guys handling UAT, and they complete their work after some effort. The project manager reviews the status once more, is happy with it, and gives the green signal for the final step, i.e. taking the project live.

Remember one thing here: when one developer completes his work, he waits for his fellow developers to complete theirs. That is, the developers wait for each other until all of them have completed their work. Similarly, all the QA guys wait for each other to complete their work, and the same policy applies to the deployment and UAT guys.

Another important thing to note is that after each stage the project manager reviews the work and gives the go-ahead after any necessary corrections. Here we have 4 stages: development, QA, deployment and UAT. After each stage the job of the project manager is the same, i.e. to review the project status and give the go-ahead.

Now let's try to understand CyclicBarrier with the help of the above example.

CyclicBarrier:

  • It is a synchronizer that allows a set of threads to all wait for each other to reach a common barrier point.
Here in our example, the developers wait for each other until all the developers have completed stage 1.
Similarly, in stage 2 the QA guys start their work and wait for each other to complete.
In stage 3 the deployment guys start their work and wait for each other to complete.
And in stage 4 the UAT guys start their work and wait for each other to complete.
  • It is helpful for multithreaded operations that occur in stages, where intermediate results from the different threads need to be combined between stages.
Here in our example, in stage 1 the developer threads complete their work, and the project manager reviews the combined work and gives the go-ahead. In stage 2 the QA threads take the output of stage 1 and start their work. In stage 3 the deployment threads take the output of stage 2 and start their work. And in stage 4 the UAT threads take the output of stage 3 and start their work.
  • It follows an all-or-none breakage model: if any thread waiting at the barrier point is interrupted or times out, the barrier is broken and every other waiting thread stops waiting and leaves with a BrokenBarrierException (the interrupted thread itself gets an InterruptedException).
Here in our example, if a developer thread is interrupted while waiting at the barrier in stage 1, all the other developer threads waiting at the barrier also leave the barrier point by throwing an exception.
Javadoc says: The CyclicBarrier uses an all-or-none breakage model for failed synchronization attempts: If a thread leaves a barrier point prematurely because of interruption, failure, or timeout, all other threads waiting at that barrier point will also leave abnormally via BrokenBarrierException (or InterruptedException if they too were interrupted at about the same time).

  • A single-threaded operation is often required between stages to combine the results from the different threads of each stage.
Here in our example, the project manager is there to combine and review the work produced by each thread in each stage. We can think of the project manager as the single thread required to join the results of the other threads.
Javadoc says: A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released.

This Runnable is called the barrier action (this post also refers to it as the barrier action thread). It is called after all the parties have arrived at the barrier point. Note that it does not get a thread of its own: it is executed by the last thread to arrive at the barrier.
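
To make the idea of combining results concrete, here is a minimal sketch (not part of the project example). The class name BarrierActionDemo, the method combine and the "partial result" each worker stores are assumptions made up for illustration. Each worker writes one number, and the barrier action adds them up once every worker has arrived.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class BarrierActionDemo {

    // Starts `parties` workers; worker i stores the value i + 1 and then waits at the barrier.
    // Returns the sum computed by the barrier action.
    static int combine(int parties) {
        int[] partial = new int[parties];
        int[] total = new int[1];

        // The barrier action runs once, in the last thread to arrive,
        // before any of the waiting threads is released.
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            int sum = 0;
            for (int value : partial) {
                sum += value;
            }
            total[0] = sum;
        });

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                partial[id] = id + 1; // hypothetical "work" of this thread
                try {
                    barrier.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
            workers[i].start();
        }

        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return total[0];
    }

    public static void main(String[] args) {
        System.out.println("Combined result: " + combine(3)); // Combined result: 6
    }
}
```

Plain arrays are enough here because everything a thread does before calling await() is visible to the barrier action, and the main thread reads the total only after joining the workers.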

CyclicBarrier Construction:

We can construct the CyclicBarrier with the following parameters:
  • The number of threads that will take part in the parallel processing.
  • Optionally, the barrier action, which is called at the end of each stage to combine the results of all the parallel threads.

CyclicBarrier cb = new CyclicBarrier(nosOfParallelThreads);
CyclicBarrier cb = new CyclicBarrier(nosOfParallelThreads, barrierActionThread);

Here nosOfParallelThreads is an int and barrierActionThread is a Runnable.
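
As a small runnable sketch of both constructor forms (the class name BarrierConstructionDemo, the helper rejects and the variable names are hypothetical, chosen only for this illustration):

```java
import java.util.concurrent.CyclicBarrier;

class BarrierConstructionDemo {

    // Returns true if the constructor rejects the given party count.
    static boolean rejects(int parties) {
        try {
            new CyclicBarrier(parties);
            return false;
        } catch (IllegalArgumentException e) {
            return true; // thrown when parties is less than 1
        }
    }

    public static void main(String[] args) {
        int parties = 3; // hypothetical party count, as in the project example

        // Form 1: only the number of parties.
        CyclicBarrier plain = new CyclicBarrier(parties);

        // Form 2: the number of parties plus a barrier action run once per barrier point.
        Runnable review = () -> System.out.println("All parties arrived");
        CyclicBarrier withAction = new CyclicBarrier(parties, review);

        System.out.println(plain.getParties());      // 3
        System.out.println(withAction.getParties()); // 3
        System.out.println(rejects(0));              // true
    }
}
```

The party count is fixed at construction time, which is why every stage in our example uses the same number of threads.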

1. The CyclicBarrier class is used as a barrier for a set of threads, to keep them waiting until all the other threads have reached it. We say a thread has reached the barrier when it calls the await() method of the CyclicBarrier class.

2. The barrier is said to be tripped once the predefined number of threads has reached the barrier.

3. The last thread to reach the barrier (that is, the last one to call await()) executes the Runnable barrier action, if one was supplied, before the other waiting threads are released.

4. The threads keep waiting until the number of waiting threads equals the number passed at the time of CyclicBarrier construction, or until a waiting thread is interrupted by some other thread or times out, or the barrier is broken or reset.

5. If any thread is interrupted while waiting at the barrier, the barrier is broken and all the other waiting threads throw BrokenBarrierException.

6. The Runnable action supplied in the constructor is not executed if the barrier is broken.

7. The getNumberWaiting() method returns the number of threads currently waiting (blocked) at the barrier for it to be tripped.

8. The getParties() method returns the number of threads required to trip the barrier.
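
The two query methods can be watched in action with a rough sketch like the following. The class name BarrierInspectionDemo and the polling loop are assumptions for illustration only. Two helper threads wait at a 3-party barrier, and the main thread arrives last and trips it.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class BarrierInspectionDemo {

    // Returns {parties, waiting before the trip, main thread's arrival index, waiting after the trip}.
    static int[] inspect() {
        CyclicBarrier barrier = new CyclicBarrier(3);
        Runnable waiter = () -> {
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        };
        Thread t1 = new Thread(waiter);
        Thread t2 = new Thread(waiter);
        t1.start();
        t2.start();
        try {
            // Poll until both helper threads are blocked at the barrier.
            while (barrier.getNumberWaiting() < 2) {
                Thread.sleep(10);
            }
            int waitingBefore = barrier.getNumberWaiting();
            // The main thread is the third and last party, so this call trips the barrier.
            // await() returns the arrival index; 0 means "last to arrive".
            int arrivalIndex = barrier.await();
            t1.join();
            t2.join();
            return new int[] {barrier.getParties(), waitingBefore, arrivalIndex, barrier.getNumberWaiting()};
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(inspect())); // [3, 2, 0, 0]
    }
}
```

After the barrier trips, the waiting count goes back to 0 and the barrier is ready for the next round.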

Working of CyclicBarrier:

  • Each thread performs its own module.
  • After completing its module, each thread calls the await() method.
  • The await() method returns only when the number of threads specified in the constructor have called await().
  • If any of the threads is interrupted or times out while waiting at the barrier, the barrier is broken and all the other waiting threads receive a BrokenBarrierException.
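
The last point can be tried out with a short sketch. The class name BrokenBarrierDemo and the helper awaitAndReport are hypothetical. Two threads wait at a 3-party barrier whose third party never arrives, and then one of the waiting threads is interrupted.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class BrokenBarrierDemo {

    // Waits at the barrier and reports how the wait ended.
    static String awaitAndReport(CyclicBarrier barrier) {
        try {
            barrier.await();
            return "released normally";
        } catch (InterruptedException e) {
            return "InterruptedException";
        } catch (BrokenBarrierException e) {
            return "BrokenBarrierException";
        }
    }

    // Returns {outcome of the interrupted thread, outcome of the other thread, barrier.isBroken()}.
    static String[] run() {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never arrives
        String[] outcome = new String[2];
        Thread a = new Thread(() -> { outcome[0] = awaitAndReport(barrier); });
        Thread b = new Thread(() -> { outcome[1] = awaitAndReport(barrier); });
        a.start();
        b.start();
        try {
            // Poll until both threads are blocked at the barrier.
            while (barrier.getNumberWaiting() < 2) {
                Thread.sleep(10);
            }
            a.interrupt(); // breaks the barrier for everyone waiting on it
            a.join();
            b.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new String[] {outcome[0], outcome[1], String.valueOf(barrier.isBroken())};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
        // [InterruptedException, BrokenBarrierException, true]
    }
}
```

A broken barrier stays broken until reset() is called on it, so in real code the catch blocks usually need to decide whether the whole stage should be abandoned.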

Code Sample:

 DeveloperThread:

package com.brainatjava.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class DeveloperThread implements Runnable{
    // Barrier shared with the other developer threads; set before the thread is started.
    private CyclicBarrier cyclicBarrier;

    public void run() {
        System.out.println(Thread.currentThread().getName()+" has finished his work and waiting on barrier for other developer threads to reach at the barrier.");
        try {
            // Block here until all parties have called await() on the same barrier.
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public void setCyclicBarrier(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    public CyclicBarrier getCyclicBarrier() {
        return cyclicBarrier;
    }
}

QAThread  :


package com.brainatjava.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class QAThread implements Runnable{
    private CyclicBarrier cyclicBarrier;

    public void run() {
        System.out.println(Thread.currentThread().getName()+" has completed his work and waiting for other QA threads to reach at the barrier");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public void setCyclicBarrier(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    public CyclicBarrier getCyclicBarrier() {
        return cyclicBarrier;
    }
}

DelploymentThread :


package com.brainatjava.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class DelploymentThread implements Runnable{
    private CyclicBarrier cyclicBarrier;
    public void run() {
        System.out.println(Thread.currentThread().getName()+" has completed his work and waiting for other deployment thread to reach at the barrier.");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
    public void setCyclicBarrier(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }
    public CyclicBarrier getCyclicBarrier() {
        return cyclicBarrier;
    }
}

UATThread :


package com.brainatjava.test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class UATThread implements Runnable{
    private CyclicBarrier cyclicBarrier;

    public void run() {
        System.out.println(Thread.currentThread().getName()+" has completed his work and waiting for other UAT threads to reach at the barrier");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public void setCyclicBarrier(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    public CyclicBarrier getCyclicBarrier() {
        return cyclicBarrier;
    }
}

ManagerBarrierActionThread:


package com.brainatjava.test;

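public class ManagerBarrierActionThread implements Runnable{
    // volatile: written by the thread that runs the barrier action, read by the main thread.
    private volatile int stageCount;

    public int getStageCount() {
        return stageCount;
    }

    public void setStageCount(int stageCount) {
        this.stageCount = stageCount;
    }

    public void run() {
        // The barrier runs this action once per stage, in the last thread to arrive.
        stageCount++;
        System.out.println("\nProject manager  reviewed the code and gave go ahead.And this completes stage "+stageCount+"\n");
    }
}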



Main class, which starts all of these threads:

package com.brainatjava.test;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    public static void main(String[] args) {
        final int THREAD_COUNT = 3;
        final int DEVELOPMENT_COMPLETED = 1;
        final int QA_COMPLETED = 2;
        final int DEPLOYMENT_COMPLETED = 3;

        ManagerBarrierActionThread managerBarrierActionThread = new ManagerBarrierActionThread();
        // One barrier is shared by all four stages; the manager action runs once per stage.
        CyclicBarrier cb = new CyclicBarrier(THREAD_COUNT, managerBarrierActionThread);

        // Stage 1: development.
        for (int i = 0; i < THREAD_COUNT; i++) {
            DeveloperThread developers = new DeveloperThread();
            developers.setCyclicBarrier(cb);
            Thread developerThread = new Thread(developers, "DeveloperThread" + (i + 1));
            developerThread.start();
        }

        // Stage 2: QA starts once the manager has completed stage 1.
        waitForStage(managerBarrierActionThread, DEVELOPMENT_COMPLETED);
        for (int i = 0; i < THREAD_COUNT; i++) {
            QAThread testers = new QAThread();
            testers.setCyclicBarrier(cb);
            Thread qaThread = new Thread(testers, "TesterThread" + (i + 1));
            qaThread.start();
        }

        // Stage 3: deployment starts once the manager has completed stage 2.
        waitForStage(managerBarrierActionThread, QA_COMPLETED);
        for (int i = 0; i < THREAD_COUNT; i++) {
            DelploymentThread deployers = new DelploymentThread();
            deployers.setCyclicBarrier(cb);
            Thread deploymentThread = new Thread(deployers, "DeploymentThread" + (i + 1));
            deploymentThread.start();
        }

        // Stage 4: UAT starts once the manager has completed stage 3.
        waitForStage(managerBarrierActionThread, DEPLOYMENT_COMPLETED);
        for (int i = 0; i < THREAD_COUNT; i++) {
            UATThread uat = new UATThread();
            uat.setCyclicBarrier(cb); // the original referenced an undefined variable here
            Thread uatThread = new Thread(uat, "UATThread" + (i + 1));
            uatThread.start();
        }
    }

    // Polls the manager's stage counter until the given stage has been completed.
    private static void waitForStage(ManagerBarrierActionThread manager, int stage) {
        while (manager.getStageCount() < stage) {
            Thread.yield();
        }
    }
}

The output of the above code is :

DeveloperThread1 has finished his work and waiting on barrier for other developer threads to reach at the barrier.
DeveloperThread3 has finished his work and waiting on barrier for other developer threads to reach at the barrier.
DeveloperThread2 has finished his work and waiting on barrier for other developer threads to reach at the barrier.

Project manager  reviewed the code and gave go ahead.And this completes stage 1

TesterThread1 has completed his work and waiting for other QA threads to reach at the barrier
TesterThread2 has completed his work and waiting for other QA threads to reach at the barrier
TesterThread3 has completed his work and waiting for other QA threads to reach at the barrier

Project manager  reviewed the code and gave go ahead.And this completes stage 2

DeploymentThread1 has completed his work and waiting for other deployment thread to reach at the barrier.
DeploymentThread2 has completed his work and waiting for other deployment thread to reach at the barrier.
DeploymentThread3 has completed his work and waiting for other deployment thread to reach at the barrier.

Project manager  reviewed the code and gave go ahead.And this completes stage 3

UATThread1 has completed his work and waiting for other UAT threads to reach at the barrier
UATThread2 has completed his work and waiting for other UAT threads to reach at the barrier
UATThread3 has completed his work and waiting for other UAT threads to reach at the barrier

Project manager  reviewed the code and gave go ahead.And this completes stage 4

If you look at the output, you will see that in stage 1 (iteration 1) the 3 developer threads ran in parallel and completed their work, and then the manager reviewed the progress and gave the go-ahead.
Similarly, in stage 2 (iteration 2) the tester threads did their work.
In stage 3 (iteration 3) the deployment threads did their work.
In stage 4 (iteration 4) the UAT threads did their work.

Note:

One thing to observe here is that we use the same CyclicBarrier again and again for the different stages/iterations without resetting it. That is why the barrier is called cyclic. We will dig deeper into this in the next post of the series. In part 2 we will also discuss one more variant of CyclicBarrier and learn some other important properties of it.
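
The cyclic behaviour can also be seen in a more compact form. The sketch below is an alternative arrangement, not the code from this post: instead of starting new threads for every stage, the same worker threads loop over the stages and call await() on the same barrier at the end of each one. The class name ReusableBarrierDemo and the method runStages are made up for illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class ReusableBarrierDemo {

    // Runs `workers` threads through `stages` stages on a single barrier.
    // Returns how many times the barrier action ran.
    static int runStages(int workers, int stages) {
        AtomicInteger completedStages = new AtomicInteger();
        CyclicBarrier barrier = new CyclicBarrier(workers, () ->
                System.out.println("Stage " + completedStages.incrementAndGet() + " reviewed"));

        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int stage = 1; stage <= stages; stage++) {
                        // This worker's share of the stage would go here.
                        barrier.await(); // same barrier, reused for every stage
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, "Worker" + (i + 1));
            threads[i].start();
        }

        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return completedStages.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier action ran " + runStages(3, 4) + " times");
        // Barrier action ran 4 times
    }
}
```

No call to reset() is needed between stages: once the barrier trips, it is automatically ready for the next set of await() calls.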
