【软构件】笔记 springboot+redis+rabbitMQ+分布式锁+redisson
笔记1-springboot框架搭建Springboot 微服务OOP:面向对象分析:设计过程中的OOPUMLJava->SOA->MiddlewareSOA:(Service-Oriented Architecture)JSP/Servlet:工作原理JSP本质上是一个简单的Servlet。.jsp->Java类(Servlet)->.class(service方法)Ht
笔记1-springboot框架搭建
搭建一个SpringBoot框架
- new project(maven)
- new module(maven):api(纯JavaBean模块,没有任何服务).jar
- 添加依赖lombok和jackson-annotations
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.28</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.15.2</version> </dependency>
- 在这里定义实体类
- new module():service(以服务形式提供) SpringBoot(multi-instance)
- 选择依赖Spring web
- 添加依赖api
<dependency> <groupId>org.example</groupId> <artifactId>api</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
- 在这里编写controller
- application.yml中添加端口号,任意定义(如6001)
- new module:server(以服务形式提供) SpringBoot
- 选择依赖Spring web和Thymeleaf
- 添加依赖api
- 在这里编写controller
- application.yml中添加端口号,80
- coding: Controller…
串:api(JavaBean),service(服务,JavaBeanObject),server(服务,从service获取并转化为对象) - deploy & test(service, server)
mybatis->service
void setCode(String code)
Message setCode(String code)
{
...
return this;
}
实例
api模块定义实体类
@Data
@NoArgsConstructor
//@Accessors(chain=true) //链式
public class Message
{
private String code, msg;
}
service中编写controller
非链式
@RestController
public class MessageController
{
@RequestMapping("/test/msg")
public Message helloMsg()
{
Message msg = new Message();
msg.setCode("001");
msg.setMsg("Hello SpringBoot.");
return msg;
}
}
链式
@RestController
public class MessageController
{
@RequestMapping("/test/msg")
public Message helloMsg()
{
return new Message().setCode("001").setMsg("Hello SpringBoot.");
}
}
访问localhost:6001/test/msg
可以访问获取到信息了。
在server中编写config
@Configuration
public class Config {
@Bean
public RestTemplate restTemplate()
{
return new RestTemplate();
}
}
在server中编写redis的模板config
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig
{
@Autowired(required = false)
private RedisConnectionFactory redisConnectionFactory;//Redis连接工厂(相当于redis-cli的窗口)
@Bean
public RedisTemplate<String, Object> redisTemplate()//相当于redis-cli的命令行
{
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
// redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//JSON泛型
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
return redisTemplate;
}
@Bean
public StringRedisTemplate stringRedisTemplate()
{
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.setConnectionFactory(redisConnectionFactory);
return stringRedisTemplate;
}
}
JdkSerializationRedisSerializer和GenericJackson2JsonRedisSerializer的区别是:
上下两种分别是JdkSerializationRedisSerializer比GenericJackson2JsonRedisSerializer的效果。
JdkSerializationRedisSerializer比GenericJackson2JsonRedisSerializer快,因为JdkSerializationRedisSerializer在传输时对数据进行了序列化。
如果用于java语言,那么两种都能用。
但是如果用于c++或者其他,无法接受序列化以后的内容。
controller
package com.example.server.controller;
import com.example.service.util.R;
import domain.Course;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
@RestController
@RequestMapping
public class MyCourseController
{
private final String URL_PREFIX = "http://localhost:8081";
@Resource
private RestTemplate restTemplate;
@GetMapping("/my/course")
public R testCourse()
{
return restTemplate.getForObject(URL_PREFIX + "/course", R.class);
}
@Resource
private RedisTemplate redisTemplate;
@GetMapping("/test")
public Object myRedis()
{
ValueOperations<String, Object> opsv = redisTemplate.opsForValue();
opsv.set("mykey", new Course(1, "name", null, null, 20.0, "introduction", "1"));
return opsv.get("mykey");
}
}
现在访问localhost/my/msg
就相当于访问localhost:6001/test/msg
了。
相当于多了一层隔离,但是速度会比较慢。
好处在springboot中难以体现,springcloud里面就可以感受到。
当有多个service的时候,可以通过server去定端口。不能让服务端去自主选择端口。
如果多个独立service没有交互,但是其他地方需要同时用到。可以在server里面交互,service可以保持独立。
笔记2-springboot整合redis
redis
Sprintboot整合redis:
- 添加依赖:redis客户端(不推荐jedis,是独立框架,第三方包)
- application.properties(配置redis的端口)
- 配置(redisTemplate)
- 编写代码
泛型:编译现象
java中redis泛型,如“<String,Integer>”只在java中受到限制,在其他地方(如redis_cli.exe)还是可以插入其他类型的值。
海量数据中的查找?
数据库中的方法:
索引:index进行检索条件匹配。(坏处:增加数据库开销,要有个地方放置索引。每次存入数据的时候,要更新索引。)
redis
equals:逻辑上相等
hashCode:散列码
Bloom Filter
mybatis整合
service:
model:bean
mapper:xml
dao:dao
server(DAO:Data Transfer Object):
数据库:
?userUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&userSSL=false
附录
项目日常添加依赖
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
redisTemplate
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.util.concurrent.TimeUnit;
@Configuration
public class RedisConfig
{
@Autowired(required = false)
private RedisConnectionFactory redisConnectionFactory;//Redis连接工厂(相当于redis-cli的窗口)
@Bean
public RedisTemplate<String, Object> redisTemplate()//相当于redis-cli的命令行
{
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
// redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//JSON泛型
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
return redisTemplate;
}
@Bean
public StringRedisTemplate stringRedisTemplate()
{
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.setConnectionFactory(redisConnectionFactory);
return stringRedisTemplate;
}
}
笔记3-redis缓存穿透
上节课知识回顾
server好处:
- SpringCloud
- 网关隔离(内网,保护机制)
- server中可能业务重组实现
Redis缓存
- springboot-starter-data-redis(Config/@Bean是对redis-cli.exe的封装)
- jedis第三方
redis不建议做数据库:
- 用的是内存,较小。
- 没有备份找回机制,不稳定。
本课堂:redis
redis缓存穿透问题:
- 查询id
- redis未命中id -> 到数据库中查询
- 如果数据库中存在,将【id,查询结果】存入redis,输出结果
- 如果数据库中不存在,输出NULL
- redis命中id -> 输出redis结果
ValueOperations<String, Object> vop = redisTemplate.opsForValue();
String key = "red_" + id;
RedRecord red = (RedRecord) vop.get(key);//从redis中查找id
if (red == null)
{
red = redRecordDao.selectByPrimaryKey(id);//从数据库查找id
}
return red;//返回查询数据
导致了可能同一个不存在id一直未命中,所以一直在redis和数据库查询的问题。
解决方案:
- 查询id
- redis未命中id -> 到数据库中查询
- 如果数据库中存在,将【id,查询结果】存入redis,输出结果
- 如果数据库中不存在,将【id,空值(空值即空字符串,不能是NULL)】存入redis,输出NULL
- redis命中id -> 输出redis结果
注意:上面的空值与NULL有区别。在redis中查询,
返回NULL:
是代表:该id不存在于缓存中。
后续:需要到数据库查询。
返回空值(可以是一个空字符串):
是代表该id不存在于缓存中也不存在与数据库中,但上次已经查询过了,所以在缓存中直接标记该数据为空。
后续:直接输出NULL
ValueOperations vop = redisTemplate.opsForValue();
String key = "red_" + id;
String value = (String) vop.get(key); //从redis中查找id
//redis命中
if(value != null){
//redis查询结果是空值
if(value.length() == 0) {
return "not found!!";
}else return objectMapper.readValue(value, RedRecord.class);
}
//redis不命中
RedRecord red = redRecordDao.selectByPrimaryKey(id);
//数据库查询结果为空
if(red == null)
vop.set(key, "", 30, TimeUnit.SECONDS); //记录不存在,设置超时自动删除
else
vop.set(key, objectMapper.writeValueAsString(red));
return red;
这样当第一次未命中,缓存中就已经有【id,空值】的记录。后面每次查询,redis中已经存在该记录,不用查询数据库。
讨论
这个解决方法效率如何:
这个方法只能解决每次都查询相同的不存在的id。
如果是每次都查询不同的不存在的id,则不适用。依然存在冗余+穿透。
布隆过滤器
前提:
假设有2个Hash映射函数:
h1(x),h2(x)
现在要保存这几个值:4,5,6
4:h1(4)=100,h2(4)=105
5:h1(5)=105,h2(5)=106
6:h1(6)=101,h2(6)=107
1 | 2 | …… | 100 | 101 | 102 | 103 | 104 | 105 | 106 | 107 |
---|---|---|---|---|---|---|---|---|---|---|
0 | 0 | …… | 1 | 1 | 0 | 0 | 0 | 1 | 1 | 1 |
查找:
- 查找7
假设h1(7)=105,h2(7)=108
find(105)=1,find(108)!=0,有任意一个不等于1,则不存在,所以7不存在。 - 查找8
假设h1(8)=105,h2(8)=100
find(105)=1,find(100)!=1,两个都等于1,则为不确定状态,所以7可能存在也可能不存在。
讨论:
这个方法优点:确定了一些不存在的值,减少穿透。
缺点:有一些误识别率和删除困难。
缓存问题汇总:
缓存穿透
缓存系统中无法命中所需数据,导致每次查询都要直接访问后端数据源。
**解决方法:**空值缓存,布隆过滤器
缓存击穿(即:缓存无数据,数据库有数据,key比较集中)
如高并发的情况下,热点数据缓存过期,这时候会导致大量请求读不到缓存同时读数据库,导致数据库负载过大。(比如:缓存是半小时清理一次,清理完刚好100万条访问,此时就要有100万条访问数据库)
解决方法:
- 分布式锁,每次查询只有一个加载到缓存中(就是把清理掉的热点缓存,又一个一个从数据库添加到缓存)。但是降低了并发。
- 设定key不要同一时刻大量过期
缓存雪崩(即:缓存无数据,数据库有数据,key比较分散)
如在高并发的情况下,缓存同一时刻失效(如缓存挂了,或者设置了相同过期时间),所有请求会读数据库,容易导致数据库负载瞬间上升,乃至崩掉。如果重启数据库,立马又会被新的请求压崩。
**解决方法:**与上一条同
缓存预热
缓存预热是指系统上线后,提前将相关的缓存数据加载到缓存系统中,避免刚上线使用户有太多请求打到数据库上去,然后再去将数据缓存的问题。缓存降级
缓存降级是指缓存失效或缓存服务器挂掉的情况下,不去访问数据库,直接返回默认数据或访问服务的内存数据。降级一般是有损的操作,所以尽量减少降级对于业务的影响程度。
下节课预习
抢红包:
(1)逻辑:添加红包(随机分配)
(2)业务:发红包,抢红包
(3)并发测试:jemter
(4)可能存在的问题:同一个用户抢到多个红包?解决方法?
笔记4-redis应用抢红包
抢红包:
(1)逻辑:添加红包(随机分配)
(2)业务:发红包,抢红包
(3)并发测试:jemter
(4)可能存在的问题:同一个用户抢到多个红包?解决方法?
非分布式
示例1:
取记录,判断,减内存中的数值,保存到数据库
问题:
单线程没问题。多线程会出问题。(即数据库中从1000变成0了,但是实际上不止发了10次红包)
示例2:
在mapper中,不再是直接赋值。
而是通过数据库本身的原子操作:
update xxx set amount = amount - #{money}
where id = #{id}
结果:超支。(即实际上不止发了10次红包,数据库中变成负数了)
示例3:
不做判断,直接减。由返回值进行判断
update xxx set amount = amount - #{money}
where id = #{id} and amount >=100
结果:利用了sql的原子性,但是当请求过多,会造成数据库的堵塞。
就是虽然你不合理的请求,扣完经过判断以后,不扣了。
这一步在逻辑上是没问题,但是这个请求还是经过了sql。
示例4:
void synchronized 方法(){
……
}
坏处:
- 并发度下降,对同一个对象来说是串行执行
=> synchronized中的代码要尽量段(代码量少,且执行时间尽量短) - 你能确定只有一个service对象在运行吗?如果又new了一个service,那么逻辑仍然会出错。
- 集群环境中service肯定是存在多个,这时候如何互斥?
使用分布式锁(对业务加锁)
前提:
每个用户对于每个红包只能抢1次。
每个红包(比如100),会被提前分成多个小红包,多个小红包加起来是100。
输入用户id
和红包id
,进行下面的红包分配。
过程:
- 判断该用户是否已经抢过红包:判读"用户id+红包id"是否存在于redis中。
- 如果抢过,则return -1
- 没抢过,则判断是否有剩余红包:判断"红包id:total"的值是否>0
- 如果<=0,则return -1
- 如果有剩余红包,则redisTemplate.opsForList().leftPop(红包id);从红包队列中取出一个小红包
- 判断该小红包是有效的,如果小红包==null 或 小红包 ==“”,则return -1
- 如果有效,则更新数据(更新"红包id:total",更新"用户id+红包id"),返回小红包的id
- 记录到数据库(用户+红包金额)
错误代码
public int rob(String redId, Integer userId){ //TODO: 加synchronized也无法实现!!
String robId = redId + ":" + userId; //记录是否已经抢到红包
//1. 查询是否已经抢到红包,若已经抢到,则返回
ValueOperations valueOperations = redisTemplate.opsForValue();
Object obj = valueOperations.get(robId);
if(obj != null && "".equals("" + obj) == false){
return Integer.valueOf("" + obj);
}
//2. 若剩余红包列表为空,则返回失败。否则,取一个红包,并返回
try{ //cheat!!!
Thread.sleep(50);
}catch (Exception e){}
ValueOperations strOperations = stringRedisTemplate.opsForValue();
int total = Integer.valueOf("" + strOperations.get(redId + ":total"));
if(total <= 0) return -1; //fail
//从队列取一个红包
ListOperations listOperations = redisTemplate.opsForList();
Object rr = listOperations.leftPop(redId); //原子操作
if (rr == null || "".equals("" + rr)) return -1;
//strOperations.set(redId + ":total", "" + (total - 1)); //TODO: 可能存在不一致性!!
strOperations.decrement(redId + ":total"); //原子性
valueOperations.set(robId, rr); //redis recording
logger.info("success, user:" + userId + ",rob:" + rr);
//TODO: 记录到数据库
return Integer.valueOf("" + rr);
}
正确代码
public /*synchronized*/ int rob_lock(String redId, Integer userId){
String robId = redId + ":" + userId; //记录是否已经抢到红包
//1. 查询是否已经抢到红包,若已经抢到,则返回
ValueOperations valueOperations = redisTemplate.opsForValue();
Object obj = valueOperations.get(robId);
if(obj != null && "".equals("" + obj) == false){
return Integer.valueOf("" + obj);
}
//2. 若剩余红包列表为空,则返回失败。否则,取一个红包,并返回
try{ //cheat!!!
Thread.sleep(100);
}catch (Exception e){}
ValueOperations strOperations = stringRedisTemplate.opsForValue();
int total = Integer.valueOf("" + strOperations.get(redId + ":total"));
if(total <= 0) return -1; //fail
//相当于在redis中synchronized(robId + "-lock")
Boolean lock = valueOperations.setIfAbsent(robId + "-lock", "123"); //分布式锁(value不重要)
if(lock) {
ListOperations listOperations = redisTemplate.opsForList();
Object rr = listOperations.leftPop(redId); //原子操作
if (rr == null || "".equals("" + rr)) return -1;
//update total
strOperations.decrement(redId + ":total"); //redis decr
//strOperations.set(redId + ":total", total - 1);
valueOperations.set(robId, rr); //redis recording
logger.info("success, user:" + userId + ",rob:" + rr);
//3. TODO: 写入数据库
//问题:能不能删除-lock这个锁?不能删除(并发环境下,不可预知其他线程的执行位置!!)
//可以使用超时,比如:5分钟
return Integer.valueOf("" + rr);
}else return -1;
}
重点:
- redis中红包剩余数量的原子性
更新"红包id:total"的时候不能使用strOperations.set(redId+":total",xxx)
,而是要使用strOperations.decrement(redId+":total")
。保证原子性。 - redis中取出红包的原子性
要将6-8步骤的放到锁里。
Boolean lock = value0perations.setIfAbsent(robId + "-lock", "123"); //123是随便写的,这句话只是为了让key存在,即robId+"-lock"存在,即"红包id+用户id-lock"存在。
if(lock){
从队列中取出小红包,更新redis数据,记录写入数据库。
}
解析:
为什么会出现这个问题?
设想问题出现的过程,即一个用户能成功抢到红包的过程:
1该用户没抢过红包 -> 2有剩余小红包 -> 3取出小红包 -> 4小红包数量-1 -> 5将用户状态设置为已经抢过红包 -> 6用户抢红包数据写入数据库
同一个用户有多个线程进入1,进入2,进入3,进入4。
直到第一个线程到5的时候,此时,1才会对该用户的线程进行关闭。
但是这个时候已经有很多个线程在2,3,4了,1虽然关闭了大门,但是这些线程已经进门了。所以拦不住,他们会继续经过5.
那么是如何解决的呢?
在5之前又设置了一道门。"用户id+红包id"的门。
笔记5-RabbiMQ
RabbiMQ
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
设置配置参数
# RabbitMQ
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Queue definitions
mq.env=local
mq.basic.info.queue.name=${mq.env}.middleware.mq.basic.info.queue
mq.basic.info.exchange.name=${mq.env}.middleware.mq.basic.info.exchange
mq.basic.info.routing.key.name=${mq.env}.middleware.mq.basic.info.routing.key
注意:spring.rabbitmq.port=5672
不是15672
RabbitMQConfig
package com.jihuiting.service.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig
{
private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);
@Autowired(required = false)
private CachingConnectionFactory cachingConnectionFactory; //MQ连接
@Autowired(required = false)
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; //监听器配置
/**
* 单一消费者
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer()
{ //客户端连接监听器(Connection, Channel....)
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter()); //TODO: JSON
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
return factory;
}
/**
* RabbitMQ发送消息的操作组件实例
*/
@Bean
public RabbitTemplate rabbitTemplate()
{
cachingConnectionFactory.setPublisherConfirms(true);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
cachingConnectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
rabbitTemplate.setReturnsCallback(returnedMessage -> log.info("消息丢失:{}", returnedMessage));
return rabbitTemplate;
}
}
实现定义队列和交换器(绑定键)
package com.jihuiting.service.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ配置类
*
* @author pan_junbiao
**/
@Configuration
public class TopicRabbitMqConfig
{
public static final String TOPIC_QUEUE_NAME = "testQueue"; //队列名称A
public static final String TOPIC_EXCHANGE_NAME = "testExchange"; //交换器名称A
public static final String TOPIC_ROUTING_KEY = "topic.routingKey.testQueue_testExchange"; //绑定键
/**
* 队列
*/
@Bean
public Queue topicQueueA()
{
/**
* 创建队列,参数说明:
* String name:队列名称。
* boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
* 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
* boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
* boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
* 当没有生产者或者消费者使用此队列,该队列会自动删除。
* Map<String, Object> arguments:设置队列的其他一些参数。
*/
return new Queue(TOPIC_QUEUE_NAME, true);
}
/**
* 交换器
*/
@Bean
public DirectExchange basicExchange()
{
/**
* 创建交换器,参数说明:
* String name:交换器名称
* boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
* 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
* boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
*/
return new DirectExchange(TOPIC_EXCHANGE_NAME, true, false);
}
@Bean
Binding bindingDirect(DirectExchange directExchange,Queue directQueue)
{
return BindingBuilder.bind(directQueue).to(directExchange).with(TOPIC_ROUTING_KEY);
}
}
创建发送者
package com.jihuiting.service.controller;
import com.jihuiting.service.service.TopicRabbitMqService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/topic")
public class TopicSender
{
@Autowired
private TopicRabbitMqService topicRabbitMqService;
@GetMapping("/send/{queueName}/{exchangeName}/{context}")
public void sendTopicMessage(@PathVariable("queueName") String queueName,
@PathVariable("exchangeName") String exchangeName,
@PathVariable("context") String context)
{
System.out.println("-------------------------------------------- ");
System.out.println("sendTopicMessage方法:");
System.out.println("queueName——"+queueName);
System.out.println("exchangeName——"+exchangeName);
System.out.println("context——"+context);
System.out.println("-------------------------------------------- ");
//http://localhost:8080/topic/send/testQueue/testExchange/theTestContext
this.topicRabbitMqService.SendMessage(exchangeName,queueName,context);
}
}
创建接收者
package com.jihuiting.service.controller;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class TopicReceiver
{
@RabbitListener(queues = "testQueue")
public void receiveMessage(String message) {
System.out.println("【testQueue】接收到消息:" + message);
}
}
下面的写法也可以:
package com.jihuiting.service.controller;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 接收者
* @author pan_junbiao
**/
@Component
@RabbitListener(queues = "testQueue")
public class TopicReceiverA
{
@RabbitHandler
public void process(String msg)
{
System.out.println("【testQueue】接收到消息:" + msg);
}
}
创建绑定关系
@RestController
@RequestMapping("/topic")
public class TopicSender
{
@Autowired
private TopicRabbitMqService topicRabbitMqService;
@GetMapping("/bind/{queueName}/{exchangeName}")
public void bindingExchange(@PathVariable("queueName") String queueName, @PathVariable("exchangeName") String exchangeName)
{
System.out.println("-------------------------------------------- ");
System.out.println("bind方法:");
System.out.println("queueName——"+queueName);
System.out.println("exchangeName——"+exchangeName);
System.out.println("-------------------------------------------- ");
//http://localhost:8080/topic/bind/testQueue/testExchange
this.topicRabbitMqService.bindingExchange(queueName,exchangeName);
}
}
@Service
public class TopicRabbitMqService
{
@Autowired
private RabbitTemplate rabbitTemplate;
public String getRoutingKey(String queueName, String exchangeName)
{
return "topic.routingKey."+queueName + "_" + exchangeName;
}
public Binding bindingExchange(String queueName, String exchangeName)
{
System.out.println("===============");
System.out.println("routingKey = "+getRoutingKey(queueName, exchangeName));
System.out.println("===============");
Queue queue = CreateTopicQueue(queueName);
TopicExchange topicExchange = CreateExchange(exchangeName);
return BindingBuilder.bind(queue).to(topicExchange).with(getRoutingKey(queueName, exchangeName));
}
}
报错
- java.net.SocketException: Socket closed
排除1:配置文件问题(账号,密码,端口)
排除2:试图创建新用户(无效)
排除3:视图修改spring.rabbitmq.virtual-host=/
,在http://localhost:15672/
也修改了(无效)
解决4:
建立接收者的时候
@Component
public class TopicReceiver
{
@RabbitListener(queues = "testQueue")
public void receiveMessage(String message) {
System.out.println("[testQueue]接收到消息:" + message);
}
}
没有注册bean,在TopicRabbitMqConfig中
public static final String TOPIC_QUEUE_NAME_A = "testQueue"; //队列名称A
@Bean
public Queue topicQueueA()
{
/**
* 创建队列,参数说明:
* String name:队列名称。
* boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
* 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
* boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
* boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
* 当没有生产者或者消费者使用此队列,该队列会自动删除。
* Map<String, Object> arguments:设置队列的其他一些参数。
*/
return new Queue(TOPIC_QUEUE_NAME_A, true);
}
笔记6-分布式锁
回顾RabbitMQ
- 除了RabbitMQ,还有ActiveMQ,RocketMQ,KafkaRedisson
- 目的:异步消息(比如淘宝,百万级访问,你访问每个产品都需要记录日志。会很卡,此时使用RabbitMQ进行异步访问);解耦。
- 死信队列(DLQ),延迟队列
- 有两个地方可以设置队列的延迟时长,分别是config的@Bean中和发布消息者中,如果两个地方设置了时长不一样,则生效的是相对较短的时长。
- 如果消息发布者发送了两个信息,发送了A等待时长10秒,然后发送了B等待时长5秒,那么最终,A和B是在第10秒的时候被处理的。因为队列是不会扫描的,即使B比A快,但只有A处理后,才会轮到B。所以适用于同时长的延迟。
- 不等长延迟:使用Redisson
今日知识:锁与分布式锁
前言
有些时候,并行程序中也必须串行执行,否则可能出现数据不一致性:
- 商品下单(多个用户同时购买,则可能出现库存超卖)
- 多个用户同时操作同一个银行账号(可能导致数据丢失)
- 单机锁:JDK提供的synchronized,Lock,ReentrantLock,ReentrantReadWriteLock(基本原理:同一个临界区的互斥锁)
- 分布式环境下,“同一临界区”很难实现。(synchronized虽然很强势很好用,但是如果运行了两个service,就破了)。
- 所以分布式锁应运而生。(分布式锁不是一个中间件,而是一种机制或实现方式)
准则
- 互斥性
- 避免死锁
- 高可用
- 可重入
- 公平锁(可选)
数据库的分布式锁
数据库的更新操作都是原子操作,查询不一定是原子操作。一般是先查询再更新(比如查账户上有多少钱,如果要取得钱<=账户上的钱,则取)。
但是这样造成一个问题,当查询+更新不是原子操作,所以会出问题。(即你查询了有100元,当你要更新的时候,不一定就是100元了,在你查询到更新中间可能已经有人把钱给动了)
关系型数据库大多是悲观锁。(synchronized,ReentrantLock都是悲观锁的实现)
-
不加锁
<update id="updateAmount"> UPDATE user_account SET amount = amount -#{money} WHERE is_active=1 AND id=# {id} </update>
直接扣钱
-
乐观锁
每次查询的时候都认为别人不会修改,所以不上锁。更新的时候判断此期间有没有人修改,有就重新操作。(重新查询,重新判断)<update id="updateByPKVersion"> update user_account set amount = amount - #{money},version=version+1 where id = #{id} and version=# {version} and amount >0 and (amount -#{money})>=0 </update>
多了一个version的东西,查询的时候获得数据(账户余额100元,账户版本33版),更新数据检查版本然后扣钱(余额-100,版本+1,当版本=33的时候)。如果版本不符,则重新操作。
-
悲观锁
每次查询的时候都认为别人会修改,所以查的时候就直接上锁,直到更新完,把锁解开。(查询到更新期间如果有人想动数据,线程会阻塞,动不了。解开锁以后才能动)
JAVA里面的synchronized和ReentrantLock等独占锁就是悲观锁思想的实现
@Transaction 单服务节点锁定,多点有问题。
Redis分布式锁
- setnx
key不存在的时候设置,存在则失败 - expire
超时删除
步骤:
构造一个与共享资源或核心业务相关的Key;
调用SETNX、EXPIRE操作,获取该Key的锁;
完成业务后,及时释放锁。
ZooKeeper
下载:https://zookeeper.apache.org/releases.html
打开:在zookeepper的文件夹的bin中zkServer.cmd运行起来。
- 添加依赖:
zookeeper
+curator-framework
+curator-recipes
- 设置配置:
spring.config.import=optional:zookeeper:
spring.cloud.zookeeper.connect-string=localhost:2181
- 应用:
public R seckillZk(int goodsId) throws Exception
{
String lockPath = "/" + goodsId;
//如果锁不存在,上锁
if (client.checkExists().forPath(lockPath) == null)
client.create().withMode(CreateMode.PERSISTENT).forPath(lockPath);
//10秒内能否获取到锁(如果其他线程占用,则无法获取到锁)
InterProcessMutex mutex = new InterProcessMutex(client, lockPath);
boolean lockAcquired = mutex.acquire(10L, TimeUnit.SECONDS);
try
{
//10秒内获取到了锁,就进行数据库操作,否则返回失败
if (lockAcquired)
{
return seckill(goodsId);
} else
{
return R.error("获取锁失败");
}
} finally
{
mutex.release();
}
}
多次释放是可以的,不会出问题。
redisson
场景:
-
布朗过滤器
-
消息通信(基于发布订阅模式的主题,延迟队列(rabbitmq的延迟队列是从队头到队尾,所以排在后面的即使时间更短,也是顺序制定,而redisson没有这个bug,它的队列是虚拟队列,每个都是平等的,没有这个顺序一说))
-
分布式锁
-
添加redission依赖
-
设置配置:redisson.host.config = redis://127.0.0.1:6379
-
Bean:RedissonClient
@Autowired
private Environment env;
@Bean
public RedissonClient config(){
Config config = new Config();
config.useSingleServer().setAddress(env.getProperty("redisson.host.config")).setKeepAlive(true);
return Redisson.create(config);
}
- 应用
效率
- 单机锁效率最高:synchronized,reentrancelock(只能在同一个虚拟机上使用)
- redis锁
- redisson锁(可以用户redis集群)
- zookeeper(节点维护消耗了时间)
总结
分布式锁是在分布式系统中实现数据同步和互斥的一种重要机制。以下是一些常见的分布式锁(包括数据库锁、Redis锁和ZooKeeper锁)的优点、缺点以及适用场合的介绍:
-
数据库锁:
优点:- 简单易用,数据库具备 ACID 特性,可以通过记录行锁或表级锁实现数据的同步和互斥。
- 可以很好地保证数据的一致性。
缺点:
- 性能相对较低,特别是在高并发环境下,数据库的锁竞争可能成为性能瓶颈。
- 不适合用于分布式场景,当多个应用或服务实例操作同一数据库时,难以保证锁的全局互斥。
适用场合:
- 低并发情况下的数据同步和互斥。
- 需要对数据进行严格的一致性控制,可以容忍一定的性能损耗。
-
Redis锁:
优点:- 快速高效,在内存中操作,适合处理高并发。
- 支持设置锁的过期时间,可以避免死锁。
缺点:
- 可能存在锁竞争问题,需要处理好并发场景下的锁争用情况。
- Redis 单点故障会导致分布式锁失效。
适用场合:
- 高并发场景,例如秒杀业务等。
- 不要求强一致性,可以容忍一定的数据不一致。
-
ZooKeeper锁:
优点:- 具备强一致性,ZooKeeper 提供了严格的顺序性保证。
- 支持分布式环境,可以实现跨多个应用和服务实例的锁同步。
缺点:
- 性能较差,ZooKeeper 集群的延迟和吞吐量受限于网络通信和磁盘 IO,不适合高并发场景。
- 部署和维护较为复杂,对运维要求较高。
适用场合:
- 强一致性要求较高的场景,例如分布式事务。
- 对性能要求相对较低,可以容忍一定的延迟。
笔记7-redisson
回顾
- Springboot
- redis
- rabbitMQ异步消息(使用异步进行耦合度降低)
- 分布式锁
- redisson(基于redis,扩展应用级组件,属于客户端层,不属于服务端层中间件)
redisson
布朗过滤器
redisson.host.config=redis://127.0.0.1:6379
package com.example.service;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Autowired(required = false)
private RedisConnectionFactory redisConnectionFactory; //Redis连接工厂(redis-cli窗口)
@Bean
public RedisTemplate<String,Object> redisTemplate(){ //相当于redis-cli.exe的命令行
RedisTemplate<String,Object> redisTemplate=new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer()); //指定大key序列化策略为为String序列化
// redisTemplate.setKeySerializer(new JdkSerializationRedisSerializer());
redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
//redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); //JSON泛型
redisTemplate.setHashKeySerializer(new StringRedisSerializer()); //指定hashKey序列化策略为String序列化
return redisTemplate;
}
/**
* 缓存redis-stringRedisTemplate
*/
@Bean
public StringRedisTemplate stringRedisTemplate(){
//采用默认配置即可-后续有自定义配置时则在此处添加即可
StringRedisTemplate stringRedisTemplate=new StringRedisTemplate();
stringRedisTemplate.setConnectionFactory(redisConnectionFactory);
return stringRedisTemplate;
}
@Autowired
private Environment env;
// @Bean
// public CuratorFramework curatorFramework(){
// CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
// .connectString(env.getProperty("zk.host"))
// .namespace(env.getProperty("zk.namespace"))
// .retryPolicy(new RetryNTimes(5, 1000))
// .build();
// curatorFramework.start();
// return curatorFramework;
// }
//Redisson Client
/**
* 自定义注入配置操作Redisson的客户端实例
* @return
*/
@Bean
public RedissonClient config(){
//创建配置实例
Config config=new Config();
//可以设置传输模式为EPOLL,也可以设置为NIO等等
//config.setTransportMode(TransportMode.NIO);
//设置服务节点部署模式:集群模式;单一节点模式;主从模式;哨兵模式等等
//config.useClusterServers().addNodeAddress(env.getProperty("redisson.host.config"),env.getProperty("redisson.host.config"));
config.useSingleServer()
.setAddress(env.getProperty("redisson.host.config"))
.setKeepAlive(true);
//创建并返回操作Redisson的客户端实例
return Redisson.create(config);
}
}
@SpringBootTest
public class RedissonTest {
@Autowired(required = false)
private RedissonClient redissonClient;
@Test
public void testRedisson() throws IOException {
System.out.println(redissonClient.getConfig().toJSON());
}
@Test
public void testBloomFilter(){
RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter("bloomFilter_test"); //redis
bloomFilter.tryInit(100000, 0.01); //HashKey空间大小,码重复率
bloomFilter.add("a");
bloomFilter.add("b");
bloomFilter.add("c");
System.out.println(bloomFilter.contains("a")); //TODO: 不能说明a存在!
System.out.println(bloomFilter.contains("d")); //不存在!!
}
@Test
public void testBloomFilter1(){
//Key空间!!!
RBloomFilter<Integer> bloomFilter = redissonClient.getBloomFilter("bloomFilter_test1"); //redis
bloomFilter.tryInit(100, 0.01); //HashKey空间大小,码重复率
for(int i = 1;i < 2000;i++)
bloomFilter.add(i);
System.out.println(bloomFilter.contains(1000));
System.out.println(bloomFilter.contains(5000));
//当空间太小,就会有大部分都是true的结果
}
}
true不能说明问题,可能存在可能不存在。
false一定不存在。
发布订阅式主题
- Redisson提供轻量的类似RabbitMQ的队列/主题功能。
- RTopic:创建或获取主题
- 监听者必须实现ApplicationRunner和Ordered接口
package redisson;
import dto.KnowledgeInfo;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RTopicPublisher {
@Autowired
private RedissonClient redissonClient;
private String topicKey = "redissonTopicKey";
public void sendMsg(KnowledgeInfo info){
try {
RTopic topic = redissonClient.getTopic(topicKey);
topic.publishAsync(info);
}catch (Exception e){
e.printStackTrace();
}
}
}
package redisson;
import dto.KnowledgeInfo;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
@Component
public class RTopicConsumer implements ApplicationRunner, Ordered {
//构造Redisson客户端操作实例
@Autowired
private RedissonClient redissonClient;
private String topicKey = "redissonTopicKey";
@Override
public void run(ApplicationArguments args) throws Exception {
RTopic topic = redissonClient.getTopic(topicKey);
//委托代理
topic.addListener(KnowledgeInfo.class, new MessageListener<KnowledgeInfo>() {
@Override
public void onMessage(CharSequence charSequence, KnowledgeInfo dto) {
//log.info("记录用户登录成功后的轨迹-消费者-监听消费到消息:{} ",dto);
System.out.println(dto);
//判断消息是否为null
if (dto!=null){
//如果消息不为null,则将消息记录入数据库中
//TODO: save log
}
}
});
}
@Override
public int getOrder() { //规定启动顺序(优先级值)
return 0;
}
}
package com.example.service;
import dto.DeadInfo;
import dto.KnowledgeInfo;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.redisson.RedissonBloomFilter;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import redisson.RTopicPublisher;
import redisson.RedissonDelayQueuePublisher;
import java.io.IOException;
@SpringBootTest
public class RedissonTest {
@Autowired(required = false)
private RedissonClient redissonClient;
@Autowired(required = false)
private RTopicPublisher topicPublisher;
@Test
public void testRTopic(){
topicPublisher.sendMsg(new KnowledgeInfo(1001, "aaaa", "bbb"));
try{
Thread.sleep(2000);
}catch (Exception e){
e.printStackTrace();
}
}
}
redisson RTopic 和 rabbitmq TopicExchange比较:
- 模型。前者用队列来模拟/或用缓存里的集合来模拟,没有日志。后者有日志,有死信队列,完整的。
- 消息发送。前者直接放入队列,后者放到exchange里面转存到队列。
最好用rabbitmq,丢失有日志。
延迟队列
package com.example.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import dto.DeadInfo;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import rabbitmq.*;
import test.Message;
@SpringBootTest
public class RabbitMQTest {
@Autowired
private BasicPublisher basicPublisher;
@Autowired
private FanoutPublisher fanoutPublisher;
@Test
public void testMsg() throws JsonProcessingException {
//单元测试
//basicPublisher.sendMessage("message from test....");
basicPublisher.sendMessage(new Message().setCode("002").setMsg("test......"));
//TODO: 如果消息来不及处理,这里需要适当延时
}
@Test
public void testFanoutMsg() throws JsonProcessingException {
//单元测试
fanoutPublisher.sendMsg(new Message().setCode("002").setMsg("test......"));
//TODO: 如果消息来不及处理,这里需要适当延时
}
@Autowired
private TopicPublisher topicPublisher;
@Test
public void testTopicMsg(){
topicPublisher.sendMsgTopic("test msg from topic 1",
"local.middleware.mq.topic.routing.tom.key"); //one, two
topicPublisher.sendMsgTopic("test msg from topic 2",
"local.middleware.mq.topic.routing.tom.jerry.key"); //two
topicPublisher.sendMsgTopic("test msg from topic 3",
"local.middleware.mq.topic.routing.key"); //two
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Autowired
private KnowledgeManualPublisher knowledgeManualPublisher;
@Test
public void manualTest(){
knowledgeManualPublisher.sendAutoMsg(new Message().setCode("002").setMsg("test......"));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Autowired
private DeadPublisher deadPublisher;
@Test
public void testDelayQueue(){
deadPublisher.sendMsg(new DeadInfo(1001, "A"), 10);
deadPublisher.sendMsg(new DeadInfo(1001, "B"), 5);
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
package redisson;/**
* Created by Administrator on 2019/5/2.
*/
import dto.DeadInfo;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* Redisson延迟队列消息模型-消费者
* @Author:debug (SteadyJack)
* @Date: 2019/5/2 17:11
**/
@Component
@EnableScheduling
public class RedissonDelayQueueConsumer{
//定义日志
private static final Logger log= LoggerFactory.getLogger(RedissonDelayQueueConsumer.class);
//定义Redisson的客户端操作实例
@Autowired
private RedissonClient redissonClient;
/**
* 监听消费真正队列中的消息
* 每时每刻都在不断的监听执行
* @throws Exception
*/
@Scheduled(cron = "*/1 * * * * ?")
public void consumeMsg() throws Exception {
//定义延迟队列的名称
final String delayQueueName="redissonDelayQueueV3";
RBlockingQueue<DeadInfo> rBlockingQueue=redissonClient.getBlockingQueue(delayQueueName);
//从队列中弹出消息
DeadInfo msg=rBlockingQueue.take();
if (msg!=null){
log.info("Redisson延迟队列消息模型-消费者-监听消费真正队列中的消息:{} ",msg);
//TODO:在这里执行相应的业务逻辑
}
}
}
package redisson;/**
* Created by Administrator on 2019/5/2.
*/
import dto.DeadInfo;
import org.redisson.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* Redisson延迟队列消息模型-生产者
* @Author:debug (SteadyJack)
* @Date: 2019/5/2 17:10
**/
@Component
public class RedissonDelayQueuePublisher {
//定义日志
private static final Logger log= LoggerFactory.getLogger(RedissonDelayQueuePublisher.class);
//定义Redisson的客户端操作实例
@Autowired
private RedissonClient redissonClient;
/**
* 发送消息入延迟队列
* @param msg 消息
* @param ttl 消息的存活时间-可以随意指定时间单位,在这里指毫秒
*/
public void sendDelayMsg(final DeadInfo msg, final Long ttl){
try {
//定义延迟队列的名称
final String delayQueueName="redissonDelayQueueV3";
//定义获取阻塞式队列的实例
RBlockingQueue<DeadInfo> rBlockingQueue=redissonClient.getBlockingQueue(delayQueueName);
//定义获取延迟队列的实例
RDelayedQueue<DeadInfo> rDelayedQueue=redissonClient.getDelayedQueue(rBlockingQueue);
//往延迟队列发送消息-设置的TTL,相当于延迟了“阻塞队列”中消息的接收
rDelayedQueue.offer(msg,ttl,TimeUnit.MILLISECONDS);
log.info("Redisson延迟队列消息模型-生产者-发送消息入延迟队列-消息:{}",msg);
}catch (Exception e){
log.error("Redisson延迟队列消息模型-生产者-发送消息入延迟队列-发生异常:{}",msg,e.fillInStackTrace());
}
}
}
package com.example.service;
import dto.DeadInfo;
import dto.KnowledgeInfo;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.redisson.RedissonBloomFilter;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import redisson.RTopicPublisher;
import redisson.RedissonDelayQueuePublisher;
import java.io.IOException;
@SpringBootTest
public class RedissonTest {
@Autowired(required = false)
private RedissonClient redissonClient;
@Autowired
private RedissonDelayQueuePublisher delayQueuePublisher;
@Test
public void testRDelayedQueue(){
delayQueuePublisher.sendDelayMsg(new DeadInfo(1000, "A"), 10000L);
delayQueuePublisher.sendDelayMsg(new DeadInfo(1000, "B"), 5000L);
try{
Thread.sleep(15000);
}catch (Exception e){
e.printStackTrace();
}
}
}
rabbitMQ的队列虽然可以延迟,但是依然保留的队列的特性(即严格先进先出)(即不会自动扫描队列中元素的各自延迟时间)。
Redisson提供RDelayedQueue、RBIockingQueue作为延迟队列。
最好用rabbitmq,性能最高,因为不用扫描。
笔记8-springcloud
5个核心组件
- 服务注册中心
作用:服务注册、服务发现
常用:Eureka、Zookeeper、Consul、Nacos - 负载均衡
作用:集群分配策略、最优性能常用:Ribbon、Feign、OpenFeign等 - 熔断降级
作用:服务保护
常用:Hystrix、Sentinel、Resilience4J等 - 路由网关
作用:过滤、身份验证等常用:zuul、gateway等 - 配置中心
作用:动态配置、统一管理
常用:Spring Cloud Config、Nacos Config
更多推荐
所有评论(0)