笔记1-springboot框架搭建

搭建一个SpringBoot框架

  1. new project(maven)
  2. new module(maven):api(纯JavaBean模块,没有任何服务).jar
    • 添加依赖lombok和jackson-annotations
    <dependency>
    	<groupId>org.projectlombok</groupId> 
    	<artifactId>lombok</artifactId> 
    	<version>1.18.28</version>
    </dependency>
    <dependency>
    	<groupId>com.fasterxml.jackson.core</groupId> 
    	<artifactId>jackson-annotations</artifactId> 				
    	<version>2.15.2</version>
    </dependency>
    
    • 在这里定义实体类
  3. new module():service(以服务形式提供) SpringBoot(multi-instance)
    • 选择依赖Spring web
    • 添加依赖api
    <dependency>
    	<groupId>org.example</groupId> 
    	<artifactId>api</artifactId> 
    	<version>1.0-SNAPSHOT</version> 		
    </dependency>
    
    • 在这里编写controller
    • application.yml中添加端口号,任意定义(如6001)
  4. new module:server(以服务形式提供) SpringBoot
    • 选择依赖Spring web和Thymeleaf
    • 添加依赖api
    • 在这里编写controller
    • application.yml中添加端口号,80
  5. coding: Controller…
    串:api(JavaBean),service(服务,JavaBeanObject),server(服务,从service获取并转化为对象)
  6. deploy & test(service, server)
    mybatis->service
void setCode(String code)
Message setCode(String code)
{
	...
	return this;
}

实例

api模块定义实体类

@Data
@NoArgsConstructor
//@Accessors(chain=true) //链式
public class Message
{
	private String code, msg;
}

service中编写controller
非链式

@RestController
public class MessageController
{
	@RequestMapping("/test/msg")
	public Message helloMsg()
	{
		Message msg = new Message();
		msg.setCode("001");
		msg.setMsg("Hello SpringBoot.");
		return msg;
	}
}

链式

@RestController
public class MessageController
{
	@RequestMapping("/test/msg")
	public Message helloMsg()
	{
		return new Message().setCode("001").setMsg("Hello SpringBoot.");
	}
}

访问localhost:6001/test/msg可以访问获取到信息了。

在server中编写config

@Configuration
public class Config {
	@Bean
	public RestTemplate restTemplate()
	{
		return new RestTemplate();
	}
}

在server中编写redis的模板config

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig
{
    @Autowired(required = false)
    private RedisConnectionFactory redisConnectionFactory;//Redis连接工厂(相当于redis-cli的窗口)

    @Bean
    public RedisTemplate<String, Object> redisTemplate()//相当于redis-cli的命令行
    {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
//        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//JSON泛型
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        return redisTemplate;
    }

    @Bean
    public StringRedisTemplate stringRedisTemplate()
    {
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(redisConnectionFactory);
        return stringRedisTemplate;
    }
}

JdkSerializationRedisSerializer和GenericJackson2JsonRedisSerializer的区别是:

上下两种分别是JdkSerializationRedisSerializer比GenericJackson2JsonRedisSerializer的效果。
JdkSerializationRedisSerializer比GenericJackson2JsonRedisSerializer快,因为JdkSerializationRedisSerializer在传输时对数据进行了序列化。
如果用于java语言,那么两种都能用。
但是如果用于c++或者其他,无法接受序列化以后的内容。

controller

package com.example.server.controller;

import com.example.service.util.R;
import domain.Course;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import javax.annotation.Resource;

@RestController
@RequestMapping
public class MyCourseController
{
    private final String URL_PREFIX = "http://localhost:8081";
    @Resource
    private RestTemplate restTemplate;

    @GetMapping("/my/course")
    public R testCourse()
    {
        return restTemplate.getForObject(URL_PREFIX + "/course", R.class);
    }

    @Resource
    private RedisTemplate redisTemplate;

    @GetMapping("/test")
    public Object myRedis()
    {
        ValueOperations<String, Object> opsv = redisTemplate.opsForValue();
        opsv.set("mykey", new Course(1, "name", null, null, 20.0, "introduction", "1"));
        return opsv.get("mykey");
    }
}

现在访问localhost/my/msg就相当于访问localhost:6001/test/msg了。
相当于多了一层隔离,但是速度会比较慢。
好处在springboot中难以体现,springcloud里面就可以感受到。
当有多个service的时候,可以通过server去定端口。不能让服务端去自主选择端口。
如果多个独立service没有交互,但是其他地方需要同时用到。可以在server里面交互,service可以保持独立。

笔记2-springboot整合redis

redis

Sprintboot整合redis:

  1. 添加依赖:redis客户端(不推荐jedis,是独立框架,第三方包)
  2. application.properties(配置redis的端口)
  3. 配置(redisTemplate)
  4. 编写代码

泛型:编译现象

java中redis泛型,如“<String,Integer>”只在java中受到限制,在其他地方(如redis_cli.exe)还是可以插入其他类型的值。

海量数据中的查找?

数据库中的方法:

索引:index进行检索条件匹配。(坏处:增加数据库开销,要有个地方放置索引。每次存入数据的时候,要更新索引。)

redis

equals:逻辑上相等
hashCode:散列码
Bloom Filter

mybatis整合

service:
model:bean
mapper:xml
dao:dao

server(DAO:Data Transfer Object):

数据库:

?userUnicode=true&characterEncoding=utf-8&serverTimezone=UTC&userSSL=false

附录

项目日常添加依赖
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
redisTemplate
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.concurrent.TimeUnit;

@Configuration
public class RedisConfig
{
    @Autowired(required = false)
    private RedisConnectionFactory redisConnectionFactory;//Redis连接工厂(相当于redis-cli的窗口)

    @Bean
    public RedisTemplate<String, Object> redisTemplate()//相当于redis-cli的命令行
    {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
//        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());//JSON泛型
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        return redisTemplate;
    }


    @Bean
    public StringRedisTemplate stringRedisTemplate()
    {
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(redisConnectionFactory);
        return stringRedisTemplate;
    }
}

笔记3-redis缓存穿透

上节课知识回顾

server好处:

  1. SpringCloud
  2. 网关隔离(内网,保护机制)
  3. server中可能业务重组实现

Redis缓存

  1. springboot-starter-data-redis(Config/@Bean是对redis-cli.exe的封装)
  2. jedis第三方

redis不建议做数据库:

  1. 用的是内存,较小。
  2. 没有备份找回机制,不稳定。

本课堂:redis

redis缓存穿透问题:

  1. 查询id
  2. redis未命中id -> 到数据库中查询
    1. 如果数据库中存在,将【id,查询结果】存入redis,输出结果
    2. 如果数据库中不存在,输出NULL
  3. redis命中id -> 输出redis结果
ValueOperations<String, Object> vop = redisTemplate.opsForValue();
String key = "red_" + id;
RedRecord red = (RedRecord) vop.get(key);//从redis中查找id
if (red == null)
{
    red = redRecordDao.selectByPrimaryKey(id);//从数据库查找id
}
return red;//返回查询数据

导致了可能同一个不存在id一直未命中,所以一直在redis和数据库查询的问题。

解决方案:

  1. 查询id
  2. redis未命中id -> 到数据库中查询
    1. 如果数据库中存在,将【id,查询结果】存入redis,输出结果
    2. 如果数据库中不存在,将【id,空值(空值即空字符串,不能是NULL)】存入redis,输出NULL
  3. redis命中id -> 输出redis结果

注意:上面的空值与NULL有区别。在redis中查询,
返回NULL:
是代表:该id不存在于缓存中。
后续:需要到数据库查询。
返回空值(可以是一个空字符串):
是代表该id不存在于缓存中也不存在与数据库中,但上次已经查询过了,所以在缓存中直接标记该数据为空。
后续:直接输出NULL

ValueOperations vop = redisTemplate.opsForValue();
String key = "red_" + id;
String value = (String) vop.get(key); //从redis中查找id
//redis命中
if(value != null){
    //redis查询结果是空值
    if(value.length() == 0) {
        return "not found!!";
    }else return objectMapper.readValue(value, RedRecord.class);
}
//redis不命中
RedRecord red = redRecordDao.selectByPrimaryKey(id); 
//数据库查询结果为空
if(red == null) 
    vop.set(key, "", 30, TimeUnit.SECONDS); //记录不存在,设置超时自动删除
else 
    vop.set(key, objectMapper.writeValueAsString(red));
return red;

这样当第一次未命中,缓存中就已经有【id,空值】的记录。后面每次查询,redis中已经存在该记录,不用查询数据库。

讨论

这个解决方法效率如何
这个方法只能解决每次都查询相同的不存在的id。
如果是每次都查询不同的不存在的id,则不适用。依然存在冗余+穿透。

布隆过滤器

前提:

假设有2个Hash映射函数:
h1(x),h2(x)
现在要保存这几个值:4,5,6

4:h1(4)=100,h2(4)=105 
5:h1(5)=105,h2(5)=106 
6:h1(6)=101,h2(6)=107 
12……100101102103104105106107
00……11000111
查找:
  1. 查找7
    假设h1(7)=105,h2(7)=108
    find(105)=1,find(108)!=0,有任意一个不等于1,则不存在,所以7不存在。
  2. 查找8
    假设h1(8)=105,h2(8)=100
    find(105)=1,find(100)!=1,两个都等于1,则为不确定状态,所以7可能存在也可能不存在。
讨论:

这个方法优点:确定了一些不存在的值,减少穿透。
缺点:有一些误识别率和删除困难。

缓存问题汇总:

缓存穿透

缓存系统中无法命中所需数据,导致每次查询都要直接访问后端数据源。
**解决方法:**空值缓存,布隆过滤器

缓存击穿(即:缓存无数据,数据库有数据,key比较集中)

如高并发的情况下,热点数据缓存过期,这时候会导致大量请求读不到缓存同时读数据库,导致数据库负载过大。(比如:缓存是半小时清理一次,清理完刚好100万条访问,此时就要有100万条访问数据库)
解决方法:

  1. 分布式锁,每次查询只有一个加载到缓存中(就是把清理掉的热点缓存,又一个一个从数据库添加到缓存)。但是降低了并发。
  2. 设定key不要同一时刻大量过期
缓存雪崩(即:缓存无数据,数据库有数据,key比较分散)

如在高并发的情况下,缓存同一时刻失效(如缓存挂了,或者设置了相同过期时间),所有请求会读数据库,容易导致数据库负载瞬间上升,乃至崩掉。如果重启数据库,立马又会被新的请求压崩。
**解决方法:**与上一条同

缓存预热

缓存预热是指系统上线后,提前将相关的缓存数据加载到缓存系统中,避免刚上线使用户有太多请求打到数据库上去,然后再去将数据缓存的问题。缓存降级
缓存降级是指缓存失效或缓存服务器挂掉的情况下,不去访问数据库,直接返回默认数据或访问服务的内存数据。降级一般是有损的操作,所以尽量减少降级对于业务的影响程度。

下节课预习

抢红包:
(1)逻辑:添加红包(随机分配)
(2)业务:发红包,抢红包
(3)并发测试:jemter
(4)可能存在的问题:同一个用户抢到多个红包?解决方法?

笔记4-redis应用抢红包

抢红包:

(1)逻辑:添加红包(随机分配)
(2)业务:发红包,抢红包
(3)并发测试:jemter
(4)可能存在的问题:同一个用户抢到多个红包?解决方法?

非分布式

示例1:

取记录,判断,减内存中的数值,保存到数据库
问题:
单线程没问题。多线程会出问题。(即数据库中从1000变成0了,但是实际上不止发了10次红包)

示例2:

在mapper中,不再是直接赋值。
而是通过数据库本身的原子操作:

update xxx set amount = amount - #{money}
where id = #{id}

结果:超支。(即实际上不止发了10次红包,数据库中变成负数了)

示例3:

不做判断,直接减。由返回值进行判断

update xxx set amount = amount - #{money}
where id = #{id} and amount >=100

结果:利用了sql的原子性,但是当请求过多,会造成数据库的堵塞。
就是虽然你不合理的请求,扣完经过判断以后,不扣了。
这一步在逻辑上是没问题,但是这个请求还是经过了sql。

示例4:
void synchronized 方法(){
    ……
}

坏处:

  1. 并发度下降,对同一个对象来说是串行执行
    => synchronized中的代码要尽量段(代码量少,且执行时间尽量短)
  2. 你能确定只有一个service对象在运行吗?如果又new了一个service,那么逻辑仍然会出错。
  3. 集群环境中service肯定是存在多个,这时候如何互斥?

使用分布式锁(对业务加锁)

前提:

每个用户对于每个红包只能抢1次。
每个红包(比如100),会被提前分成多个小红包,多个小红包加起来是100。
输入用户id红包id,进行下面的红包分配。

过程:
  1. 判断该用户是否已经抢过红包:判读"用户id+红包id"是否存在于redis中。
  2. 如果抢过,则return -1
  3. 没抢过,则判断是否有剩余红包:判断"红包id:total"的值是否>0
  4. 如果<=0,则return -1
  5. 如果有剩余红包,则redisTemplate.opsForList().leftPop(红包id);从红包队列中取出一个小红包
  6. 判断该小红包是有效的,如果小红包==null 或 小红包 ==“”,则return -1
  7. 如果有效,则更新数据(更新"红包id:total",更新"用户id+红包id"),返回小红包的id
  8. 记录到数据库(用户+红包金额)
错误代码
public int rob(String redId, Integer userId){ //TODO: 加synchronized也无法实现!!
    String robId = redId + ":" + userId; //记录是否已经抢到红包
    //1. 查询是否已经抢到红包,若已经抢到,则返回
    ValueOperations valueOperations = redisTemplate.opsForValue();
    Object obj = valueOperations.get(robId);
    if(obj != null && "".equals("" + obj) == false){
        return Integer.valueOf("" + obj);
    }
    //2. 若剩余红包列表为空,则返回失败。否则,取一个红包,并返回
    try{ //cheat!!!
        Thread.sleep(50);
    }catch (Exception e){}

    ValueOperations strOperations = stringRedisTemplate.opsForValue();
    int total = Integer.valueOf("" + strOperations.get(redId + ":total"));
    if(total <= 0) return -1; //fail
    //从队列取一个红包
    ListOperations listOperations = redisTemplate.opsForList();
    Object rr = listOperations.leftPop(redId);  //原子操作
    if (rr == null || "".equals("" + rr)) return -1;
    //strOperations.set(redId + ":total", "" + (total - 1)); //TODO: 可能存在不一致性!!
    strOperations.decrement(redId + ":total"); //原子性
    valueOperations.set(robId, rr); //redis recording
    logger.info("success, user:" + userId + ",rob:" + rr);
    //TODO: 记录到数据库
    return Integer.valueOf("" + rr);
}
正确代码
public /*synchronized*/ int rob_lock(String redId, Integer userId){
    String robId = redId + ":" + userId; //记录是否已经抢到红包
    //1. 查询是否已经抢到红包,若已经抢到,则返回
    ValueOperations valueOperations = redisTemplate.opsForValue();
    Object obj = valueOperations.get(robId);
    if(obj != null && "".equals("" + obj) == false){
        return Integer.valueOf("" + obj);
    }
    //2. 若剩余红包列表为空,则返回失败。否则,取一个红包,并返回
    try{ //cheat!!!
        Thread.sleep(100);
    }catch (Exception e){}

    ValueOperations strOperations = stringRedisTemplate.opsForValue();
    int total = Integer.valueOf("" + strOperations.get(redId + ":total"));
    if(total <= 0) return -1; //fail
    //相当于在redis中synchronized(robId + "-lock")
    Boolean lock = valueOperations.setIfAbsent(robId + "-lock", "123"); //分布式锁(value不重要)
    if(lock) {
        ListOperations listOperations = redisTemplate.opsForList();
        Object rr = listOperations.leftPop(redId);  //原子操作
        if (rr == null || "".equals("" + rr)) return -1;
        //update total
        strOperations.decrement(redId + ":total"); //redis decr
        //strOperations.set(redId + ":total", total - 1);
        valueOperations.set(robId, rr); //redis recording
        logger.info("success, user:" + userId + ",rob:" + rr);
        //3. TODO: 写入数据库

        //问题:能不能删除-lock这个锁?不能删除(并发环境下,不可预知其他线程的执行位置!!)
        //可以使用超时,比如:5分钟
        return Integer.valueOf("" + rr);
    }else return -1;
}
重点:
  1. redis中红包剩余数量的原子性
    更新"红包id:total"的时候不能使用strOperations.set(redId+":total",xxx),而是要使用strOperations.decrement(redId+":total")。保证原子性。
  2. redis中取出红包的原子性
    要将6-8步骤的放到锁里。
Boolean lock = value0perations.setIfAbsent(robId + "-lock", "123"); //123是随便写的,这句话只是为了让key存在,即robId+"-lock"存在,即"红包id+用户id-lock"存在。
if(lock){
    从队列中取出小红包,更新redis数据,记录写入数据库。
}
解析:

为什么会出现这个问题?
设想问题出现的过程,即一个用户能成功抢到红包的过程:
1该用户没抢过红包 -> 2有剩余小红包 -> 3取出小红包 -> 4小红包数量-1 -> 5将用户状态设置为已经抢过红包 -> 6用户抢红包数据写入数据库

同一个用户有多个线程进入1,进入2,进入3,进入4。
直到第一个线程到5的时候,此时,1才会对该用户的线程进行关闭。
但是这个时候已经有很多个线程在2,3,4了,1虽然关闭了大门,但是这些线程已经进门了。所以拦不住,他们会继续经过5.

那么是如何解决的呢?
在5之前又设置了一道门。"用户id+红包id"的门。

笔记5-RabbiMQ

RabbiMQ

添加依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
设置配置参数
# RabbitMQ
spring.rabbitmq.virtual-host=/
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# Queue definitions
mq.env=local
mq.basic.info.queue.name=${mq.env}.middleware.mq.basic.info.queue
mq.basic.info.exchange.name=${mq.env}.middleware.mq.basic.info.exchange
mq.basic.info.routing.key.name=${mq.env}.middleware.mq.basic.info.routing.key

注意:spring.rabbitmq.port=5672
不是15672

RabbitMQConfig
package com.jihuiting.service.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig
{
    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);
    @Autowired(required = false)
    private CachingConnectionFactory cachingConnectionFactory; //MQ连接
    @Autowired(required = false)
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; //监听器配置

    /**
     * 单一消费者
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer()
    { //客户端连接监听器(Connection, Channel....)
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter()); //TODO: JSON
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        return factory;
    }

    /**
     * RabbitMQ发送消息的操作组件实例
     */
    @Bean
    public RabbitTemplate rabbitTemplate()
    {
        cachingConnectionFactory.setPublisherConfirms(true);
        cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
        cachingConnectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause));
        rabbitTemplate.setReturnsCallback(returnedMessage -> log.info("消息丢失:{}", returnedMessage));
        return rabbitTemplate;
    }

}
实现定义队列和交换器(绑定键)
package com.jihuiting.service.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置类
 *
 * @author pan_junbiao
 **/
@Configuration
public class TopicRabbitMqConfig
{
    public static final String TOPIC_QUEUE_NAME = "testQueue"; //队列名称A
    public static final String TOPIC_EXCHANGE_NAME = "testExchange"; //交换器名称A
    public static final String TOPIC_ROUTING_KEY = "topic.routingKey.testQueue_testExchange"; //绑定键

    /**
     * 队列
     */
    @Bean
    public Queue topicQueueA()
    {
        /**
         * 创建队列,参数说明:
         * String name:队列名称。
         * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
         * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
         * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
         * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
         * 当没有生产者或者消费者使用此队列,该队列会自动删除。
         * Map<String, Object> arguments:设置队列的其他一些参数。
         */
        return new Queue(TOPIC_QUEUE_NAME, true);
    }

    /**
     * 交换器
     */
    @Bean
    public DirectExchange basicExchange()
    {
        /**
         * 创建交换器,参数说明:
         * String name:交换器名称
         * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
         * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
         * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
         */
        return new DirectExchange(TOPIC_EXCHANGE_NAME, true, false);
    }

    @Bean
    Binding bindingDirect(DirectExchange directExchange,Queue directQueue)
    {
        return BindingBuilder.bind(directQueue).to(directExchange).with(TOPIC_ROUTING_KEY);
    }
}
创建发送者
package com.jihuiting.service.controller;

import com.jihuiting.service.service.TopicRabbitMqService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("/topic")
public class TopicSender
{
    @Autowired
    private TopicRabbitMqService topicRabbitMqService;

    @GetMapping("/send/{queueName}/{exchangeName}/{context}")
    public void sendTopicMessage(@PathVariable("queueName") String queueName, 
                                 @PathVariable("exchangeName") String exchangeName, 
                                 @PathVariable("context") String context)
    {
        System.out.println("-------------------------------------------- ");
        System.out.println("sendTopicMessage方法:");
        System.out.println("queueName——"+queueName);
        System.out.println("exchangeName——"+exchangeName);
        System.out.println("context——"+context);
        System.out.println("-------------------------------------------- ");
        //http://localhost:8080/topic/send/testQueue/testExchange/theTestContext
        this.topicRabbitMqService.SendMessage(exchangeName,queueName,context);
    }
}

创建接收者
package com.jihuiting.service.controller;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicReceiver
{
    @RabbitListener(queues = "testQueue")
    public void receiveMessage(String message) {
        System.out.println("【testQueue】接收到消息:" + message);
    }
}

下面的写法也可以:

package com.jihuiting.service.controller;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 接收者
 * @author pan_junbiao
 **/
@Component
@RabbitListener(queues = "testQueue")
public class TopicReceiverA
{
    @RabbitHandler
    public void process(String msg)
    {
        System.out.println("【testQueue】接收到消息:" + msg);
    }
}
创建绑定关系
@RestController
@RequestMapping("/topic")
public class TopicSender
{
    @Autowired
    private TopicRabbitMqService topicRabbitMqService;

    @GetMapping("/bind/{queueName}/{exchangeName}")
    public void bindingExchange(@PathVariable("queueName") String queueName, @PathVariable("exchangeName") String exchangeName)
    {
        System.out.println("-------------------------------------------- ");
        System.out.println("bind方法:");
        System.out.println("queueName——"+queueName);
        System.out.println("exchangeName——"+exchangeName);
        System.out.println("-------------------------------------------- ");
        //http://localhost:8080/topic/bind/testQueue/testExchange
        this.topicRabbitMqService.bindingExchange(queueName,exchangeName);
    }
}
@Service
public class TopicRabbitMqService
{
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public String getRoutingKey(String queueName, String exchangeName)
    {
        return "topic.routingKey."+queueName + "_" + exchangeName;
    }

    public Binding bindingExchange(String queueName, String exchangeName)
    {
        System.out.println("===============");
        System.out.println("routingKey = "+getRoutingKey(queueName, exchangeName));
        System.out.println("===============");
        Queue queue = CreateTopicQueue(queueName);
        TopicExchange topicExchange = CreateExchange(exchangeName);
        return BindingBuilder.bind(queue).to(topicExchange).with(getRoutingKey(queueName, exchangeName));
    }
}
报错
  1. java.net.SocketException: Socket closed
    排除1:配置文件问题(账号,密码,端口)
    排除2:试图创建新用户(无效)
    排除3:视图修改spring.rabbitmq.virtual-host=/,在http://localhost:15672/也修改了(无效)
    解决4:
    建立接收者的时候
@Component
public class TopicReceiver
{
    @RabbitListener(queues = "testQueue")
    public void receiveMessage(String message) {
        System.out.println("[testQueue]接收到消息:" + message);
    }
}

没有注册bean,在TopicRabbitMqConfig中


public static final String TOPIC_QUEUE_NAME_A = "testQueue"; //队列名称A
@Bean
    public Queue topicQueueA()
    {
        /**
         * 创建队列,参数说明:
         * String name:队列名称。
         * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
         * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
         * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
         * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
         * 当没有生产者或者消费者使用此队列,该队列会自动删除。
         * Map<String, Object> arguments:设置队列的其他一些参数。
         */
        return new Queue(TOPIC_QUEUE_NAME_A, true);
    }

笔记6-分布式锁

回顾RabbitMQ

  • 除了RabbitMQ,还有ActiveMQ,RocketMQ,KafkaRedisson
  • 目的:异步消息(比如淘宝,百万级访问,你访问每个产品都需要记录日志。会很卡,此时使用RabbitMQ进行异步访问);解耦。
  • 死信队列(DLQ),延迟队列
    1. 有两个地方可以设置队列的延迟时长,分别是config的@Bean中和发布消息者中,如果两个地方设置了时长不一样,则生效的是相对较短的时长。
    2. 如果消息发布者发送了两个信息,发送了A等待时长10秒,然后发送了B等待时长5秒,那么最终,A和B是在第10秒的时候被处理的。因为队列是不会扫描的,即使B比A快,但只有A处理后,才会轮到B。所以适用于同时长的延迟。
    3. 不等长延迟:使用Redisson

今日知识:锁与分布式锁

前言

有些时候,并行程序中也必须串行执行,否则可能出现数据不一致性

  1. 商品下单(多个用户同时购买,则可能出现库存超卖)
  2. 多个用户同时操作同一个银行账号(可能导致数据丢失)
  • 单机锁:JDK提供的synchronized,Lock,ReentrantLock,ReentrantReadWriteLock(基本原理:同一个临界区的互斥锁)
  • 分布式环境下,“同一临界区”很难实现。(synchronized虽然很强势很好用,但是如果运行了两个service,就破了)。
  • 所以分布式锁应运而生。(分布式锁不是一个中间件,而是一种机制或实现方式)

准则

  1. 互斥性
  2. 避免死锁
  3. 高可用
  4. 可重入
  5. 公平锁(可选)

数据库的分布式锁

数据库的更新操作都是原子操作,查询不一定是原子操作。一般是先查询再更新(比如查账户上有多少钱,如果要取得钱<=账户上的钱,则取)。
但是这样造成一个问题,当查询+更新不是原子操作,所以会出问题。(即你查询了有100元,当你要更新的时候,不一定就是100元了,在你查询到更新中间可能已经有人把钱给动了)
关系型数据库大多是悲观锁。(synchronized,ReentrantLock都是悲观锁的实现)

  1. 不加锁

    <update id="updateAmount">
    UPDATE user_account SET amount = amount -#{money} WHERE is_active=1 AND id=# {id}
    </update>
    

    直接扣钱

  2. 乐观锁
    每次查询的时候都认为别人不会修改,所以不上锁。更新的时候判断此期间有没有人修改,有就重新操作。(重新查询,重新判断)

    <update id="updateByPKVersion">
    update user_account set amount = amount - #{money},version=version+1 
    where id = #{id} and version=# {version}
    and amount >0 and (amount -#{money})>=0 </update>
    

    多了一个version的东西,查询的时候获得数据(账户余额100元,账户版本33版),更新数据检查版本然后扣钱(余额-100,版本+1,当版本=33的时候)。如果版本不符,则重新操作。

  3. 悲观锁
    每次查询的时候都认为别人会修改,所以查的时候就直接上锁,直到更新完,把锁解开。(查询到更新期间如果有人想动数据,线程会阻塞,动不了。解开锁以后才能动)
    JAVA里面的synchronized和ReentrantLock等独占锁就是悲观锁思想的实现
    @Transaction 单服务节点锁定,多点有问题。

Redis分布式锁

  1. setnx
    key不存在的时候设置,存在则失败
  2. expire
    超时删除
步骤:

构造一个与共享资源或核心业务相关的Key;
调用SETNX、EXPIRE操作,获取该Key的锁;
完成业务后,及时释放锁。

ZooKeeper

下载:https://zookeeper.apache.org/releases.html
打开:在zookeepper的文件夹的bin中zkServer.cmd运行起来。

  1. 添加依赖:zookeeper+curator-framework+curator-recipes
  2. 设置配置:
spring.config.import=optional:zookeeper:
spring.cloud.zookeeper.connect-string=localhost:2181
  1. 应用:
public R seckillZk(int goodsId) throws Exception
    {
        String lockPath = "/" + goodsId;
        //如果锁不存在,上锁
        if (client.checkExists().forPath(lockPath) == null)
            client.create().withMode(CreateMode.PERSISTENT).forPath(lockPath);

        //10秒内能否获取到锁(如果其他线程占用,则无法获取到锁)
        InterProcessMutex mutex = new InterProcessMutex(client, lockPath);
        boolean lockAcquired = mutex.acquire(10L, TimeUnit.SECONDS);
        try
        {
            //10秒内获取到了锁,就进行数据库操作,否则返回失败
            if (lockAcquired)
            {
                return seckill(goodsId);
            } else
            {
                return R.error("获取锁失败");
            }
        } finally
        {
            mutex.release();
        }
    }

多次释放是可以的,不会出问题。

redisson

场景:

  1. 布朗过滤器

  2. 消息通信(基于发布订阅模式的主题,延迟队列(rabbitmq的延迟队列是从队头到队尾,所以排在后面的即使时间更短,也是顺序制定,而redisson没有这个bug,它的队列是虚拟队列,每个都是平等的,没有这个顺序一说))

  3. 分布式锁

  4. 添加redission依赖

  5. 设置配置:redisson.host.config = redis://127.0.0.1:6379

  6. Bean:RedissonClient

@Autowired
private Environment env;
@Bean
public RedissonClient config(){
Config config = new Config();
config.useSingleServer().setAddress(env.getProperty("redisson.host.config")).setKeepAlive(true);
return Redisson.create(config);
}
  1. 应用

效率

  1. 单机锁效率最高:synchronized,reentrancelock(只能在同一个虚拟机上使用)
  2. redis锁
  3. redisson锁(可以用户redis集群)
  4. zookeeper(节点维护消耗了时间)

总结

分布式锁是在分布式系统中实现数据同步和互斥的一种重要机制。以下是一些常见的分布式锁(包括数据库锁、Redis锁和ZooKeeper锁)的优点、缺点以及适用场合的介绍:

  1. 数据库锁:
    优点:

    • 简单易用,数据库具备 ACID 特性,可以通过记录行锁或表级锁实现数据的同步和互斥。
    • 可以很好地保证数据的一致性。

    缺点:

    • 性能相对较低,特别是在高并发环境下,数据库的锁竞争可能成为性能瓶颈。
    • 不适合用于分布式场景,当多个应用或服务实例操作同一数据库时,难以保证锁的全局互斥。

    适用场合:

    • 低并发情况下的数据同步和互斥。
    • 需要对数据进行严格的一致性控制,可以容忍一定的性能损耗。
  2. Redis锁:
    优点:

    • 快速高效,在内存中操作,适合处理高并发。
    • 支持设置锁的过期时间,可以避免死锁。

    缺点:

    • 可能存在锁竞争问题,需要处理好并发场景下的锁争用情况。
    • Redis 单点故障会导致分布式锁失效。

    适用场合:

    • 高并发场景,例如秒杀业务等。
    • 不要求强一致性,可以容忍一定的数据不一致。
  3. ZooKeeper锁:
    优点:

    • 具备强一致性,ZooKeeper 提供了严格的顺序性保证。
    • 支持分布式环境,可以实现跨多个应用和服务实例的锁同步。

    缺点:

    • 性能较差,ZooKeeper 集群的延迟和吞吐量受限于网络通信和磁盘 IO,不适合高并发场景。
    • 部署和维护较为复杂,对运维要求较高。

    适用场合:

    • 强一致性要求较高的场景,例如分布式事务。
    • 对性能要求相对较低,可以容忍一定的延迟。

笔记7-redisson

回顾

  • Springboot
  • redis
  • rabbitMQ异步消息(使用异步进行耦合度降低)
  • 分布式锁
  • redisson(基于redis,扩展应用级组件,属于客户端层,不属于服务端层中间件)

redisson

布朗过滤器

redisson.host.config=redis://127.0.0.1:6379
package com.example.service;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {
    @Autowired(required = false)
    private RedisConnectionFactory redisConnectionFactory; //Redis连接工厂(redis-cli窗口)

    @Bean
    public RedisTemplate<String,Object> redisTemplate(){ //相当于redis-cli.exe的命令行
        RedisTemplate<String,Object> redisTemplate=new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer()); //指定大key序列化策略为为String序列化
//        redisTemplate.setKeySerializer(new JdkSerializationRedisSerializer());
        redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer());
        //redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); //JSON泛型
        redisTemplate.setHashKeySerializer(new StringRedisSerializer()); //指定hashKey序列化策略为String序列化
        return redisTemplate;
    }

    /**
     * 缓存redis-stringRedisTemplate
     */
    @Bean
    public StringRedisTemplate stringRedisTemplate(){
        //采用默认配置即可-后续有自定义配置时则在此处添加即可
        StringRedisTemplate stringRedisTemplate=new StringRedisTemplate();
        stringRedisTemplate.setConnectionFactory(redisConnectionFactory);
        return stringRedisTemplate;
    }

    @Autowired
    private Environment env;
//    @Bean
//    public CuratorFramework curatorFramework(){
//        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
//                .connectString(env.getProperty("zk.host"))
//                .namespace(env.getProperty("zk.namespace"))
//                .retryPolicy(new RetryNTimes(5, 1000))
//                .build();
//        curatorFramework.start();
//        return curatorFramework;
//    }

    //Redisson Client
    /**
     * 自定义注入配置操作Redisson的客户端实例
     * @return
     */
    @Bean
    public RedissonClient config(){
        //创建配置实例
        Config config=new Config();
        //可以设置传输模式为EPOLL,也可以设置为NIO等等
        //config.setTransportMode(TransportMode.NIO);
        //设置服务节点部署模式:集群模式;单一节点模式;主从模式;哨兵模式等等
        //config.useClusterServers().addNodeAddress(env.getProperty("redisson.host.config"),env.getProperty("redisson.host.config"));
        config.useSingleServer()
                .setAddress(env.getProperty("redisson.host.config"))
                .setKeepAlive(true);
        //创建并返回操作Redisson的客户端实例
        return Redisson.create(config);
    }
}
@SpringBootTest
public class RedissonTest {
    @Autowired(required = false)
    private RedissonClient redissonClient;
    @Test
    public void testRedisson() throws IOException {
        System.out.println(redissonClient.getConfig().toJSON());
    }

    @Test
    public void testBloomFilter(){
        RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter("bloomFilter_test"); //redis
        bloomFilter.tryInit(100000, 0.01); //HashKey空间大小,码重复率
        bloomFilter.add("a");
        bloomFilter.add("b");
        bloomFilter.add("c");
        System.out.println(bloomFilter.contains("a")); //TODO: 不能说明a存在!
        System.out.println(bloomFilter.contains("d")); //不存在!!
    }

    @Test
    public void testBloomFilter1(){
        //Key空间!!!
        RBloomFilter<Integer> bloomFilter = redissonClient.getBloomFilter("bloomFilter_test1"); //redis
        bloomFilter.tryInit(100, 0.01); //HashKey空间大小,码重复率
        for(int i = 1;i < 2000;i++)
            bloomFilter.add(i);
        System.out.println(bloomFilter.contains(1000));
        System.out.println(bloomFilter.contains(5000));
        //当空间太小,就会有大部分都是true的结果
    }
}

true不能说明问题,可能存在可能不存在。
false一定不存在。

发布订阅式主题

  • Redisson提供轻量的类似RabbitMQ的队列/主题功能。
  • RTopic:创建或获取主题
  • 监听者必须实现ApplicationRunner和Ordered接口
package redisson;

import dto.KnowledgeInfo;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RTopicPublisher {
    @Autowired
    private RedissonClient redissonClient;
    private String topicKey = "redissonTopicKey";

    public void sendMsg(KnowledgeInfo info){
        try {
            RTopic topic = redissonClient.getTopic(topicKey);
            topic.publishAsync(info);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
package redisson;

import dto.KnowledgeInfo;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

@Component
public class RTopicConsumer implements ApplicationRunner, Ordered {
    //构造Redisson客户端操作实例
    @Autowired
    private RedissonClient redissonClient;
    private String topicKey = "redissonTopicKey";
    @Override
    public void run(ApplicationArguments args) throws Exception {
        RTopic topic = redissonClient.getTopic(topicKey);
        //委托代理
        topic.addListener(KnowledgeInfo.class, new MessageListener<KnowledgeInfo>() {
            @Override
            public void onMessage(CharSequence charSequence, KnowledgeInfo dto) {
                //log.info("记录用户登录成功后的轨迹-消费者-监听消费到消息:{} ",dto);
                System.out.println(dto);
                //判断消息是否为null
                if (dto!=null){
                    //如果消息不为null,则将消息记录入数据库中
                    //TODO: save log
                }
            }
        });
    }

    @Override
    public int getOrder() { //规定启动顺序(优先级值)
        return 0;
    }
}
package com.example.service;

import dto.DeadInfo;
import dto.KnowledgeInfo;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.redisson.RedissonBloomFilter;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import redisson.RTopicPublisher;
import redisson.RedissonDelayQueuePublisher;

import java.io.IOException;

@SpringBootTest
public class RedissonTest {
    @Autowired(required = false)
    private RedissonClient redissonClient;
    
    @Autowired(required = false)
    private RTopicPublisher topicPublisher;
    @Test
    public void testRTopic(){
        topicPublisher.sendMsg(new KnowledgeInfo(1001, "aaaa", "bbb"));
        try{
            Thread.sleep(2000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

redisson RTopic 和 rabbitmq TopicExchange比较:

  1. 模型。前者用队列来模拟/或用缓存里的集合来模拟,没有日志。后者有日志,有死信队列,完整的。
  2. 消息发送。前者直接放入队列,后者放到exchange里面转存到队列。
    最好用rabbitmq,丢失有日志。

延迟队列

package com.example.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import dto.DeadInfo;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import rabbitmq.*;
import test.Message;

@SpringBootTest
public class RabbitMQTest {
    @Autowired
    private BasicPublisher basicPublisher;
    @Autowired
    private FanoutPublisher fanoutPublisher;
    @Test
    public void testMsg() throws JsonProcessingException {
        //单元测试
        //basicPublisher.sendMessage("message from test....");
        basicPublisher.sendMessage(new Message().setCode("002").setMsg("test......"));
        //TODO: 如果消息来不及处理,这里需要适当延时
    }

    @Test
    public void testFanoutMsg() throws JsonProcessingException {
        //单元测试
        fanoutPublisher.sendMsg(new Message().setCode("002").setMsg("test......"));
        //TODO: 如果消息来不及处理,这里需要适当延时
    }

    @Autowired
    private TopicPublisher topicPublisher;
    @Test
    public void testTopicMsg(){
        topicPublisher.sendMsgTopic("test msg from topic 1",
                "local.middleware.mq.topic.routing.tom.key"); //one, two
        topicPublisher.sendMsgTopic("test msg from topic 2",
                "local.middleware.mq.topic.routing.tom.jerry.key"); //two
        topicPublisher.sendMsgTopic("test msg from topic 3",
                "local.middleware.mq.topic.routing.key"); //two
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Autowired
    private KnowledgeManualPublisher knowledgeManualPublisher;
    @Test
    public void manualTest(){
        knowledgeManualPublisher.sendAutoMsg(new Message().setCode("002").setMsg("test......"));
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Autowired
    private DeadPublisher deadPublisher;
    @Test
    public void testDelayQueue(){
        deadPublisher.sendMsg(new DeadInfo(1001, "A"), 10);
        deadPublisher.sendMsg(new DeadInfo(1001, "B"), 5);
        try {
            Thread.sleep(15000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}    
package redisson;/**
 * Created by Administrator on 2019/5/2.
 */

import dto.DeadInfo;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * Redisson延迟队列消息模型-消费者
 * @Author:debug (SteadyJack)
 * @Date: 2019/5/2 17:11
 **/
@Component
@EnableScheduling
public class RedissonDelayQueueConsumer{
    //定义日志
    private static final Logger log= LoggerFactory.getLogger(RedissonDelayQueueConsumer.class);
    //定义Redisson的客户端操作实例
    @Autowired
    private RedissonClient redissonClient;

    /**
     * 监听消费真正队列中的消息
     * 每时每刻都在不断的监听执行
     * @throws Exception
     */
    @Scheduled(cron = "*/1 * * * * ?")
    public void consumeMsg() throws Exception {
        //定义延迟队列的名称
        final String delayQueueName="redissonDelayQueueV3";
        RBlockingQueue<DeadInfo> rBlockingQueue=redissonClient.getBlockingQueue(delayQueueName);

        //从队列中弹出消息
        DeadInfo msg=rBlockingQueue.take();
        if (msg!=null){
            log.info("Redisson延迟队列消息模型-消费者-监听消费真正队列中的消息:{} ",msg);

            //TODO:在这里执行相应的业务逻辑
        }
    }
}
package redisson;/**
 * Created by Administrator on 2019/5/2.
 */

import dto.DeadInfo;
import org.redisson.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;

/**
 * Redisson延迟队列消息模型-生产者
 * @Author:debug (SteadyJack)
 * @Date: 2019/5/2 17:10
 **/
@Component
public class RedissonDelayQueuePublisher {
    //定义日志
    private static final Logger log= LoggerFactory.getLogger(RedissonDelayQueuePublisher.class);
    //定义Redisson的客户端操作实例
    @Autowired
    private RedissonClient redissonClient;

    /**
     * 发送消息入延迟队列
     * @param msg 消息
     * @param ttl 消息的存活时间-可以随意指定时间单位,在这里指毫秒
     */
    public void sendDelayMsg(final DeadInfo msg, final Long ttl){
        try {
            //定义延迟队列的名称
            final String delayQueueName="redissonDelayQueueV3";
            //定义获取阻塞式队列的实例
            RBlockingQueue<DeadInfo> rBlockingQueue=redissonClient.getBlockingQueue(delayQueueName);
            //定义获取延迟队列的实例
            RDelayedQueue<DeadInfo> rDelayedQueue=redissonClient.getDelayedQueue(rBlockingQueue);
            //往延迟队列发送消息-设置的TTL,相当于延迟了“阻塞队列”中消息的接收
            rDelayedQueue.offer(msg,ttl,TimeUnit.MILLISECONDS);

            log.info("Redisson延迟队列消息模型-生产者-发送消息入延迟队列-消息:{}",msg);
        }catch (Exception e){
            log.error("Redisson延迟队列消息模型-生产者-发送消息入延迟队列-发生异常:{}",msg,e.fillInStackTrace());
        }
    }
}
package com.example.service;

import dto.DeadInfo;
import dto.KnowledgeInfo;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.redisson.RedissonBloomFilter;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import redisson.RTopicPublisher;
import redisson.RedissonDelayQueuePublisher;

import java.io.IOException;

@SpringBootTest
public class RedissonTest {
    @Autowired(required = false)
    private RedissonClient redissonClient;

    @Autowired
    private RedissonDelayQueuePublisher delayQueuePublisher;
    @Test
    public void testRDelayedQueue(){
        delayQueuePublisher.sendDelayMsg(new DeadInfo(1000, "A"), 10000L);
        delayQueuePublisher.sendDelayMsg(new DeadInfo(1000, "B"), 5000L);
        try{
            Thread.sleep(15000);
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

rabbitMQ的队列虽然可以延迟,但是依然保留的队列的特性(即严格先进先出)(即不会自动扫描队列中元素的各自延迟时间)。
Redisson提供RDelayedQueue、RBIockingQueue作为延迟队列。
最好用rabbitmq,性能最高,因为不用扫描。

笔记8-springcloud

5个核心组件

  • 服务注册中心
    作用:服务注册、服务发现
    常用:Eureka、Zookeeper、Consul、Nacos
  • 负载均衡
    作用:集群分配策略、最优性能常用:Ribbon、Feign、OpenFeign等
  • 熔断降级
    作用:服务保护
    常用:Hystrix、Sentinel、Resilience4J等
  • 路由网关
    作用:过滤、身份验证等常用:zuul、gateway等
  • 配置中心
    作用:动态配置、统一管理
    常用:Spring Cloud Config、Nacos Config
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐