博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
redis分布式锁的几种实现方式,以及Redisson的配置和使用
阅读量:7225 次
发布时间:2019-06-29

本文共 5413 字,大约阅读时间需要 18 分钟。

最近在开发中涉及到了多个客户端的对redis的某个key同时进行增删的问题。这里就会涉及一个问题:锁

先举例在分布式系统中不加锁会出现问题:

  redis中存放了某个用户的账户余额 ,例如100 (用户id:余额)

  A端需要对用户扣费-1,需要两步:

    A1.将该用户的目前余额取出来(100)

    A2.将余额扣除一部分(99)后再插入到redis中

  B端需要对用户充值+10,需要两步:

    B1.将该用户的目前余额取出来(99)

    B2.将余额添加充值额度(109)后再插入到redis中

  我们的期望执行顺序是A1、A2、B1、B2  结果就会是109

  但是如果不加锁,就会出现A1、B1、A2、B2(110)或者其他各种随机情况,这样就会造成数据错误。

Redis加锁的几种实现方式

  方式一,自己造轮子

    之前参考的很多博客,关于redis加锁都是先setNX()获取锁,然后再setExpire()设置锁的有效时间。

    然而这样的话获取锁的操作就不是原子性的了,如果setNX后系统宕机,就会造成锁死,系统阻塞。

    根据官方的推荐(https://redis.io/topics/distlock),最好使用set命令:SET key value [EX seconds] [PX milliseconds] [NX|XX]       

    EX PX设置有效时间    NX属性的作用就是如果key存在就返回失败,否则插入数据。

    需要注意的是:

      在Redis 2.6.12之前,set只能返回OK,所以无法判断操作是否成功,所以也就不适用。

      如果使用的是spring-boot-starter-data-redis依赖,那么在2.x版本之前的接口也不支持上述的set操作

    java代码:

//获取锁        //锁的键值需要具有标志性。        //例如,现在有两个系统需要对key=user_id,value=user_balance进行操作,这时就可以设计这个键的锁为user_id+"_key"        String user_id="1";        String key=user_id+"_key";        //值设置为一个随机数(下面讲原因)        String random_value=UUID.randomUUID().toString();        redisTemplate.execute((RedisCallback
) (RedisConnection connection)->{ //只有2.0以上的版本才支持set返回插入结果Boolean //此命令的意思是只有key不存在,才插入值,并且设置有效时间为10s connection.set(key.getBytes(), random_value.getBytes(), Expiration.seconds(10), SetOption.SET_IF_ABSENT); //本示例由于依赖版本低于2.0,所以无法接受set设置结果 Boolean result=true; return result; }); //进行更新操作... //释放锁 //为什么释放之前要比较一下? //这是为了防止删除掉别人的锁,例如此场景中:如果我们的中间操作超过了10s那么锁会自动释放,这时别人会再获取锁。 //如果我们执行完中间就直接删除锁的话,就会把别人的锁删除 if(redisTemplate.opsForValue().get(key)==random_value) { redisTemplate.delete(key); }

可以发现,如果自己来实现的话,受限很多。并且这还是最基本的操作,包括出错重试等功能都没有。

所以我们要学习redis推荐的reids工具redisson

  方式二:集成redissonhttps://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95

  一.添加依赖

org.redisson
redisson
3.6.1

  二.resources文件夹添加配置文件redisson.yml

singleServerConfig:  #连接空闲超时,单位:毫秒  idleConnectionTimeout: 10000  pingTimeout: 1000  #连接超时,单位:毫秒  connectTimeout: 10000  #命令等待超时,单位:毫秒  timeout: 3000  #命令失败重试次数  retryAttempts: 3  #命令重试发送时间间隔,单位:毫秒  retryInterval: 1500  #重新连接时间间隔,单位:毫秒  reconnectionTimeout: 3000  #执行失败最大次数  failedAttempts: 3  #单个连接最大订阅数量  subscriptionsPerConnection: 5  #客户端名称  clientName: null  #地址  address: "redis://192.168.1.16:6379"  #数据库编号  database: 0  #密码  password: xiaokong  #发布和订阅连接的最小空闲连接数  subscriptionConnectionMinimumIdleSize: 1   #发布和订阅连接池大小  subscriptionConnectionPoolSize: 50  #最小空闲连接数  connectionMinimumIdleSize: 32  #连接池大小  connectionPoolSize: 64  #是否启用DNS监测  dnsMonitoring: false  #DNS监测时间间隔,单位:毫秒  dnsMonitoringInterval: 5000threads: 0nettyThreads: 0codec: !
{}transportMode : "NIO"

  三.Application中设置RedissonClient

import org.mybatis.spring.annotation.MapperScan;import org.redisson.Redisson;import org.redisson.api.RedissonClient;import org.redisson.config.Config;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.cloud.client.discovery.EnableDiscoveryClient;import org.springframework.context.annotation.Bean;import org.springframework.core.io.ClassPathResource;import org.springframework.transaction.annotation.EnableTransactionManagement;@SpringBootApplication@EnableTransactionManagement@MapperScan("com.xxx.mapper")public class Application {    public static void main(String [] args) {        SpringApplication.run(Application.class, args);    }    @Bean(destroyMethod="shutdown")    public RedissonClient redisson() throws IOException {        RedissonClient redisson = Redisson.create(                Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream()));        return redisson;    }}

 

  四.在代码中使用

@Autowired    private RedissonClient redisson;    @Test     public void redisson() {        String user_id="1";        String key=user_id+"_key";        //获取锁        RLock lock = redisson.getLock(key);        lock.lock();            //执行具体逻辑...                RBucket bucket = redisson.getBucket("a");        bucket.set("bb");                lock.unlock();    }

需要注意的是redisson的使用和redisTemplate有比较大的区别,这里简单介绍一下几个特性:(刚用时迷了很久,希望大家能少走些弯路)

1.redisson中不需要set指令,举个例子:

RBucket bucket = redisson.getBucket("a");bucket.set("bb");

在这两条语句中,我们只获取了key="a"的bucket类型对象(里面可以装一个任意对象)。然后修改bucket里面一个值,其实这时["a","bb"]已经被存入redis了

2.所有的值都是结构体

和上例的RBucket结构体一样,redisson提供了十几种结构体(https://github.com/redisson/redisson/wiki/7.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%9B%86%E5%90%88)供我们使用,当取值时,redisson也会自动将值转换成对应的结构体。所以如果使用redisson取redisTemplate放入的值,就要小心报错

方式三.基于redlock的算法讨论

这种我还没有具体实现过,为什么会出现这种算法,主要是应对redis服务器宕机的问题。当redis宕机时,即使有主从,但是依然会有一个同步间隔。这样就会造成数据流失。

当然,更为严重的是,在分布式情况下,丢失的是锁,我们知道一般用锁的数据都是比较重要的。

一个场景:A在向主机1请求到锁成功后,主机1宕机了。现在从机1a变成了主机。但是数据没有同步,从机1a是没有A的锁的。那么B又可以获得一个锁。这样就会造成数据错误。

redlock主要思想就是做数据冗余。建立5台独立的集群,当我们发送一个数据的时候,要保证3台(n/2+1)以上的机器接受成功才算成功,否则重试或报错。

当然具体是很复杂,想研究的可以看看(https://redis.io/topics/distlock)

方式四.使用zookeeper+redis来管理锁

就像之前讨论的,方式2只能保证客户端的正确,却无法保证服务端的宕机数据丢失。方式三的数据完整性很高,但是管理起来很复杂。这时就有了一个折中的做法:

将锁存放在zookeeper中,由于zookeeper与redis的场景不同,所以zookeeper的算法对数据的完整性要求很高。在分布式的zookeeper中,数据是很难丢失的。

这样,我们就可以把锁放到zookeeper中,来保证锁的完整性。

好吧,这个我也没有实验过(羞耻),不过网上又很多这方面的博客,以后用到再说吧~~~一般项目用方式二就可以啦,

 

转载于:https://www.cnblogs.com/chenkeyu/p/8514250.html

你可能感兴趣的文章
ES6简单总结(搭配简单的讲解和小案例)
查看>>
text-decoration与color属性
查看>>
如何使用Mybatis第三方插件--PageHelper实现分页操作
查看>>
PyCharm搭建GO开发环境(GO语言学习第1课)
查看>>
Android交互
查看>>
提醒我喝水chrome插件开发指南
查看>>
列表数据转树形数据
查看>>
Java新版本的开发已正式进入轨道,版本号18.3
查看>>
从零开始的webpack生活-0x009:FilesLoader装载文件
查看>>
在electron中实现跨域请求,无需更改服务器端设置
查看>>
gitlab-ci配置详解(一)
查看>>
听说你叫Java(二)–Servlet请求
查看>>
案例分享〡三拾众筹持续交付开发流程支撑创新业务
查看>>
FreeWheel业务系统微服务化过程经验分享
查看>>
移动互联网下半场,iOS开发者如何“高薪”成长?
查看>>
Atlassian是怎样进行持续交付的?且听 Steve Smith一一道来
查看>>
Web Storage相关
查看>>
[PHP内核探索]PHP中的哈希表
查看>>
Apache-drill Architechture
查看>>
WordPress 5.2 Beta 3 发布,要求 PHP 5.6.20 以上版本
查看>>