为什么需要分布式锁

在 jdk 中为我们提供了加锁的方式:

(1)synchronized 关键字

(2)volatile + CAS 实现的乐观锁

(3)ReadWriteLock 读写锁

(4)ReenTrantLock 可重入锁

等等,这些锁为我们变成提供极大的便利性,保证在多线程的情况下,保证线程安全。

但是在分布式系统中,上面的锁就统统没用了。

我们想要解决分布式系统中的并发问题,就需要引入分布式锁的概念。


java 代码实现

创作动机

首先是对锁实现原理的一个实现,理论指导实践,实践完善理论。

晚上关于 redis 分布式锁的文章一大堆,但是也都稂莠不齐。

redis 分布式锁工具有时候中间件团队不见得会提供,提供了也不见得经常维护,不如自己实现一个,知道原理,也方便修改。

接口定义

为了便于和 JDK 复用,我们让接口继承自 jdk 的 Lock 接口。

package com.github.houbb.lock.api.core; 
 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.Lock; 
 
/** 
 * 锁定义 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public interface ILock extends Lock { 
 
    /** 
     * 尝试加锁 
     * @param time 时间 
     * @param unit 当为 
     * @param key key 
     * @return 返回 
     * @throws InterruptedException 异常 
     * @since 0.0.1 
     */ 
    boolean tryLock(long time, TimeUnit unit, 
                    String key) throws InterruptedException; 
 
    /** 
     * 尝试加锁 
     * @param key key 
     * @return 返回 
     * @since 0.0.1 
     */ 
    boolean tryLock(String key); 
 
    /** 
     * 解锁 
     * @param key key 
     * @since 0.0.1 
     */ 
    void unlock(String key); 
 
}

方法我们只添加了三个比较常用的核心方法,作为第一个版本,简单点。

后续陆续添加即可。

抽象实现

为了便于后期添加更多的所实现,这里首先实现了一个公用的抽象父类。

package com.github.houbb.lock.redis.core; 
 
import com.github.houbb.lock.api.core.ILock; 
import com.github.houbb.lock.redis.constant.LockRedisConst; 
import com.github.houbb.wait.api.IWait; 
 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.Condition; 
 
/** 
 * 抽象实现 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public abstract class AbstractLockRedis implements ILock { 
 
    /** 
     * 锁等待 
     * @since 0.0.1 
     */ 
    private final IWait wait; 
 
    protected AbstractLockRedis(IWait wait) { 
        this.wait = wait; 
    } 
 
    @Override 
    public void lock() { 
        throw new UnsupportedOperationException(); 
    } 
 
    @Override 
    public void lockInterruptibly() throws InterruptedException { 
        throw new UnsupportedOperationException(); 
    } 
 
    @Override 
    public boolean tryLock() { 
        return tryLock(LockRedisConst.DEFAULT_KEY); 
    } 
 
    @Override 
    public void unlock() { 
        unlock(LockRedisConst.DEFAULT_KEY); 
    } 
 
    @Override 
    public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException { 
        long startTimeMills = System.currentTimeMillis(); 
 
        // 一次获取,直接成功 
        boolean result = this.tryLock(key); 
        if(result) { 
            return true; 
        } 
 
        // 时间判断 
        if(time <= 0) { 
            return false; 
        } 
        long durationMills = unit.toMillis(time); 
        long endMills = startTimeMills + durationMills; 
 
        // 循环等待 
        while (System.currentTimeMillis() < endMills) { 
            result = tryLock(key); 
            if(result) { 
                return true; 
            } 
 
            // 等待 10ms 
            wait.wait(TimeUnit.MILLISECONDS, 10); 
        } 
        return false; 
    } 
 
    @Override 
    public synchronized boolean tryLock(long time, TimeUnit unit) throws InterruptedException { 
        return tryLock(time, unit, LockRedisConst.DEFAULT_KEY); 
    } 
 
    @Override 
    public Condition newCondition() { 
        throw new UnsupportedOperationException(); 
    } 
 
}

最核心的实际上是 public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException 方法。

这个方法会调用 this.tryLock(key) 获取锁,如果成功,直接返回;如果不成功,则循环等待。

这里设置了超时时间,如果超时,则直接返回 true。

redis 锁实现

我们实现的 redis 分布锁,继承自上面的抽象类。

package com.github.houbb.lock.redis.core; 
 
import com.github.houbb.heaven.util.lang.StringUtil; 
import com.github.houbb.id.api.Id; 
import com.github.houbb.id.core.util.IdThreadLocalHelper; 
import com.github.houbb.lock.redis.constant.LockRedisConst; 
import com.github.houbb.lock.redis.exception.LockRedisException; 
import com.github.houbb.lock.redis.support.operator.IOperator; 
import com.github.houbb.wait.api.IWait; 
 
/** 
 * 这里是基于 redis 实现 
 * 
 * 实际上也可以基于 zk/数据库等实现。 
 * 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public class LockRedis extends AbstractLockRedis { 
 
    /** 
     * redis 操作实现 
     * @since 0.0.1 
     */ 
    private final IOperator redisOperator; 
 
    /** 
     * 主键标识 
     * @since 0.0.1 
     */ 
    private final Id id; 
 
    public LockRedis(IWait wait, IOperator redisOperator, Id id) { 
        super(wait); 
        this.redisOperator = redisOperator; 
        this.id = id; 
    } 
 
    @Override 
    public boolean tryLock(String key) { 
        final String requestId = id.id(); 
        IdThreadLocalHelper.put(requestId); 
 
        return redisOperator.lock(key, requestId, LockRedisConst.DEFAULT_EXPIRE_MILLS); 
    } 
 
    @Override 
    public void unlock(String key) { 
        final String requestId = IdThreadLocalHelper.get(); 
        if(StringUtil.isEmpty(requestId)) { 
            String threadName = Thread.currentThread().getName(); 
            throw new LockRedisException("Thread " + threadName +" not contains requestId"); 
        } 
 
        boolean unlock = redisOperator.unlock(key, requestId); 
        if(!unlock) { 
            throw new LockRedisException("Unlock key " + key + " result is failed!"); 
        } 
    } 
}

这里就是 redis 锁的核心实现了,如果不太理解,建议回顾一下原理篇:

redis 分布式锁原理详解

加锁

加锁部分,这里会生成一个 id 标识,用于区分当前操作者。

为了安全也设置了默认的超时时间。

当然这里是为了简化调用者的使用成本,开发在使用的时候只需要关心自己要加锁的 key 即可。

当然,甚至连加锁的 key 都可以进一步抽象掉,比如封装 @DistributedLock 放在方法上,即可实现分布式锁。这个后续有时间可以拓展,原理也不难。

解锁

解锁的时候,就会获取当前进程的持有标识。

凭借当前线程持有的 id 标识,去解锁。

IOperator

我们对 redis 的操作进行了抽象,为什么抽象呢?

因为 redis 服务种类实际很多,可以是 redis 单点,集群,主从,哨兵。

连接的客户端也可以很多,jedis,spring redisTemplate, codis, redisson 等等。

这里为了后期拓展方便,就对操作进行了抽象。

接口

定义接口如下:

package com.github.houbb.lock.redis.support.operator; 
 
/** 
 * Redis 客户端 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public interface IOperator { 
 
    /** 
     * 尝试获取分布式锁 
     * 
     * @param lockKey    锁 
     * @param requestId  请求标识 
     * @param expireTimeMills 超期时间 
     * @return 是否获取成功 
     * @since 0.0.1 
     */ 
    boolean lock(String lockKey, String requestId, int expireTimeMills); 
 
    /** 
     * 解锁 
     * @param lockKey 锁 key 
     * @param requestId 请求标识 
     * @return 结果 
     * @since 0.0.1 
     */ 
    boolean unlock(String lockKey, String requestId); 
 
}

jedis 实现

我们实现一个 jedis 单点版本的:

package com.github.houbb.lock.redis.support.operator.impl; 
 
import com.github.houbb.lock.redis.constant.LockRedisConst; 
import com.github.houbb.lock.redis.support.operator.IOperator; 
import redis.clients.jedis.Jedis; 
 
import java.util.Collections; 
 
/** 
 * Redis 客户端 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public class JedisOperator implements IOperator { 
 
    /** 
     * jedis 客户端 
     * @since 0.0.1 
     */ 
    private final Jedis jedis; 
 
    public JedisOperator(Jedis jedis) { 
        this.jedis = jedis; 
    } 
 
    /** 
     * 尝试获取分布式锁 
     * 
     * expireTimeMills 保证当前进程挂掉,也能释放锁 
     * 
     * requestId 保证解锁的是当前进程(锁的持有者) 
     * 
     * @param lockKey         锁 
     * @param requestId       请求标识 
     * @param expireTimeMills 超期时间 
     * @return 是否获取成功 
     * @since 0.0.1 
     */ 
    @Override 
    public boolean lock(String lockKey, String requestId, int expireTimeMills) { 
        String result = jedis.set(lockKey, requestId, LockRedisConst.SET_IF_NOT_EXIST, LockRedisConst.SET_WITH_EXPIRE_TIME, expireTimeMills); 
        return LockRedisConst.LOCK_SUCCESS.equals(result); 
    } 
 
    /** 
     * 解锁 
     * 
     * (1)使用 requestId,保证为当前锁的持有者 
     * (2)使用 lua 脚本,保证执行的原子性。 
     * 
     * @param lockKey   锁 key 
     * @param requestId 请求标识 
     * @return 结果 
     * @since 0.0.1 
     */ 
    @Override 
    public boolean unlock(String lockKey, String requestId) { 
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; 
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); 
        return LockRedisConst.RELEASE_SUCCESS.equals(result); 
    } 
 
}

这里时最核心的部分。

别看简单几行代码,需要注意的点还是很多的。

加锁

加锁时附带 requestId,用来标识自己为锁的持有者。

SETNX 当 key 不存在时才进行加锁。

设置加锁的过期时间,避免因异常等原因未释放锁,导致锁的长时间占用。

解锁

使用 lua 脚本,保证操作的原子性。

为了证明为锁的持有者,传入 requestId。

测试验证

maven 引入

<dependency> 
    <groupId>com.github.houbb</groupId> 
    <artifactId>lock-core</artifactId> 
    <version>0.0.1</version> 
</dependency>

测试代码

Jedis jedis = new Jedis("127.0.0.1", 6379); 
IOperator operator = new JedisOperator(jedis); 
 
// 获取锁 
ILock lock = LockRedisBs.newInstance().operator(operator).lock(); 
 
try { 
    boolean lockResult = lock.tryLock(); 
    System.out.println(lockResult); 
    // 业务处理 
} catch (Exception e) { 
    e.printStackTrace(); 
} finally { 
    lock.unlock(); 
}

小结

到这里,一个简单版本的 redis 分布式锁就实现完成了。

当然还有很多可以改进的地方:

(1)比如引入递增的 sequence,避免分布式锁中的 GC 导致的问题

(2)对于更多 redis 服务端+客户端的支持

(3)对于注解式 redis 分布式锁的支持