背景

       redis作为一个内存数据库,在分布式的服务的大环境下,占的比重越来越大啦,下面我们和大家一起探讨一下如何使用redis实现一个分布式锁 

说明

      一个分布式锁至少要满足下面几个条件

     1:互斥性

              多个客户端竞争的时候,只能有一个客户端能获取锁

      2:安全性

              谁创建,谁销毁,客户端A创建了分布式锁,只能有A来销毁

     3:容错性

            某个redis节点挂啦,不会影响客户端创建或者销毁分布式锁

     4:避免死锁

            客户端A创建了分布式锁因程序异常未释放,不会造成其他客户端再也无法申请到锁

       下面我们基于上面四个基本准则一起来设计分布式锁,主要有2个方法,①尝试获取锁,②释放锁

   尝试获取锁

   这一段代码中有很多容易犯错的地方

public boolean trylock(String lockKey,String lockValue,Long lockWaitTimeout,Long lockExpirseTimeout){
int timeout = lockWaitTimeout.intValue();
while (timeout >= 0){
String expireTimeout = String.valueOf(lockExpirseTimeout/1000);
List<String> keys = new ArrayList<String>();
keys.add(lockKey);

List<String> args = new ArrayList<String>();
args.add(lockValue);
args.add(expireTimeout);

//①使用lua脚本,setnx创建锁,并设置过期时间,这里网上大多数教程都是直接将value值设置为过期时间,人工判断,我在这里通过lua脚本给加一个过期时间
/**
      //伪代码
// 如果当前锁不存在,返回加锁成功,
      if (jedis.setnx(lockKey, expiresStr) == 1) {
// 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁,建议将2个命令通过lua脚本一起执行,保证原则性
          jedis.expire(lockKey, expireTime);
return true;
      }
*/
// 如果当前锁不存在,返回加锁成功
String lockLuaScript = setNxLuaScript();
Long exeResult = exeLuaScript(lockLuaScript,keys,args,Long.class);
if (exeResult!=null && exeResult.intValue() == 1){
return true;
}

// 如果锁存在,获取锁的过期时间
String lockTimeStr = get(lockKey);
if (lockTimeStr != null && Long.parseLong(lockTimeStr) < System.currentTimeMillis()){
// 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
String oldLockTimeStr = getAndSet(lockKey,lockValue);
// 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才有权利加锁
if (oldLockTimeStr != null && oldLockTimeStr.equals(lockTimeStr)){
//大多数网上源代码中是木有这一行代码的,此行是为了解决高并发情况下,getSet虽然只有一个设置成功,但是value值可能会被覆盖,所以重新设置一下
set(lockKey,lockValue,Long.valueOf(expireTimeout),TimeUnit.SECONDS);
return true;
}
}
int sleepTime=new Random().nextInt(10)*100;
timeout -= sleepTime;
try {
log.info("获取redis分布式锁失败,sleep:{}ms后重新获取",sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 其他情况,一律返回加锁失败
return false;
}

private String setNxLuaScript(){
StringBuffer luascript = new StringBuffer();
luascript.append(" if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then ");
luascript.append(" redis.call('expire',KEYS[1],ARGV[2]) return 1");
luascript.append(" else return 0 end");
return luascript.toString();
}

释放锁

/**
   *网上很多代码都木有考虑,谁创建,谁销毁这个准则
* 通过获取lockKey的值和当初设定oldValue是否一致,来决定客户端是否有权利来释放锁,由于这是2个命令,考虑高并发情况,所以通过lua脚本,将2个命令放在一起执行,保证原子性
*/
public void unlock(String lockKey,String oldValue){
String luascript = delLuaScript();
List<String> keys = new ArrayList<String>();
keys.add(lockKey);
List<String> args = new ArrayList<String>();
args.add(oldValue);
exeLuaScript(luascript,keys,args,Long.class);
}


private String delLuaScript(){
StringBuffer luascript = new StringBuffer();
luascript.append(" if redis.call('exists',KEYS[1]) == 1 and redis.call('get',KEYS[1]) == ARGV[1] then");
luascript.append(" redis.call('del',KEYS[1]) return 1");
luascript.append(" else return 0 end");
return luascript.toString();
}

//执行lua脚本命令

public <T> T exeLuaScript(String luaScript, List<String> keys, List<String> args,Class<T> clz){
      T t = (T)redisTemplate.execute(new RedisCallback<T>(){
@Override
public T doInRedis(RedisConnection redisConnection) throws DataAccessException {

Object nativeConnection = redisConnection.getNativeConnection();
if (nativeConnection instanceof JedisCluster) {
return (T)((JedisCluster) nativeConnection).eval(luaScript.toString(), keys, args);
} // 单机模式
else if (nativeConnection instanceof Jedis) {

return (T) ((Jedis) nativeConnection).eval(luaScript.toString(), keys, args);
}
return null;
}
});

if(t == null){
throw new RuntimeException("redis model doesn't support luascript");
}
return t;
}

上述代码目前依然存在的问题
①:当业务耗时时间大于分布式锁的过期时间lockExpirseTimeout,会造成同时有2个客户端获取到了分布式锁
②:容错性问题还有待解决
 
  1.