分布式锁中基于Redis的实现需避坑:Jedis篇

一、redis 介绍

Redis 应该是目前最受欢迎的高性能的缓存数据库了,在五一期间看到一则 Redis 7.0 发布的消息后,回想起多年前学习黄健宏老师《Redis 从入门到精通》2.x 的月伴时光,不由得感慨 Reids 发展之迅速。搜集了一下 3.0 及之后各版本的知名特性,整理出来方便读者朋友们有个简单了解(感兴趣的朋友还需自行深入研究),情况大致如下:

1.1 特性介绍

为满足本篇目标所需,这里着重介绍以下几个关键特性:

Redis 的数据结构(来自网络)

Redis 的集群模式(来自网络)

Redis lua 脚本的工作机制(来自网络)

1.2 特性总结

Redis 的分布式锁正是基于以上特性来实现的,简单来说是:

  1. TTL 机制:用于支撑异常情况下的锁自动释放的能力
  2. 顺序变更:用于支撑获取锁和排队等待的能力
  3. 集群+主从模式:用于支撑锁服务的高可用

Redis 没有提供对分布式锁亲和的监听机制,需要客户端主动轮询感知数据变更。

二. 加锁解锁的流程描述

使用 Jedis 指令实现分布式锁的核心流程如下图所示:

  1. 准备客户端、key 和 value
  2. 若 key 不存在,指定过期时间成功写入 Key-Value 则抢锁成功,并定时推后 key 的过期时间
  3. 若 key 已存在,则采用重试策略间歇性抢锁。
  4. 解锁时,删除 key 并撤销推后 key 过期时间的逻辑

其中第 2 和第 4 是核心环节,有几个版本的演进很有趣味:

  1. 插入 key 和设置过期时间并非原子操作:setnx + expire 加锁和设置过期是两个分开的独立操作;若发生异常,导致设置过期操作未执行,则此锁就成了永恒锁,其他客户端就再也抢不到了
  2. 以原子性操作完成插入 key 和设置过期时间:使用 set 的扩展指令,如下:
SET key value [EX seconds] [PX milliseconds] [NX|XX]
复制代码
if(jedis.set(key, lockValue, "NX", "EX", 100) == 1){ //加锁成功
  try {
      do work //执行业务
      //这里缺点什么?
  }catch(Exception e){
      //...
  }finally {
     jedis.del(key); //释放锁,这里可能误删其他client的锁key
  }
}
复制代码
  1. 引入 lockValue 的随机值校验,避免误释放其它客户端的锁,场景如下:

解决办法是:加锁时指定的 lockValue 为随机值,每次加锁时的值都是唯一的,释放锁时若 lockValue 与加锁时的值一致才可释放,否则什么都不做,逻辑如下:

if(jedis.set(key, randomLockValue, "NX", "EX", 100) == 1){ //加锁
   try {
       do something  //业务处理
   }catch(){
 }
 finally {
      //判断是不是当前线程加的锁,是才释放
      //但判断和释放锁两个操作不是原子性的
      if (randomLockValue.equals(jedis.get(key))) {
         jedis.del(key); //释放锁
      }
   }
}
复制代码

以上代码遗留的问题是判断 randomlockValue 和释放锁两个操作不是原子性的。

  1. 引入 lua 脚本,保障判断 randomlockValue 和删除 key 这两个操作的原子性,逻辑如下:
String script =
        "if redis.call('get',KEYS[1]) == ARGV[1] then" +
                "   return redis.call('del',KEYS[1]) " +
                "else" +
                "   return 0 " +
                "end";
Object result = jedis.eval(script, Collections.singletonList(key),
Collections.singletonList(randomLockValue));
if("1".equals(result.toString())){
    return true;
}
复制代码

至此依然存在的一个问题是:若持锁后,业务逻辑执行耗时 超过了 key 的过期时间,则锁 Key 会被 Reids 主动删除。

  1. 引入 watchDog 定时推后 key 的过期时间,避免业务未执行完时,key 过期被 Redis 删除。
if(jedis.set(key, randomLockValue, "NX", "EX", 100) == 1){ //加锁成功
  try {
      do work //执行业务
      //watchDog定时延后Key的过期时间
  }catch(Exception e){
      //...
  }finally {
     String script =
              "if redis.call('get',KEYS[1]) == ARGV[1] then" +
                      "   return redis.call('del',KEYS[1]) " +
                      "else" +
                      "   return 0 " +
                      "end";
      try {
          Object result = jedis.eval(script, Collections.singletonList(key),
                                  Collections.singletonList(randomLockValue));
          if("1".equals(result.toString())){
              return true;
          }
          return false;
      }catch(Exception e){
      //...
    }
  }
}
复制代码

三. Jedis 分布式锁的能力

可能读者是单篇阅读,这里引入第一篇《分布式锁上-初探》中的一些内容,一个分布式锁应具备这样一些功能特点:

基于上文对 Jedis 分布式锁的介绍,这里简单总结一下 Jedis 的能力矩阵,ZK 请看《分布式锁中-基于 Zookeeper 的实现》,etcd 请看《分布式锁中-基于 etcd 的实现很优雅》 ,表格中标题使用 Redis-简单锁,主要是跟 RedLock 做区分,这种简单锁使用 Jedis 、Lettuce、Redisson 都能实现,任何一把锁的信息只保存在一个 Redis master 实例中,而 RedLock 是 Redisson 提供的高阶分布式锁,它需要客户端同时跟多个 Redis master 实例协作才能完成,即一把锁的信息同时存在于多个 master 实例中。它的情况会在后续文章中补充(感兴趣的读者可以关注本号 【架构染色】 ,文章完成时会主动推送给你)

能力

ZK

etcd

Redis-简单锁

Redlock

MySql

互斥



安全

链接异常时,session 丢失自动释放锁

基于租约,超时自动释放锁

基于 TTL,超时自动释放锁



可用性

相对可用性还好



可重入

服务端非可重入,本地线程可重入

服务端非可重入,Resission本地线程可重入

服务端非可重入,本地线程可重入需自研



加解锁速度

速度不算快

速度快,GRPC 协议优势以及服务端能力的优势

速度快



阻塞非阻塞

客户端两种能力都提供

jetcd-core 中,阻塞非阻塞由 Future#get 支撑

Jedis非阻塞,# Redission提供阻塞能力



公平非公平

公平锁

公平锁

非公平锁,# Redission提供公平锁



可续期

天然支持

天然支持

Jedis需自研 watchDog,Redission自带



其他因素

技术栈偏老,性能不佳

多数公司不熟悉

容易受业务缓存操作干扰



四、Jedis 库实现分布式锁

Jedis 是 Redis 官方推出的用于通过 Java 连接 Redis 客户端的一个工具包,提供了 Redis 的各种命令支持。

4.1 pom 依赖


    redis.clients
    jedis
    4.3.0

复制代码

4.2 相关的 API 介绍

 SetParams params = SetParams.setParams().nx().ex(lockState.getLeaseTTL());
 String result = client.set(lockState.getLockKey(), lockState.getLockValue(), params);
复制代码
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = client.eval(script, 1, lockState.getLockKey(), lockState.getLockValue());
复制代码

4.3 分布式锁示例

package com.rock.dlock.jedis;

import com.rock.dlock.common.DtLockException;
import com.rock.dlock.common.KeepAliveAction;
import com.rock.dlock.common.KeepAliveTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.params.SetParams;

import java.net.SocketTimeoutException;
import java.util.concurrent.TimeUnit;

/**
 * @author zs
 * @date 2022/11/13 4:44 PM
 */
public class DemoJedisLock {
    private final static Logger log = LoggerFactory.getLogger(DemoJedisLock.class);
    private JedisPooled client;

    private LockState lockState;
    private KeepAliveTask keepAliveTask;

    private int sleepMillisecond;

    private final static String RESULT_OK = "OK";
    private static final Long UNLOCK_SUCCESS = 1L;

    class LockState {
        private String lockKey;
        private String lockValue;
        private String errorMsg;
        private int leaseTTL;
        private long leaseId;
        private boolean lockSuccess;

        public LockState(String lockKey, int leaseTTL) {
            this.lockKey = lockKey;
            this.leaseTTL = leaseTTL;
        }

        public LockState(String lockKey, String value, int leaseTTL) {
            this.lockKey = lockKey;
            this.lockValue = value;
            this.leaseTTL = leaseTTL;
        }

        public String getLockKey() {
            return lockKey;
        }

        public void setLockKey(String lockKey) {
            this.lockKey = lockKey;
        }

        public String getLockValue() {
            return lockValue;
        }

        public void setLockValue(String lockValue) {
            this.lockValue = lockValue;
        }

        public String getErrorMsg() {
            return errorMsg;
        }

        public void setErrorMsg(String errorMsg) {
            this.errorMsg = errorMsg;
        }

        public long getLeaseId() {
            return leaseId;
        }

        public void setLeaseId(long leaseId) {
            this.leaseId = leaseId;
        }

        public boolean isLockSuccess() {
            return lockSuccess;
        }

        public void setLockSuccess(boolean lockSuccess) {
            this.lockSuccess = lockSuccess;
        }

        public int getLeaseTTL() {
            return leaseTTL;
        }

        public void setLeaseTTL(int leaseTTL) {
            this.leaseTTL = leaseTTL;
        }
    }


    public DemoJedisLock(JedisPooled client, String key, String value, int ttlSeconds) {
        //1.准备客户端
        this.client = client;
        this.lockState = new LockState(key, value, ttlSeconds);
        this.sleepMillisecond = (ttlSeconds * 1000) / 3; //抢锁的重试间隔可由用户指定
    }


    public boolean tryLock(long waitTime, TimeUnit waitUnit) throws DtLockException {
        long totalMillisSeconds = waitUnit.toMillis(waitTime);
        long start = System.currentTimeMillis();
        //重试,直到成功或超过指定时间
        while (true) {
            // 抢锁
            try {
                SetParams params = SetParams.setParams().nx().ex(lockState.getLeaseTTL());
                String result = client.set(lockState.getLockKey(), lockState.getLockValue(), params);
                if (RESULT_OK.equals(result)) {
                    manualKeepAlive();
                    log.info("[jedis-lock] lock success 线程:{} 加锁成功,key:{} , value:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
                    lockState.setLockSuccess(true);
                    return true;
                } else {
                    if (System.currentTimeMillis() - start >= totalMillisSeconds) {
                        return false;
                    }
                    Thread.sleep(sleepMillisecond);
                }
            } catch (Exception e) {
                Throwable cause = e.getCause();
                if (cause instanceof SocketTimeoutException) {//忽略网络抖动等异常
                }
                log.error("[jedis-lock] lock failed:" + e);
                throw new DtLockException("[jedis-lock] lock failed:" + e.getMessage(), e);
            }

        }
    }

    //此实现中忽略,网络通信异常部分的处理,可参考tryLock
    public void unlock() throws DtLockException {
        try {
            // 首先停止续约
            if (keepAliveTask != null) {
                keepAliveTask.close();
            }
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = client.eval(script, 1, lockState.getLockKey(), lockState.getLockValue());

            if (UNLOCK_SUCCESS.equals(result)) {
                log.info("[jedis-lock] unlock success 线程 : {} 解锁成功,锁key : {} ,路径:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
            } else {
                log.info("[jedis-lock] unlock del key failed ,线程 : {} 解锁成功,锁key : {} ,路径:{}", Thread.currentThread().getName(), lockState.getLockKey(), lockState.getLockValue());
            }
        } catch (Exception e) {
            log.error("[jedis-lock] unlock failed:" + e.getMessage(), e);
            throw new DtLockException("[jedis-lock] unlock failed:" + e.getMessage(), e);
        }
    }

    // 定时将Key的过期推迟
    private void manualKeepAlive() {
        final String t_key = lockState.getLockKey();
        final int t_ttl = lockState.getLeaseTTL();

        keepAliveTask = new KeepAliveTask(new KeepAliveAction() {
            @Override
            public void run() throws DtLockException {
                // 刷新值
                try {
                    client.expire(t_key, t_ttl);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, t_ttl);
        keepAliveTask.start();
    }
}
复制代码
package com.rock.dlock.common;

public class DtLockException extends RuntimeException{
    public DtLockException(String message) {
        super(message);
    }

    public DtLockException(String message, Throwable cause) {
        super(message, cause);
    }

    public static DtLockException clientException(){
        return new DtLockException("client is empty");
    }
}
复制代码

package com.rock.dlock.common;

public interface KeepAliveAction {
    void run() throws DtLockException;
}
复制代码
package com.rock.dlock.common;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * @author zs
 * @date 2022/11/7 4:20 PM
 */
public class KeepAliveTask extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(KeepAliveTask.class);
    public volatile boolean isRunning = true;
    /**
     * 过期时间,单位s
     */
    private long ttlSeconds;
    private KeepAliveAction action;
    public KeepAliveTask(KeepAliveAction action, long ttlSeconds) {
        this.ttlSeconds = ttlSeconds;
        this.action = action;
        this.setDaemon(true);
    }

    @Override
    public void run() {
        final long sleep = this.ttlSeconds * 1000 / 3; // 每隔三分之一过期时间,续租一次
        while (isRunning) {
            try {
                // 1、续租,刷新值
                action.run();
                LOGGER.debug("续租成功!");
                TimeUnit.MILLISECONDS.sleep(sleep);
            } catch (InterruptedException e) {
                close();
            } catch (DtLockException e) {
                close();
            }
        }
    }

    public void close() {
        isRunning = false;
        this.interrupt();
    }
}
复制代码

4.4 测试锁

import com.rock.dlock.jedis.DemoJedisLock;
import redis.clients.jedis.JedisPooled;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @author zs
 * @date 2022/11/13 4:51 PM
 */
public class TestJedisLock {
    public static void main(String[] args) {

        JedisPooled jedis = new JedisPooled("127.0.0.1", 6379);
        DemoJedisLock demoEtcdLock1 = new DemoJedisLock(jedis, "rock", UUID.randomUUID().toString(), 10);
        DemoJedisLock demoEtcdLock2 = new DemoJedisLock(jedis, "rock", UUID.randomUUID().toString(), 10);

        boolean lock1 = demoEtcdLock1.tryLock(20, TimeUnit.SECONDS);
        if (lock1) {
            try {
                System.out.printf("do something");
            } finally {
                demoEtcdLock1.unlock();
            }
        }
        demoEtcdLock1.tryLock(20, TimeUnit.SECONDS);
        demoEtcdLock2.tryLock(20, TimeUnit.SECONDS);//等待锁,超时后放弃
    }
}
复制代码

五、使用 Jedis 的一些注意事项

通常分布式锁服务会和业务逻辑使用同一个Redis 集群,自然也使用同一个 Jedis 客户端;当业务逻辑侧对 Redis 的读写并发提高时,会给 Redis 集群和 Jedis 客户度带来压力;为应对一些异常情况,我们除了解功能层面的 API,还需要了解一下客户端的一些配置调优,主要是池化管理和网络通信两个方面

5.1 池化管理

在使用 Jedis 时可以配置 JedisPool 连接池,池化处理有许多好处,如:提高响应的速度、降低资源的消耗、方便管理和维护;JedisPool 配置参数大部分是由 JedisPoolConfig 的对应项来赋值的,在生产中我们需要关注它的配置并合理的赋值,如此能够提升 Redis 的服务性能,降低资源开销。下边是对一些重要参数的说明、默认及设置建议:

参数

说明

默认值

建议

maxTotal

资源池中的最大连接数

8


maxIdle

资源池允许的最大空闲连接数

8


minIdle

资源池确保的最少空闲连接数

0


blockWhenExhausted

当资源池用尽后,调用者是否要等待。只有当值为 true 时,下面的maxWaitMillis才会生效。

true

建议使用默认值。

maxWaitMillis

当资源池连接用尽后,调用者的最大等待时间(单位为毫秒)。

-1(表示永不超时)

不建议使用默认值。

testOnBorrow

向资源池借用连接时是否做连接有效性检测(ping)。检测到的无效连接将会被移除。

false

业务量很大时候建议设置为 false,减少一次 ping 的开销。

testOnReturn

向资源池归还连接时是否做连接有效性检测(ping)。检测到无效连接将会被移除。

false

业务量很大时候建议设置为 false,减少一次 ping 的开销。

jmxEnabled

是否开启 JMX 监控

true

建议开启,请注意应用本身也需要开启。

空闲 Jedis 对象的回收检测由以下四个参数组合完成,testWhileIdle是该功能的开关。

名称

说明

默认值

建议

testWhileIdle

是否开启空闲资源检测。

false

true

timeBetweenEvictionRunsMillis

空闲资源的检测周期(单位为毫秒)

-1(不检测)

建议设置,周期自行选择,也可以默认也可以使用下方JedisPoolConfig 中的配置。

minEvictableIdleTimeMillis

资源池中资源的最小空闲时间(单位为毫秒),达到此值后空闲资源将被移除。

180000(即 30 分钟)

可根据自身业务决定,一般默认值即可,也可以考虑使用下方JeidsPoolConfig中的配置。

numTestsPerEvictionRun

做空闲资源检测时,每次检测资源的个数。

3

可根据自身应用连接数进行微调,如果设置为 -1,就是对所有连接做空闲监测。

通过源码可以发现这些配置是 GenericObjectPoolConfig 对象的属性,这个类实际上是 rg.apache.commons.pool2.impl apache 提供的,也就是说 jedis 的连接池是依托于 apache 提供的对象池来,这个对象池的声明周期如下图,感兴趣的可以看下:

5.2 网络调优

Rsdis 节点故障或者网络抖动时,这两个值如果不合理可能会导致很严重的问题,比如 timeout 设置为 1000,maxRedirect 为 2,一旦出现 redis 连接问题,将会导致请求阻塞 3s 左右。而这个 3 秒的阻塞在可能导致常规业务流量下的线程池耗尽,需根据业务场景调整。

原文链接:https://juejin.cn/post/7166131629348356104

展开阅读全文

页面更新:2024-04-23

标签:分布式   集群   线程   加锁   客户端   能力   代码   业务   时间   资源

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top