Saturday, April 23, 2016

Redis Pub Sub with Jedis

Within these days we were working on a project which heavily relies on Redis for its data sync process.To give  more clarity here we are briefing  the scenario.

There are two parts on the system.One is a producer which is making some cache in Redis and another is a consumer which is subscribed to the same Redis.The subscription is through a channel.The consumer is subscribed to the channel.When there is some change in the channel ,it is published to the consumer and the consumer  updates itself accordingly.Here we used Jedis as Redis  client.



In Redis, we can subscribe to multiple channels and when someone publishes messages on those channels, Redis notifies us  with published messages. Jedis provides this functionality with JedisPubSub abstract class. To handle pub / sub events, we need to extend JedisPubSub class and implement the abstract methods.



package com.brainatjava.test;
public class xyzListener extends JedisPubSub {   @Override
    public void onMessage(String channel, String message) {
    System.out.println(message);
}

Now we wrote the code for registering listener in a different class.

public class TestProgram {
private void registerListeners() {
        if(!xyzListener.isSubscribed()){
                Runnable task=()->{       
                try{
                    Jedis jdeisConnection=    ((Jedis) redisTemplate.getConnectionFactory().getConnection().getNativeConnection());
                    jedisConnectionExceptionFlag=false;
                    jdeisConnection.subscribe(lineItemDeliveryListener, "channelName");
                      
                }
                catch(JedisConnectionException jce){
                    logger.error("got jedis connection excpetion "+jce.getMessage(),jce);
                    jedisConnectionExceptionFlag=true;
                }
                catch(RedisConnectionFailureException rce){
                    logger.error("got jedis RedisConnectionFailureException excpetion "+rce.getMessage(),rce);
                    jedisConnectionExceptionFlag=true;
                }
                catch(Exception e){
                logger.error("error in registerListeners "+xyzListener.isSubscribed(),e);
            }};
           
            Thread xyzUpdater = new Thread(task);
            xyzUpdater.setName("xyzUpdater");
            xyzUpdater.start();
        }
    }

}
Here if we notice the above code ,we found that  first we are checking is xyzListener is subscribed to the required channel ,if not we are doing it.But Observe that we are doing it in a hacky way.That is we are getting the native connection first and then we are subscribing to the channel.And the subscribe is a blocking call.It act in wait and watch mode.

So when there is a change in the channel the listener listens it and update its cache accordingly.But There is always should have a fail safe mechanism in place.We have also done that.If somehow Jedis pubsub is not working,then we have a mechanisim in place to do it manually.

The mechanism is like we have a cron scheduler running in every 15 minutes and    checking the cache timestamp and if the cache timestamp of the latest cache and the timestamp of the consumer varies, then we assume there is issue with pubsub and we will update the consumer cache manually.

With this design everything was fine and  the 15 minutes cron was their without any use.After the smooth working of some days we got an alert that the 15 minute cron is running and manually updating the cache.So it has no impact on our service as the cache is getting updated manually with the help of cron scheduler.

But why this happened?After investigetting sometimes we found before some  days the redis was restarted.And this created the whole issue.It is behaving like it is subscribed.Now you know the solution.