RabbitMQ-黑马商城为例

title: RabbitMQ-黑马商城为例
date: 2024-06-22 20:37:17
tags: RabbitMQ

1.启动RabbitMQ

基于Docker来安装RabbitMQ,命令如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
docker run 
-e RABBITMQ_DEFAULT_USER=itheima #设置默认用户名
-e RABBITMQ_DEFAULT_PASS=123456 #设置默认密码
-v mq-plugins:/plugins #将本地主机上的mq-plugins目录挂载到容器内部的/plugins目录,可以存放插件
--name mq #指定容器名
--hostname mq #指定容器的主机名
-p 15672:15672 #RabbitMQ管理页面登录的端口号 [浏览器输入http://localhost:15672/即可进入]
-p 5672:5672 #RabbitMQ用于AMQP协议通信 [SpringAMQP配置时候用]
--network heima #将容器连接到名字为heima的网络中 [如果没有就使用命令创建hmall网络 docker network create heima]
-d #在后台运行容器
rabbitmq:3.8-management #使用RabbitMQ 3.8版本带有管理界面的镜像来创建容器


精简版 --直接在虚拟机上启动docker然后docker run
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network heima\
-d \
rabbitmq:3.8-management

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

通过访问 http://localhost:15672或者http://192.168.92.129:15672即可看到本地/服务器上的管理控制台。首次访问登录,需要配置文件中设定的用户名和密码

image-20240319192803935

创建hmall用户,并且配置一个hmall2虚拟空间

image-20240623002616496

2.操作步骤

  • 1.pom.xml中引入AMQP依赖:消费者和生产者项目

  • 2.yml文件中配置RabbitMQ信息:

    • 2.1消费者项目【基础配置,消费者重试机制,消费者确认机制】
    • 2.2生产者项目【基础配置,生产者重试机制,生产者确认机制】
  • 3.发送消息:生产者利用RabbitTemplate.convertAndSend(exchange交换机, routingKey路由key,message消息【传递的字段】(.setDelay设置延迟时间),confirm消息确认机制信息);

    • 3.1 message默认是JDK序列化有一堆问题 –>引入Jackson序列化【①引入依赖,②生产者和消费者的启动类添加@Bean注入】
  • 4.接收消息:消费者在方法上添加@RabbitListener注解

    具体就是@RabbitListener(bindings=@QueueBinding(

    ​ value=@Queue(name=队列名,durable=true持久化,惰性队列arguments = @Argument(name=”x-queue-mode”,value = “lazy”)),

    ​ exchange=@Exchange(name=交换机名,type = ExchangeTypes.TOPIC,delayed=”true”延迟属性),

    ​ key={“绑定条件1”,”绑定条件2”}

    ​ ))

    方法(原来传递的字段){

    ​ //里面写的就是之前直接调用的那个方法(serviceimpl层代码)

    }

3.更改余额支付需求

改造余额支付功能,将支付成功后基于OpenFeign的交易服务的更新订单状态接口的同步调用—–>基于RabbitMQ的异步通知

image-20240622222844704

说明:目前没有通知服务和积分服务,因此我们只关注交易服务,步骤如下:

  • 定义direct类型交换机,命名为pay.direct
  • 定义消息队列,命名为trade.pay.success.queue
  • trade.pay.success.queuepay.direct绑定,BindingKeypay.success
  • 支付成功时不再调用交易服务更新订单状态的接口,而是发送一条消息到pay.direct,发送消息的RoutingKeypay.success,消息内容是订单id
  • 交易服务监听trade.pay.success.queue队列,接收到消息后更新订单状态为已支付

分析:

  • 生产者:支付服务pay-service

  • 消费者:交易服务trade-service

3.1 pom.xml导入依赖

在生产者和消费者的pom.xml文件中配置:

1
2
3
4
5
<!--消息发送-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 yml配置RabbitMQ信息

3.2.1 简单配置

在生产者和消费者的application.yml文件中配置:

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
host: 192.168.92.129 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall2 # 虚拟主机
username: hmall # 用户名
password: 123456 # 密码

//消费者和生产者会在对应位置添加配置 【例如:生产者消费者的确认机制,重试机制等】

3.2.2 nacos统一配置管理

  • 将rabbitmq配置放在nacos平台:【如果使用统一配置管理,记得导入对应nacos统一配置的config依赖和读取bootstrap.yml文件依赖】

image-20240623002711143

  • bootstrap.yml添加读取nacos配置
image-20240622224211688

3.3 支付服务–发送消息

3.3.1 修改原来业务

image-20240622230216876

3.3.2 配置Jackson消息转换器

  • 导入依赖:

image-20240622232241693

  • 直接配置到hm-common微服务下:
image-20240622231633000
  • 因为要考虑trade-service和pay-service调用时候springboot扫描问题:
image-20240622232005648
  • 然后在生产者和消费者启动类添加bean注入:
image-20240623124959823

3.4 交易服务–接受消息

在trade-service服务中定义一个消息监听类,方法外用注解标注队列,交换机和路由key,方法内写之前调用的方法:

image-20240622233402122

3.5 测试

3.5.1 重启两个服务

可以通过hmall用户的hmall虚拟主机看到队列:

image-20240623003731861

可以通过hmall用户的hmall虚拟主机看到交换机:

image-20240623003857226

3.5.2 前端下单

前端下单然后支付成功之后,查看数据库信息变化了,并且有一条消息进入到mq之中。

image-20240623004203477

4.更改清除购物车需求

==这个需求参考3步骤做的,以下只介绍生产者和消费者部分代码修改==

4.1 订单服务–发送消息

image-20240623135513712

4.2 购物车服务–接收消息

image-20240623135542378

5.改造代码总结

原来的设计:我在方法位置直接调用tradeClient的方法
现在的设计:①生产者只需要传递原来的参数和声明交换机名和key路由;②消费者需要声明交换机名,key路由和队列名,在方法里面直接调用底层方法(serviceimpl层方法),就不用像openFeign方式。

image-20240623125707127

Jmeter

1.安装Jmeter

Jmeter依赖于JDK,所以必须确保当前计算机上已经安装了JDK,并且配置了环境变量。

1.1.下载

可以Apache Jmeter官网下载,地址:http://jmeter.apache.org/download_jmeter.cgi

image-20240620133703234

1.2.解压

因为下载的是zip包,解压缩即可使用,目录结构如下:

image-20240620133725523

其中的bin目录就是执行的脚本,其中包含启动脚本:

image-20240620133802462

1.3.运行

双击即可运行,但是有两点注意:

  • 启动速度比较慢,要耐心等待
  • 启动后黑窗口不能关闭,否则Jmeter也跟着关闭了

image-20240620133825276

2.快速入门

2.1.设置中文语言

默认Jmeter的语言是英文,需要设置:

  • ==设置本地运行中文==

image-20240620133838529

效果:

image-20240620133844456

注意:上面的配置只能保证本次运行是中文,如果要永久中文,需要修改Jmeter的配置文件

  • ==设置永久中文==

打开jmeter文件夹,在bin目录中找到 jmeter.properties,添加下面配置:

1
language=zh_CN

image-20240620133857758

注意:前面不要出现#,#代表注释,另外这里是下划线,不是中划线

2.2.基本用法

在测试计划上点鼠标右键,选择添加 > 线程(用户) > 线程组:

image-20240620134023118

在新增的线程组中,填写线程信息:

image-20240620134032791

给线程组点鼠标右键,添加http取样器:

image-20240620134051379

编写取样器内容:

image-20240620134057894

添加监听报告:

image-20240620134103715

添加监听结果树:

image-20240620134118963

汇总报告结果:

image-20240620134130039

结果树:

image-20240620134154569

微服务-黑马商城为例

前提:我们以单体架构的黑马商城为例

image-20240528142451641

代码结构如下:

image-20240528142611395

==服务拆分–各个模块各司其职==

1.微服务拆分

拆分工程结构有两种:

  • 1.独立project:总黑马商城设置一个空项目(各个模块都在这个目录下) –不怎么美观和使用
  • 2.Maven聚合:总黑马商城设置一个空项目(各个模块成为一个module模块,根据maven管理) –只是代码放一起但是各自可以打包开发编译

我们以第二种Maven聚合方式进行拆分

1.1 新建项目

image-20240528165608489

1.2 导入依赖

直接从hm-service中导入,然后删除一些不需要的依赖

1.3 编写启动类

一定记得和其他包是同一级,不然他妈的扫描不到报bean冲突!!!!!

image-20240528165703436

1.4 编写yml配置文件

直接从hm-service中导入,然后删除和修改一些配置

1.5 挪动代码

挪动步骤:

①domain实体,

②mapper数据库打交道的,

③service和serviceimpl,

④controller

==在这一步拆分多个子项目之后,我们可能会发现cart购物车服务会调用查询item商品服务,之前我们可以在一个模块中直接调用mapper,但是分开之后只能发送请求访问==

2.远程调用-RestTemplate

之前通过调用item的mapper层方法即可,现在需要通过RestTemplate发送http请求给item服务获取数据。【但是有个致命问题是,exchange方法的url是写死的就很麻烦】

使用方法:

image-20240529110754747

具体操作:

image-20240529110418958

==服务治理–更高效管理调用者和被调用者==

1.注册中心(+调用中间商)

为了解决RestTemplate发送http请求时会写死url问题【如果被调用服务有多台负载均衡,就会报错更改也很麻烦】。==其实注册中心就相当于docker中的数据卷一样,我们可以当做中间商然后把调用者(服务调用者)和被调用者(服务注册者)联系起来。==

1.1 注册中心原理

流程如下:

  • 服务启动时就会注册自己的服务信息(服务名、IP、端口)到注册中心 –让注册中心知道我可以被调用
  • 调用者可以从注册中心订阅想要的服务,获取服务对应的实例列表(1个服务可能多实例部署) –让调用者知道有哪些可以调用
  • 调用者自己对实例列表负载均衡,挑选一个实例 –让调用者选一个被调用者
  • 调用者向该实例发起远程调用 –远程调用

image-20240529171431457

  • 服务治理中的三个角色分别是什么?

​ 服务提供者:暴露服务接口,供其它服务调用

​ 服务消费者:调用其它服务提供的接口

​ 注册中心:记录并监控微服务各实例状态,推送服务变更信息

  • 消费者如何知道提供者的地址?

​ 服务提供者会在启动时注册自己信息到注册中心,消费者可以从注册中心订阅和拉取服务信息

  • 消费者如何得知服务状态变更?【Nacos会15s检测一次,30s删除一次

​ 服务提供者通过心跳机制向注册中心报告自己的健康状态,当心跳异常时注册中心会将异常服务剔除,并通知订阅了该服务的消费者

  • 当提供者有多个实例时,消费者该选择哪一个?

​ 消费者通过负载均衡算法,从多个实例中选择一个【==以前SpringMVC默认是Ribbon负载均衡,后来默认是loadbalancer负载均衡==】

1.2 注册中心方式

1.1.1 Eureka(之前使用)

具体使用可以去SpringCloud篇笔记查找

1.1.2 Nacos(目前使用)

1.角色1-注册中心

  • 1.准备配置文件和tar包

    image-20240531172545922
  • 2.linux服务器docker容器启动

    image-20240530095352569

  • 3.可以在windows系统下访问

image-20240530095505769

2.角色2-服务注册

主要用于对服务提供者进行信息注册,注册到nacos中。

  • 1.在pom.xml中导入依赖和在application.yml文件中配置nacos地址

image-20240530095103394

  • 2.我们添加完成之后可以刷新nacos地址,就可以在网页中看到

image-20240530095604593

3.角色3-服务发现

  • 1.在pom.xml中导入依赖和在application.yml文件中配置nacos地址

    image-20240601161204592

    Nacos的依赖于服务注册时一致,这个依赖中同时包含了服务注册和发现的功能。因为任何一个微服务都可以调用别人,也可以被别人调用,即可以是调用者,也可以是提供者。

  • 2.我们添加完成之后可以刷新nacos地址,就可以在网页中看到

image-20240531173131580

  • 3.进行远程调用

==服务调用–更高效发送http请求==

1.OpenFeign(优化发送http请求)

之前使用的RestTemplate发起远程调用的代码:

image-20240423202621703

存在下面的问题:

• 代码可读性差,编程体验不统一

• 参数复杂URL难以维护

==Feign==是一个声明式的http客户端。其作用是帮助我们优雅地实现http请求发送,解决了上述的问题

1.1 使用步骤

  • 1.导入依赖

image-20240601164647179

  • 2.服务发现方启动类添加注解

image-20240601164613950

  • 3.服务发现方编写接口

image-20240601165533941

这里只需要声明接口,无需实现方法[OpenFeign动态代理实现]。接口中的几个关键信息:

  • @FeignClient("item-service") :声明服务名称
  • @GetMapping :声明请求方式
  • @GetMapping("/items") :声明请求路径
  • @RequestParam("ids") Collection<Long> ids :声明请求参数
  • List<ItemDTO> :返回值类型

有了上述信息,OpenFeign就可以利用动态代理帮我们实现这个方法,并且向http://item-service/items发送一个GET请求,携带ids为请求参数,并自动将返回值处理为List<ItemDTO>。我们只需要直接调用这个方法,即可实现远程调用了。

  • 4.服务发现方直接远程调用
    image-20240601165127358

总而言之,OpenFeign替我们完成了服务拉取、负载均衡、发送http请求的所有工作

1.2 连接池

==Feign底层发起http请求,依赖于其它的框架==。其底层客户端实现包括:

  • URLConnection:[默认]不支持连接池

  • Apache HttpClient :支持连接池

  • OKHttp:支持连接池

以HttpClient为例:

①pom.xml文件引入依赖

1
2
3
4
5
<!--httpClient的依赖 -->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>

②yml配置文件

1
2
3
4
5
6
feign:
httpclient:
enabled: true # 开启feign对HttpClient的支持
#线程池的核心值需要压测和实际情况调整!!!!!!!!!!!1
max-connections: 200 # 最大的连接数
max-connections-per-route: 50 # 每个路径的最大连接数

1.3 最佳实践方案

我们在2.1的使用步骤其实只是模拟了一种调用,但可能多个模块之间互相调用这种方式就有很大弊端。

因此可以提出继承方式和抽取方式:

image-20240601205026514

方案1抽取更加简单,工程结构也比较清晰,但缺点是整个项目耦合度偏高。

方案2抽取相对麻烦,工程结构相对更复杂,但服务之间耦合度降低。

1.3.1 两种抽取方式

1.继承方式

就是将所有用得到的dto,po,vo啥的都放到一个微服务里面。

image-20240601204832364

2.抽取方式

每个微服务存放自己需要的dto,po,vo啥的。只有需要的放到对应微服务。

image-20240601204850644

1.3.2 抽取Feign客户端

就是将cart-service关于调用的代码和vo,dto等挪到hm-api公共模块内。

1.3.3 扫描包

一般情况下,如果调用feign和注册feign不在一个微服务内,那就可能出现扫描包扫描不到报错。就需要进行设置扫描包:

image-20240601204312798

1.4 日志管理

OpenFeign只会在FeignClient所在包的日志级别为DEBUG时,才会输出日志。而且其日志级别有4级:

  • NONE:不记录任何日志信息,这是默认值。
  • BASIC:仅记录请求的方法,URL以及响应状态码和执行时间
  • HEADERS:在BASIC的基础上,额外记录了请求和响应的头信息
  • FULL:记录所有请求和响应的明细,包括头信息、请求体、元数据。

Feign默认的日志级别就是NONE,所以默认我们看不到请求日志。

1.4.1 配置文件yml方式

image-20240423213829442

1.4.2 Java代码方式

image-20240423214701673

提出一些问题:

我们将黑马商城拆分为5个微服务:

  • 用户服务
  • 商品服务
  • 购物车服务
  • 交易服务
  • 支付服务

由于每个微服务都有不同的地址或端口,入口不同,在与前端联调的时候发现了一些问题:

  • 请求不同数据时要访问不同的入口,需要维护多个入口地址,麻烦
  • 前端无法调用nacos,无法实时更新服务列表

单体架构时我们只需要完成一次用户登录、身份校验,就可以在所有业务中获取到用户信息。而微服务拆分后,每个微服务都独立部署,这就存在一些问题:

  • 每个微服务都需要编写登录校验、用户信息获取的功能吗?
  • 当微服务之间调用时,该如何传递用户信息?

通过==网关==技术解决上述问题。笔记分为3章:

  • 第一章:网关路由,解决前端请求入口的问题。
  • 第二章:网关鉴权,解决统一登录校验和用户信息获取的问题。
  • 第三章:统一配置管理,解决微服务的配置文件重复和配置热更新问题。

==服务管理–帮助前后端联调,全局门卫==

1.网关路由

1.1 网关概述(门卫)

顾明思议,网关就是网络的==关口==。数据在网络间传输,当一个网络 –传输–> 另一网络时,就需要经过网关来做数据的路由转发数据安全的校验

image-20240606172320142

现在,微服务网关就起到同样的作用。前端请求不能直接访问微服务,而是要请求网关:

  • 网关可以做安全控制,也就是登录身份校验,校验通过才放行
  • 通过认证后,网关再根据请求转发到想要访问的微服务

image-20240606172632286

在SpringCloud当中,提供了两种网关实现方案:

  • Netflix Zuul:早期实现,目前已经淘汰
  • SpringCloudGateway:基于Spring的WebFlux技术,完全支持响应式编程,吞吐能力更强

1.2 在项目中的地位

image-20240604172940613

1.3 快速入门

1.3.1 创建项目

创建一个微服务hm-gateway项目:

image-20240606173445134

1.3.2 引入依赖

pom.xml文件引入依赖:

image-20240606173435981

1.3.3 启动类

创建启动类【一定要注意启动类位置和其他包在同一级,不然启动类扫描注解就报错】:

image-20240618110428918

1.3.4 配置路由

==(目前最全,直接挪进去改改)==

接下来,在hm-gateway模块的resources目录新建一个application.yaml文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#端口信息
server:
port: 8087
#spring配置
spring:
application:
name: gateway #微服务名称(用于nacos微服务注册)
cloud:
nacos:
server-addr: 192.168.92.129:8848 #微服务nacos地址
#路由过滤
gateway:
#1.路由过滤
routes:
#第一个微服务
- id: item # 路由规则id,自定义,唯一
uri: lb://item-service # 路由的目标服务,lb代表负载均衡,会从注册中心拉取服务列表
predicates: # 路由断言,判断当前请求是否符合当前规则,符合则路由到目标服务
- Path=/items/**,/search/** # 这里是以请求路径作为判断规则
#第二个微服务
- id: cart
uri: lb://cart-service
predicates:
- Path=/carts/**
#第三个微服务
- id: user
uri: lb://user-service
predicates:
- Path=/users/**,/addresses/**
#第四个微服务
- id: trade
uri: lb://trade-service
predicates:
- Path=/orders/**
#第五个微服务
- id: pay
uri: lb://pay-service
predicates:
- Path=/pay-orders/**

#2.默认过滤器
default-filters: # 默认过滤项
- AddRequestHeader=Truth,Itcast is freaking awesome!

#3.跨域问题
globalcors:
add-to-simple-url-handler-mapping: true #解决options请求被拦截问题
cors-configurations:
'[/**]': #拦截一切请求
allowedOrigins: # 允许哪些网站的跨域请求
- "http://localhost:8090"
allowedMethods: # 允许的跨域ajax的请求方式
- "GET"
- "POST"
- "DELETE"
- "PUT"
- "OPTIONS"
allowedHeaders: "*" # 允许在请求中携带的头信息
allowCredentials: true # 是否允许携带cookie
maxAge: 360000 # 这次跨域检测的有效期

==配置文件概述:==

其中,路由规则的定义语法如下:

1
2
3
4
5
6
7
8
spring:
cloud:
gateway:
routes:
- id: item
uri: lb://item-service
predicates:
- Path=/items/**,/search/**

四个属性含义如下:

  • id:路由的唯一标示
  • predicates:路由断言【判断是否符合条件】 –>十一种,但是只用Path这一类
  • filters:路由过滤条件【请求时添加信息】 –>三大类过滤器(执行顺序:默认过滤器,路由过滤器,全局过滤器)
  • uri:路由目标地址,lb://代表负载均衡,从注册中心获取目标微服务的实例列表,并且负载均衡选择一个访问。

其中yml配置中的routes可以查看源码(底层其实就是我们配置的6个属性,其中我们常用其中4个):
image-20240607145613009

1.3.5 测试

image-20240607111543349

2.网关鉴权(+登录校验)

  • 单体架构,我们只需要完成一次用户登录,身份校验就可以在所有业务中获取到用户信息。
  • 微服务架构,每个微服务都需要做用户登录校验就不太合理了

2.1 鉴权思路分析

我们的登录是基于JWT来实现的,校验JWT的算法复杂,而且需要用到秘钥。如果每个微服务都去做登录校验,这就存在着两大问题:

  • 每个微服务都需要知道JWT的秘钥,×不安全
  • 每个微服务重复编写登录校验代码、权限校验代码,×麻烦

既然网关是所有微服务的入口,一切请求都需要先经过网关。我们完全可以把登录校验的工作放到网关去做,这样之前说的问题就解决了:

  • 只需要在网关和用户服务保存秘钥
  • 只需要在网关开发登录校验功能

【顺序:登录校验 –> 请求转发到微服务】

image-20240618111909518

因此,①JWT登录校验 —->② 网关请求转发(gateway内部代码实现)

2.2 Gateway内部工作基本原理

登录校验必须在请求转发到微服务之前做,否则就失去了意义。而网关的请求转发是Gateway内部代码实现的,要想在请求转发之前做登录校验,就必须了解Gateway内部工作的基本原理。

image-20240607151254092

如图所示:

  1. 客户端请求进入网关后由HandlerMapping对请求做判断,找到与当前请求匹配的路由规则(Route),然后将请求交给WebHandler去处理。
  2. WebHandler则会加载当前路由下需要执行的过滤器链(Filter chain),然后按照顺序逐一执行过滤器(后面称为Filter)。
  3. 图中Filter被虚线分为左右两部分,是因为Filter内部的逻辑分为prepost两部分,分别会在请求路由到微服务之前之后被执行。
  4. 只有所有Filterpre逻辑都依次顺序执行通过后,请求才会被路由到微服务。
  5. 微服务返回结果后,再倒序执行Filterpost逻辑。
  6. 最终把响应结果返回。

==总结:==

image-20240618134038219

如图所示,最终请求转发是有一个名为NettyRoutingFilter的过滤器来执行的,而且这个过滤器是整个过滤器链中顺序最靠后的一个。

如果我们能够定义一个过滤器,在其中实现登录校验逻辑,并且将过滤器执行顺序定义到NettyRoutingFilter之前,这就符合我们的需求。

2.3 网关过滤链-三种过滤器

网关过滤器链中的过滤器有两种:

  • GatewayFilter路由过滤器(gateway自带),作用范围比较灵活,可以:【指定的路由Route】 –一般自定义的话比较麻烦【直接yml配置】
  • GlobalFilter全局过滤器,作用范围:【所有路由】,不可配置。 –一般使用这个好弄
  • HttpHeadersFilter处理传递到下游微服务的请求头

其实GatewayFilterGlobalFilter这两种过滤器的方法签名完全一致:

1
2
3
4
5
6
7
/**
* 处理请求并将其传递给下一个过滤器
* @param exchange 当前请求的上下文,其中包含request、response等各种数据
* @param chain 过滤器链,基于它向下传递请求
* @return 根据返回值标记当前请求是否被完成或拦截,chain.filter(exchange)就放行了。
*/
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);

工作基本原理的第二步WebHandler:FilteringWebHandler请求处理器在处理请求时,会将②GlobalFilter装饰为①GatewayFilter,然后放到同一个过滤器链中,排序以后依次执行。

2.4 自定义过滤器

2.4.1 GatewayFilter

Gateway内置的GatewayFilter过滤器使用起来非常简单,无需编码,只要在yaml文件中简单配置即可。而且其作用范围也很灵活,配置在哪个Route下,就作用于哪个Route

方式一-yml文件配置

例如,有一个过滤器叫做AddRequestHeaderGatewayFilterFacotry,顾明思议,就是添加请求头的过滤器,可以给请求添加一个请求头并传递到下游微服务。

使用只需要在application.yaml中这样配置:【配置到gateway-routes下面就表明属于一个route】

1
2
3
4
5
6
7
8
9
10
11
spring:
cloud:
gateway:
routes:
- id: test_route
uri: lb://test-service
predicates:
-Path=/test/**
#过滤器
filters:
- AddRequestHeader=key, value # 逗号之前是请求头的key,逗号之后是value

如果想作用于全部路由,则可以配置:【配置到gateway下面就表明不属于任何一个route,属于全部路由】

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
cloud:
gateway:
routes:
#在这里配置只在部分route下有效
- id: test_route
uri: lb://test-service
predicates:
-Path=/test/**

#默认过滤器【全部路由】
default-filters: # default-filters下的过滤器可以作用于所有路由
- AddRequestHeader=key, value

方式二-自定义类

自定义GatewayFilter不是直接实现GatewayFilter,而是实现AbstractGatewayFilterFactory

  • 第一种:参数yml配置+自定义过滤器

【注意:该类的名称一定要以GatewayFilterFactory为后缀!】

image-20240618135158605

然后在yml配置中使用:

1
2
3
4
5
spring:
cloud:
gateway:
default-filters:
- PrintAny #直接写自定义GatewayFilterFactory类名称中前缀类声明过滤器
  • 第二种:自定义过滤器+动态配置参数【比较复杂不建议】

image-20240607153516182

然后在yml配置中使用:

1
2
3
4
5
spring:
cloud:
gateway:
default-filters:
- PrintAny=1,2,3 # 注意,这里多个参数以","隔开,将来会按照shortcutFieldOrder()方法返回的参数顺序依次复制

上面这种配置方式参数必须严格按照shortcutFieldOrder()方法的返回参数名顺序来赋值。

还有一种用法,无需按照这个顺序,就是手动指定参数名:

1
2
3
4
5
6
7
8
9
spring:
cloud:
gateway:
default-filters:
- name: PrintAny
args: # 手动指定参数名,无需按照参数顺序
a: 1
b: 2
c: 3

第二种方法的总体图对比:

image-20240607154320369

2.4.2 GlobalFilter

自定义GlobalFilter则简单很多,直接实现GlobalFilter即可,而且也无法设置动态参数[因为默认是全局路由]:

image-20240607153823420

2.5 问题一-怎么进行登录校验

现在我们知道可以通过定义两种过滤器,定义到NettyRoutingFilter之前就行。

我们以自定义GlobalFilter来完成登录校验:

image-20240610213352568

完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.hmall.gateway.filter;
import com.hmall.common.exception.UnauthorizedException;
import com.hmall.gateway.config.AuthProperties;
import com.hmall.gateway.util.JwtTool;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.List;
@Component
@RequiredArgsConstructor
@EnableConfigurationProperties(AuthProperties.class)
public class AuthGlobalFilter implements GlobalFilter, Ordered {

private final JwtTool jwtTool;

private final AuthProperties authProperties;
//因为不需要拦截的路径有/** 所以我们使用这种特殊matcher类进行匹配
private final AntPathMatcher antPathMatcher = new AntPathMatcher();

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 1.获取Request
ServerHttpRequest request = exchange.getRequest();
// 2.判断是否不需要拦截
if(isExclude(request.getPath().toString())){ //yml配置的不需要拦截的路径和request的路径进行判断
// 无需拦截,直接放行
return chain.filter(exchange);
}
// 3.获取请求头中的token
String token = null;
List<String> headers = request.getHeaders().get("authorization");
if (headers!=null && !headers.isEmpty()) {
token = headers.get(0);
}
// 4.校验并解析token
Long userId = null;
try {
userId = jwtTool.parseToken(token);
} catch (UnauthorizedException e) {
// 如果无效,拦截
ServerHttpResponse response = exchange.getResponse();
response.setRawStatusCode(401);
return response.setComplete();
}
// TODO 5.如果有效,传递用户信息
System.out.println("userId = " + userId);
// 6.放行
return chain.filter(exchange);
}

private boolean isExclude(String antPath) {
for (String pathPattern : authProperties.getExcludePaths()) {
if(antPathMatcher.match(pathPattern, antPath)){
return true;
}
}
return false;
}

@Override
public int getOrder() {
return 0;
}
}

2.6 问题二-网关怎么传递用户信息

截止到2.5,网关已经可以完成登录校验并获取登录用户身份信息。

但是当网关将请求转发到微服务时,微服务又该如何获取用户身份呢?由于网关发送请求到微服务依然采用的是Http请求,因此我们可以将用户信息以请求头的方式传递到下游微服务。然后微服务可以从请求头中获取登录用户信息。考虑到微服务内部可能很多地方都需要用到登录用户信息,因此我们可以利用SpringMVC的拦截器来获取登录用户信息,并存入ThreadLocal,方便后续使用。

据图流程图如下:

image-20240610213950132

2.6.1 网关如何转发用户信息

网关发送请求到微服务依然采用的是Http请求,因此我们可以将用户信息以请求头的方式传递到下游微服务。

具体操作:【在2.5校验器实现的登录校验里面将jwt解析出来的UserId以请求头方式传递】

image-20240618152159108

2.6.2 下游微服务怎么获取用户信息

微服务可以从请求头中获取登录用户信息。利用SpringMVC的拦截器来获取登录用户信息,并存入ThreadLocal,方便后续使用。

据图流程图如下:【==编写微服务拦截器,拦截请求获取用户信息,保存到ThreadLocal后放行==】

image-20240618161828959

整体代码结构:

image-20240618162921217

具体操作:

因为当前用户ID会在多个微服务中使用,所以我们可以在hm-common微服务中编写:

  • 1.根据SpringMvc拦截器创建规则创建自定义拦截器

image-20240618160956852

  • 2.创建MvcConfig添加自定义的拦截器

image-20240618161119070

  • 3.可以修改之前写死的位置业务逻辑,这样可以在通过Threadlocal获取信息

  • 4.需要注意的是:因为是写在hm-common微服务,这个配置类默认不会生效(和其他微服务的扫描包不一致,无法扫描到,因此无法生效)。基于SpringBoot自动装配原理,我们可以将其添加到resources目录下的META-INF/Spring.factories文件中:

  • 5.如果我们需要保证其他微服务获取这个拦截器,而网关不获取(登录校验了,所以没必要获取啊),就可以添加注解

image-20240618162712521

2.7 问题三-微服务之间怎么传递用户信息

前端发起的请求都会经过网关再到微服务,由于我们之前编写的过滤器和拦截器功能,微服务可以轻松获取登录用户信息。

但有些业务是比较复杂的,请求到达微服务后还需要调用其它多个微服务。

比如下单业务,流程如下:

image-20240618163838037

下单的过程中,需要调用商品服务扣减库存,调用购物车服务清理用户购物车。而清理购物车时必须知道当前登录的用户身份。但是,订单服务调用购物车时并没有传递用户信息,购物车服务无法知道当前用户是谁!

由于微服务获取用户信息是通过拦截器在请求头中读取,因此要想实现微服务之间的用户信息传递,就必须在微服务发起调用时把用户信息存入请求头

微服务之间调用是基于OpenFeign来实现的,并不是我们自己发送的请求。我们如何才能让每一个由OpenFeign发起的请求自动携带登录用户信息呢?–借助Feign中提供的一个拦截器接口:RequestInterceptor

image-20240619142520506

我们只需要==实现这个接口,然后实现apply方法,利用RequestTemplate类来添加请求头,将用户信息保存到请求头中==。这样以来,每次OpenFeign发起请求的时候都会调用该方法,传递用户信息。

具体实现:

image-20240619142843772

这样注入bean之后如果要使用,就要在Openfeign远程调用的启动类添加:

image-20240619143047728

==总结:网关解决传递信息的三大问题==

  • 1.怎么做到先校验?后转发(网关路由是配置的,请求转发是Gateway内部代码) —在gateway内部工作基本原理的NettyRoutingFilter过滤器前面定义一个过滤器(①路由过滤器②全局过滤器),过滤器中进行校验JWT信息,然后通过mutate方法转发用户信息。
  • 2.怎么做到网关给用户传递用户信息 —网关到微服务通过API添加用户信息到http请求头,微服务通过SpringMVC拦截器获取用户信息,将用户信息存储到ThreadLocal中
  • 3.怎么做到用户之间调用传递用户信息 —就是利用发送http请求(Openfeign)时通过提供的拦截器添加

image-20240619143917520

[JWT里面传递UserId信息,网关添加过滤器进行校验token同时将UserId添加到请求头,通过mutate方法传递给微服务,微服务通过SpringMVC拦截器获取UserId信息,然后存储到ThreadLocal,业务就可以使用。如果微服务之间调用就通过OpenFeign发送http请求的时候添加拦截器保存UserId]

==配置管理–高效维护配置和动态变更属性==

1.微服务重复配置过多,维护成本高 —-> 共享配置

2.业务配置经常变动,每次修改都要重启服务 —-> 热更新

3.网关路由配置写死,如果变更就要重启网关 —-> 热更新

image-20240619145505779

这些问题都可以通过统一的配置管理器服务[Nacos第二大特性]解决 —–Nacos不仅仅具备注册中心功能,也具备配置管理的功能:

微服务共享的配置可以统一交给Nacos保存和管理,在Nacos控制台修改配置后,Nacos会将配置变更推送给相关的微服务,并且无需重启即可生效,实现配置热更新。

网关的路由同样是配置,因此同样可以基于这个功能实现动态路由功能,无需重启网关即可修改路由配置。

1.配置共享

我们可以把微服务共享的配置抽取到Nacos中统一管理,这样就不需要每个微服务都重复配置了。分为两步:

  • ①在Nacos中添加共享配置
  • ②微服务拉取配置

1.1 添加共享配置

在nacos控制台分别添加微服务共同配置:

image-20240619153300369

最终形成多个yaml文档:

image-20240619153352401

1.2 拉取共享配置

将拉取到的共享配置与本地的application.yaml配置合并,完成项目上下文的初始化。

不过,需要注意的是,读取Nacos配置是SpringCloud上下文(ApplicationContext)初始化时处理的,发生在项目的引导阶段。然后才会初始化SpringBoot上下文,去读取application.yaml

也就是说引导阶段,application.yaml文件尚未读取,根本不知道nacos 地址,该如何去加载nacos中的配置文件呢?

SpringCloud在初始化上下文的时候会先读取一个名为bootstrap.yaml(或者bootstrap.properties)的文件,如果我们将nacos地址配置到bootstrap.yaml中,那么在项目引导阶段就可以读取nacos中的配置了。

1.2.1 文件读取顺序

image-20240619154015718

1.2.2 拉取步骤

  • 1.导入依赖:
1
2
3
4
5
6
7
8
9
10
<!--nacos配置管理-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!--读取bootstrap文件-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>

image-20240619154146436

  • 2.编写bootstrap文件:

image-20240619154311986

1.3 多配置文件读取顺序

可能不同环境下有不同的yaml文件[像单体架构的时候properties,yml,yaml等情况],因此当出现相同属性时就有优先级:==名字越长越牛逼==

image-20240423173524235

1.4 配置共享整理总结

其实就是把原来的application.yml文件拆分成三个部分:①application公共配置;②Nacos地址和读取①文件配置;③application个性化配置

①nacos空间多个共享文件:原来application.yml中多个微服务可共享的信息

②新建bootstrap.yml文件:原来application.yml里面关于nacos的配置+添加config信息(读取nacos配置的多个共同部分yml文件);

③application.yml:保留一部分自己特有的属性和①nacos里面${}需要的属性

2.配置热更新(无需重启)

这就要用到Nacos的配置热更新能力了,分为两步:

  • 在Nacos中添加配置[配置属性]
  • 在微服务读取配置[bootstrap.yml文件拉取配置,具体业务位置使用]

image-20240619160718950

2.1 Nacos配置文件

首先,我们在nacos中添加一个配置文件,将购物车的上限数量添加到配置中:

image-20240619160940082

注意文件的dataId格式:

1
[服务名]-[spring.active.profile].[后缀名]

文件名称由三部分组成:

  • 服务名:我们是购物车服务,所以是cart-service
  • spring.active.profile:就是spring boot中的spring.active.profile,可以省略,则所有profile共享该配置
  • 后缀名:例如yaml

2.2 配置热更新

我们在微服务中读取配置,实现配置热更新。【一般我们使用第一种方式,第二种要用两个注解】

现在我们需要读取Nacos配置文件中的信息hm.cart.maxAmount属性:

image-20240619161955080

2.2.1 方式一

cart-service中新建一个属性读取类:

image-20240619161154107

接着,在业务中使用该属性加载类:

image-20240619161245631

2.2.2 方式二

直接搭配@RefreshScope注解和@Value注解获取

image-20240619161914727

3.动态路由

用到了在学

Hexo博客报错github传输大文件GH001异常

1.报错原因

我在Docker文件夹下上传了一个iso文件,这个文件大于了github的100M大小报错。

在我hexo g的时候没问题,但是hexo d的时候会出错。

image-20240528105520819

但是本地删除了iso文件还是不行,最后查询意思是之前的记录仍然存在,只能从本地仓库删除并且把以前的提交记录全部修改

2.修改办法

2.1 在此目录下打开git bash

image-20240528105706115

2.2 输入指令 git log通过此处找到报错前最新的版本

image-20240528105915849

2.2 还有一种办法就是通过github查看版本

image-20240528110139383

2.3 至此直接git reset id 就可以恢复到对应版本

image-20240528110223842

3.参考办法

记一次异常艰难的博客部署(二)—— hexo d 指令向GitHub传输大文件导致的 GH001 报错解决 | 邓小闲的小楼 (rimbaud-lee.github.io)

微服务-分布式事务

==1.分布式事务产生原因==

首先我们看看项目中的下单业务整体流程:

image-20240620154853978

由于订单、购物车、商品分别在三个不同的微服务,而每个微服务都有自己独立的数据库,因此下单过程中就会跨多个数据库完成业务。而每个微服务都会执行自己的本地事务:

  • 交易服务:下单事务
  • 购物车服务:清理购物车事务
  • 库存服务:扣减库存事务

整个业务中,各个本地事务是有关联的。因此每个微服务的本地事务,也可以称为分支事务。多个有关联的分支事务一起就组成了全局事务。我们必须保证整个全局事务同时成功或失败。

我们知道每一个分支事务就是传统的单体事务,都可以满足ACID特性,但全局事务跨越多个服务、多个数据库,不能满足!!!!!!!!!!!!

  • 产生原因

事务并未遵循ACID的原则,归其原因就是参与事务的多个子业务在不同的微服务,跨越了不同的数据库。虽然每个单独的业务都能在本地遵循ACID,但是它们互相之间没有感知,不知道有人失败了,无法保证最终结果的统一,也就无法遵循ACID的事务特性了。

这就是分布式事务问题,出现以下情况之一就可能产生分布式事务问题:

  • 业务跨多个服务实现
  • 业务跨多个数据源实现

==2.CAP定理==

1998年,加州大学的计算机科学家Eric Brewer提出,分布式系统要有三个指标:

  • Consistency(一致性):用户访问分布式系统中的任意节点,得到的数据必须一致

  • Availability(可用性):用户访问分布式系统时,读/写操作总能成功

  • Partition tolerance(分区容错性):即使系统出现网络分区,整个系统也要持续对外提供服务

他认为任何分布式系统架构方案都不能同时满足这三个目标,这个结论就是CAP定理。

2.1 一致性

Consistency(一致性):用户访问分布式系统中的任意节点,得到的数据必须一致

image-20241008165045608

2.2 可用性

Availability (可用性):用户访问分布式系统时,读或写操作总能成功。

只能读不能写,或者只能写不能读,或者两者都不能执行,就说明系统弱可用或不可用。

2.3 分区容错性

Partition tolerance(分区容错性):即使系统出现网络分区partition,整个系统也要持续对外提供服务tolerance。

其中partition(分区):当分布式系统节点之间出现网络故障导致节点之间无法通信的情况。

image-20241008170759823

如上图,node01和node02之间网关畅通,但是与node03之间网络断开。于是node03成为一个独立的网络分区;node01和node02在一个网络分区。

其中tolerance(分区容错):当系统出现网络分区,整个系统也要持续对外提供服务。

2.4 三者矛盾(P一定有)

在分布式系统中,网络不能100%保证畅通(partition网络分区的情况一定会存在)。而我们的系统必须要持续运行,对外提供服务。所以分区容错性(P)是硬性指标,所有的分布式系统都要满足。

而设计分布式系统的时候要取舍的就是一致性(C)和可用性(A)。

【P一定有,C和A不一定有】

image-20241008172328325

如果允许可用性(A):这样用户可以任意读写,但是由于node03不能同步数据,那就会出现数据不一致情况【只满足AP】

如果允许一致性(C):如果用户不允许随意读写(不允许写,允许读)一直到网络恢复,分区消失,只能满足数据一致性【只满足CP】

2.5 解决三者矛盾(BASE理论)

因为P一定有,C和A不一定有:所以要考虑到底是牺牲一致性还是可用性?—>BASE理论

  • Basically Available 基本可用:分布式系统在出现故障时,允许损失部分可用性,【保证核心可用性】
  • Soft State软状态):在一定时间内,允许出现中间状态,比如临时的不一致状态。
  • Eventually Consistent最终一致性:虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致。

总而言之,BASE理论其实就是一种取舍方案,不再追求完美,而是追求完成目标。

  • AP思想:【AT模式】各个子事务分别执行和提交,无需锁定数据。允许出现结果不一致,然后采用弥补措施恢复,实现最终一致。
  • CP思想:【XA模式】各个子事务执行后不要提交,而是等待彼此结果,然后同时提交或回滚。在这个过程中锁定资源,不允许其它人访问,数据处于不可用状态,但能保证一致性。

—————————————

==解决方案(中间人-事务协调者)==

解决分布式事务的方案有很多,但实现起来都比较复杂,因此我们一般会使用开源框架来解决分布式事务问题。在众多的开源分布式事务框架中,功能最完善、使用最多的就是阿里巴巴在2019年开源的Seata了。

1.Seata

官方地址:Seata

分布式事务产生的一个重要原因:参与事务的多个分支事务互相无感知, 不知道彼此的执行状态。

解决方案:就是找一个统一的事务协调者,与多个分支事务通信,检测每个分支事务的执行状态,保证全局事务下的每一个分支事务同时成功或失败即可。大多数的分布式事务框架都是基于这个理论来实现的。

image-20240620161226852

1.1 Seata架构

Seata也不例外,在Seata的事务管理中有三个重要的角色:

  • TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。
  • TM (Transaction Manager) - 事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务。
  • RM (Resource Manager) - 资源管理器:管理分支事务,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

image-20240620162213687

  • 现来方式:直接执行全局事务,然后中途调用各个分支事务,执行结束就完成【各个分支不知道彼此是否正确】

  • 现在方式:直接执行全局事务(事务管理器TM管理开始和结束),然后中途调用各个分支事务(各个RM告知TC这个全局事务有我,我开始了,我结束了),执行结束就完成【中途有什么问题TC都知道,随时可能回滚】

1.2 代码实现思路(两个方面)

TMRM【Seata的客户端部分】,引入到参与事务的微服务依赖中即可。(将来TMRM就会协助微服务,实现本地分支事务与TC之间交互,实现事务的提交或回滚。)

TC【事务协调中心】,是一个独立的微服务,需要单独部署。

—————————————

==Seata具体操作(分两个部分)==

1.TC部署

1.1 准备数据库表

image-20240620170911535

其中seata-tc.sql内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
CREATE DATABASE IF NOT EXISTS `seata`;
USE `seata`;


CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`status` TINYINT NOT NULL,
`application_id` VARCHAR(32),
`transaction_service_group` VARCHAR(32),
`transaction_name` VARCHAR(128),
`timeout` INT,
`begin_time` BIGINT,
`application_data` VARCHAR(2000),
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`xid`),
KEY `idx_status_gmt_modified` (`status` , `gmt_modified`),
KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;


CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL,
`xid` VARCHAR(128) NOT NULL,
`transaction_id` BIGINT,
`resource_group_id` VARCHAR(32),
`resource_id` VARCHAR(256),
`branch_type` VARCHAR(8),
`status` TINYINT,
`client_id` VARCHAR(64),
`application_data` VARCHAR(2000),
`gmt_create` DATETIME(6),
`gmt_modified` DATETIME(6),
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;


CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR(128) NOT NULL,
`xid` VARCHAR(128),
`transaction_id` BIGINT,
`branch_id` BIGINT NOT NULL,
`resource_id` VARCHAR(256),
`table_name` VARCHAR(32),
`pk` VARCHAR(36),
`status` TINYINT NOT NULL DEFAULT '0' COMMENT '0:locked ,1:rollbacking',
`gmt_create` DATETIME,
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid_and_branch_id` (`xid` , `branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

CREATE TABLE IF NOT EXISTS `distributed_lock`
(
`lock_key` CHAR(20) NOT NULL,
`lock_value` VARCHAR(20) NOT NULL,
`expire` BIGINT,
primary key (`lock_key`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;

INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('AsyncCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryCommitting', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('RetryRollbacking', ' ', 0);
INSERT INTO `distributed_lock` (lock_key, lock_value, expire) VALUES ('TxTimeoutCheck', ' ', 0);

1.2 准备配置文件

  • 准备seata目录(包含application.yml配置文件),到时候docker容器可以挂载

image-20240620172739258

其中application.yml信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
server:
port: 7099 #控制台端口

spring:
application:
name: seata-server

logging:
config: classpath:logback-spring.xml
file:
path: ${user.home}/logs/seata
# extend:
# logstash-appender:
# destination: 127.0.0.1:4560
# kafka-appender:
# bootstrap-servers: 127.0.0.1:9092
# topic: logback_to_logstash

#控制台信息 ip:7099进入之后账号和密码
console:
user:
username: admin
password: admin

seata:
#配置中心
config:
# support: nacos, consul, apollo, zk, etcd3 多种配置中心
type: file
# nacos:
# server-addr: nacos:8848
# group : "DEFAULT_GROUP"
# namespace: ""
# dataId: "seataServer.properties"
# username: "nacos"
# password: "nacos"
#注册中心
registry:
# support: nacos, eureka, redis, zk, consul, etcd3, sofa 多种注册中心
type: nacos
nacos:
application: seata-server
server-addr: nacos:8848 #①ip地址 ②nacos就是容器名,意味着nacos和seata要在同一网络中(这样可通过容器名访问)
group : "DEFAULT_GROUP"
namespace: ""
username: "nacos"
password: "nacos"
# server:
# service-port: 8091 #If not configured, the default is '${server.port} + 1000'
security:
secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
tokenValidityInMilliseconds: 1800000
ignore:
urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login
server:
# service-port: 8091 #If not configured, the default is '${server.port} + 1000'
max-commit-retry-timeout: -1
max-rollback-retry-timeout: -1
rollback-retry-timeout-unlock-enable: false
enable-check-auth: true
enable-parallel-request-handle: true
retry-dead-threshold: 130000
xaer-nota-retry-timeout: 60000
enableParallelRequestHandle: true
recovery:
committing-retry-period: 1000
async-committing-retry-period: 1000
rollbacking-retry-period: 1000
timeout-retry-period: 1000
undo:
log-save-days: 7
log-delete-period: 86400000
session:
branch-async-queue-size: 5000 #branch async remove queue size
enable-branch-async-remove: false #enable to asynchronous remove branchSession
store:
# support: file 、 db 、 redis
mode: db
session:
mode: db
lock:
mode: db
#数据库配置
db:
datasource: druid
db-type: mysql
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://mysql:3306/seata?rewriteBatchedStatements=true&serverTimezone=UTC
user: root
password: 123
min-conn: 10
max-conn: 100
global-table: global_table
branch-table: branch_table
lock-table: lock_table
distributed-lock-table: distributed_lock
query-limit: 1000
max-wait: 5000
# redis:
# mode: single
# database: 0
# min-conn: 10
# max-conn: 100
# password:
# max-total: 100
# query-limit: 1000
# single:
# host: 192.168.150.101
# port: 6379
metrics:
enabled: false
registry-type: compact
exporter-list: prometheus
exporter-prometheus-port: 9898
transport:
rpc-tc-request-timeout: 15000
enable-tc-server-batch-send-response: false
shutdown:
wait: 3
thread-factory:
boss-thread-prefix: NettyBoss
worker-thread-prefix: NettyServerNIOWorker
boss-thread-size: 1

1.3 Docker部署

  • 1.导入镜像文件和配置文件
image-20240620171216896
  • 2.加载镜像文件
image-20240620171453072
  • 3.运行docker容器
1
2
3
4
5
6
7
8
9
docker run --name seata \
-p 8099:8099 \
-p 7099:7099 \
-e SEATA_IP=192.168.92.129 \
-v ./seata:/seata-server/resources \
--privileged=true \
--network heima \
-d \
seataio/seata-server:1.5.2

image-20240620172200138

  • 4.查看容器运行情况:docker logs -f seata

image-20240620172332870

  • 5.在浏览器输入IP:7099即可打开控制台

image-20240620172510841

2.微服务集成Seata

2.1 引入依赖

【所有分支事务都需要引入】为了方便各个微服务集成seata,我们需要把seata配置共享到nacos,因此trade-service模块不仅仅要引入seata依赖,还要引入nacos依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!--统一配置管理,读取nacos共享配置文件-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!--读取bootstrap文件-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>

<!--如果只需要seata集成微服务,那就只导入这个依赖!!!!!!!!!!-->
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

image-20240621094124650

2.2 添加配置(统一配置到nacos)

【一般直接配置到apolication.yml文件】因为多个分支事务都需要,那我就可以将seata的配置放在nacos统一配置,剩下的就是改造application.yml和bootstrap.yml文件信息。

2.2.1 配置公共配置

server-addr一定要配置自己的ip:【不然容易注册不到nacos上去!!!】

image-20240621093202209

让微服务能找到TC的位置:

image-20240621093439565

这样配置之后,各个分支事务都去配置这个TC信息:

2.2.2 分支事务新建bootstrap.yml文件

这样配置之后,各个分支事务都去配置这个TC信息:

image-20240621094830123

2.2.3 分支事务调整application.yml文件

image-20240621094943628

2.3 添加数据库保存快照

seata的客户端(TM和RM)在解决分布式事务的时候需要记录一些中间数据,保存在数据库中。因此我们要先准备一个这样的表。

对三个分支事务hm-trade、hm-cart、hm-item三个数据库加入一个undo_log日志表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';

添加完成之后:

image-20240621100417899

2.4 修改具体业务

我们重新启动项目之后,可以查看seata日志:

image-20240621112336246

然后针对出问题的方法进行修改【修改为GlobalTransactional注解】:

image-20240621102212713

@GlobalTransactional注解就是在标记事务的起点,将来TM就会基于这个方法判断全局事务范围,初始化全局事务。如果中途有分支事务出现问题,我们就可以告知TC进行回滚操作,保证全局事务要么成功/要么失败。

3.实现步骤

  • 1.准备TC所需要的数据库,准备配置文件和镜像文件 —【可以直接去服务器利用docker配置TC】
  • 2.微服务继承Seata
    • 2.1 引入seata依赖
    • 2.2 在yml配置seata信息 【因为涉及多个分支事务,所以一般配置到nacos】
    • 2.3 原有出现问题的方法替换@Tradtional注解为@GlobalTransactional注解解决分布式事务

==Seate四种底层原理-[四种]==

Seata支持四种不同的分布式事务解决方案:

  • XA
  • TCC(Try-Confirm-Cancel)
  • AT(Automatic Transaction)
  • SAGA

代码使用思路

【使用过程中,只是yml配置多一个属性】

其实就是Seata实现步骤:

①导入依赖

②yml配置(基础配置+模式配置属性) —-多了一个配置data-source-proxy-mode

③全局事务位置加注解@GlobalTransactional,分支事务加@Transactional

1.XA模式[统一控制,统一提交]

==①各事务执行完都锁住②统一判断是全部提交/全部撤回==

XA 规范 是X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准,XA 规范 描述了全局的TM与局部的RM之间的接口,几乎所有主流的数据库都对 XA 规范提供了支持。

1.1 基本概念

主要分为==两个阶段==提交:

一阶段的工作:【TM通知各个RM执行本地事务,RM向TC注册和报告完成情况,但是不提交保持数据库锁】

①RM注册分支事务到TC【告知我是哪个TM的,我完成什么任务】

②RM执行分支业务sql但不提交【完成任务了】tryPayOrderByBalance

③RM报告执行状态到TC【告诉你我完成了】

二阶段的工作:【TC基于一阶段RM提交事务状态来判断下一步操作是回滚还是提交】

①TC检测各分支事务执行状态【看看各个RM完成如何】

​ a.如果都成功,通知所有RM提交事务【ok,提交吧】

​ b.如果有失败,通知所有RM回滚事务【no,回滚吧】

②RM接收TC指令,提交或回滚事务【TC告诉我其他人好了/有问题,就提交/回滚】

image-20240621140448858

1.1 我们启动服务通过注解@GlobalTranscational开启全局事务

1.2 我们操作的时候调用多个分支事务

1.3 分支事务先向TC进行注册,告知TC我的哪个TM负责的,我要完成什么【告知之后可以进行业务逻辑】

1.4 开始执行业务,进行sql语句的完成【但是不提交!!!!】

1.5 执行业务sql完成之后报告TC我已经完成我自己的任务了,报告事务状态【TC就知道分支业务完成状态(有的完成了,有的失败了)】

因为第一阶段结束我们可以进行结束全局事务,后续看看是回滚还是提交

2.1 结束全局事务

2.2 TM告知TC检查一下第一阶段各分支事务执行状态,看是不是所有都完成

2.3 因为要全局事务要么提交/要么回滚,如果都成功,通知所有RM提交事务,如果有失败,通知所有RM回滚事务

流程图如下:

image-20240621143957304

1.2 具体实现操作

1.2.1 yml配置

image-20240621131831960

1.2.2 修改具体业务

对应全局事务位置添加@GlobalTranscational:

image-20240621131953128

针对各个分支事务添加@transactional:

image-20240621132234331

1.2.3 测试

我们加入手机到购物车,然后修改手机库存stock=0下单之后trade-service会提示:

image-20240621134914617

1.3 XA使用总结

image-20240621140206477

1.4 XA优缺点

  • XA模式的优点是什么?

    • 事务的强一致性,满足ACID原则【第一阶段只完成不提交,只有第二阶段才告知一起回滚,还是一起提交】

    • 常用数据库都支持,实现简单,并且没有代码侵入【比较好理解,而且比较规整】

  • XA模式的缺点是什么?

    • 因为一阶段需要锁定数据库资源,等待二阶段结束才释放,性能较差【第一阶段只能等第二阶段指令,阻塞时间长】

    • 依赖关系型数据库实现事务【关系型数据库】

1.5 XA模式和AT模式对比

XA模式【模式强一致,一步一步来】 AT模式【模式最终一致,可以第二阶段回退】
第一阶段 只完成不提交(锁定资源,阻塞) 直接提交(不锁定资源)
第二阶段 数据库机制完成回滚 数据快照完成回滚(第一阶段执行业务之前生成快照)
性能 低(只有第二阶段才决定事务提交/回退) 高(第一阶段就提交,第二阶段可以恢复)

2.AT模式[各自提交,有问题快照恢复]

分阶段提交的事务模型,不过弥补了XA模型中资源锁定周期过长的缺陷(一直阻塞等到第二阶段TC告知RM才可以进行操作)

==①你可以自己提交,然后提交的时候搞个快照(备份),不用锁定资源②如果都成功就删除快照(备份),不成功就用快照(备份)恢复==

2.1 基本概念

主要分为==两个阶段==提交:

一阶段的工作:

  1. TM发起并注册全局事务到TC
  2. TM调用分支事务
  3. 分支事务准备执行业务SQL
  4. RM拦截业务SQL,根据where条件查询原始数据,形成快照。【在执行业务sql之前生成快照
1
2
3
{
"id": 1, "money": 100
}
  1. RM执行业务SQL,提交本地事务,释放数据库锁。此时 money = 90【我已经完成了自己的任务,并且提交了】
  2. RM报告本地事务状态给TC

二阶段的工作:

  1. TM通知TC事务结束【ok了,你判断一下吧】
  2. TC检查分支事务状态【如果都成功删除快照,如果有失败就用快照恢复数据库回滚】
    1. 如果都成功,则立即删除快照
    2. 如果有分支事务失败,需要回滚。读取快照数据({“id”: 1, “money”: 100}),将快照恢复到数据库。此时数据库再次恢复为100

image-20240621151403207

流程图如下:

image-20240621153523017

2.2 具体实现操作

2.2.1 yml配置

类似于XA,就是将属性改为AT

image-20240621151209260

2.2.2 修改具体业务

类似与XA,就是全局事务位置加注解@GlobalTransanctional,分支事务位置加注解@Transanctional

2.2.3 添加快照undo表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';

2.2.4 测试

类似于XA测试,只不过多了快照数据进入到undo表

2.3 AT使用总结

1.yml添加配置

2.业务添加注解@GlobalTransanctional即可

3.添加快照表【比XA模式就多一个这个】

2.4 AT优缺点

  • XA模式的优点是什么?

    • 第一阶段就直接提交了,性能较好【后续如果需要就使用快照恢复】
  • XA模式的缺点是什么?

    • 第一阶段就提交,在第二阶段完成的极小时间段内可能出现数据不一致【用空间换时间】—99%没问题,极端情况下【特别是多线程并发访问AT模式的分布式事务时,有可能出现脏写问题(丢失一次更新)】

2.5 AT模式—脏写问题

这种模式在大多数情况下(99%)并不会有什么问题,不过在极端情况下,特别是多线程并发访问AT模式的分布式事务时,有可能出现脏写问题,如图:

image-20241008215009702

  • 解决思路:引入全局锁【在释放DB锁之前,先拿到全局锁】,避免同一时刻有另外一个事务来操作当前数据。

全局锁(TC管理):记录这行数据,其他事务可以CRUD】

DB锁(数据库管理):锁住这行数据,其他事务不可以CRUD】

image-20241008215937381

就是在原来基础上,增加获取全局锁部分[记录当前操作这行数据的事务];其他的事务获取全局锁(失败重试30次,间隔10ms一次)

2.6 XA模式和AT模式对比

XA模式【模式强一致,第二阶段统一处理】 AT模式【模式最终一致,可以第二阶段回退】
第一阶段 只完成不提交(锁定资源,阻塞) 直接提交(不锁定资源)
第二阶段 数据库机制完成回滚 数据快照完成回滚(第一阶段执行业务之前生成快照)
性能 低(只有第二阶段才决定事务提交/回退) 高(第一阶段就提交,第二阶段可以恢复)

3.TCC模式[各自提交,有问题人工恢复]

TCC模式与AT模式非常相似,每阶段都是独立事务,不同的是TCC通过人工编码来实现数据恢复。需要实现三个方法:

  • try:检测和预留资源;
  • confirm:完成资源操作业务;要求 try 成功 confirm 一定要能成功。
  • cancel:释放预留资源,【try的反向操作】

3.1 流程分析[举例]

例子:一个扣减用户余额的业务。假设账户A原来余额是100,需要余额扣减30元。

  • 阶段一(try):检查余额是否充足,如果充足则冻结金额增加30元,可用余额扣减30

image-20241009090835461

  • 阶段二(Confrim):假设要提交,之前可用金额已经扣减,并且转移到冻结金额。因此可用金额不变,直接冻结金额-30即可:

image-20241009090948444

  • 阶段三(Cancel):如果要回滚,则释放之前冻结的金额(冻结金额-30,可用金额+30)

image-20241009091156001

3.2 事务悬挂和空回滚

假如一个分布式事务中包含两个分支事务,try阶段,一个分支成功执行,另一个分支事务阻塞

img

如果阻塞时间太长,可能导致全局事务超时而触发三阶段的cancel操作。两个分支事务都会执行cancel操作:

image-20241009091343545

其中一个分支是未执行try操作的,直接执行了cancel操作,反而会导致数据错误。因此,这种情况下,尽管cancel方法要执行,但其中不能做任何回滚操作,这就是空回滚【一个分支事务没执行try操作,但要被牵连执行cancel操作,需要执行cancel操作但是不能做任何回滚操作(不应该回滚)】

image-20241009091922827

对于整个空回滚的分支事务,将来阻塞结束try方法依然会执行。但是整个全局事务其实已经结束了,因此永远不会再有confirm或cancel,也就是说这个事务执行了一半,处于悬挂状态【阻塞结束,try执行,但是整体全局事务已经结束,无后续】

3.3 TCC使用总结

CC的优点是什么?

  • 一阶段完成直接提交事务,释放数据库资源,性能好
  • 相比AT模型,无需生成快照,无需使用全局锁,性能最强【不需要快照,人工恢复】
  • 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库【人工补偿,不依赖数据库事务】

TCC的缺点是什么?

  • 有代码侵入,需要人为编写try、Confirm和Cancel接口,太麻烦【人工恢复,需要代码入侵】
  • 软状态,事务是最终一致【不是强一致性,BASE理论】
  • 需要考虑Confirm和Cancel的失败情况,做好幂等处理、事务悬挂和空回滚处理

微服务-服务保护

==微服务拆分容易出现的问题==

1.雪崩问题

1.1 产生背景

在微服务远程调用的过程中,还存在几个问题需要解决。

业务健壮性问题:

例如在之前的查询购物车列表业务中,购物车服务需要查询最新的商品信息,与购物车数据做对比,提醒用户。大家设想一下,如果商品服务查询时发生故障,查询购物车列表在调用商品服务时,是不是也会异常?从而导致购物车查询失败。

但从业务角度来说,为了提升用户体验,即便是商品查询失败,购物车列表也应该正确展示出来,哪怕是不包含最新的商品信息。

级联失败问题:

还是查询购物车的业务,假如商品服务业务并发较高,占用过多Tomcat连接。可能会导致商品服务的所有接口响应时间增加,延迟变高,甚至是长时间阻塞直至查询失败。

此时查询购物车业务需要查询并等待商品查询结果,从而导致查询购物车列表业务的响应时间也变长,甚至也阻塞直至无法访问。而此时如果查询购物车的请求较多,可能导致购物车服务的Tomcat连接占用较多,所有接口的响应时间都会增加,整个服务性能很差, 甚至不可用。

image-20240620110735548

依次类推,整个微服务群中与购物车服务、商品服务等有调用关系的服务可能都会出现问题,最终导致整个集群不可用。

image-20240620110826809

==雪崩【级联失败】==:微服务调用链路中的某个服务故障,引起整个链路中的所有微服务都不可用。

1.2 产生原因

  • 微服务相互调用,服务提供者出现故障或阻塞。

  • 服务调用者没有做好异常处理,导致自身故障。

  • 调用链中的所有服务级联失败,导致整个集群故障

1.3 解决方案

  • 尽量避免服务出现故障或阻塞。–请求限流和线程隔离

    • 保证代码的健壮性;

    • 保证网络畅通;

    • 能应对较高的并发请求;

  • 服务调用者做好远程调用异常的后备方案,避免故障扩散 –服务熔断

这些方案或多或少都会导致服务的体验上略有下降,比如请求限流,降低了并发上限;线程隔离,降低了可用资源数量;服务熔断,降低了服务的完整度,部分服务变的不可用或弱可用。因此这些方案都属于服务降级的方案。但通过这些方案,服务的健壮性得到了提升。

1.请求限流(降低访问流量)

==限制访问微服务的请求并发量,避免服务因流量激增出现故障==

服务故障最重要原因,就是并发太高!解决了这个问题,就能避免大部分故障。当然,接口的并发不是一直很高,而是突发的。因此请求限流,就是限制或控制接口访问的并发流量,避免服务因流量激增而出现故障。

请求限流往往会有一个限流器,数量高低起伏的并发请求曲线,经过限流器就变的非常平稳。这就像是水电站的大坝,起到蓄水的作用,可以通过开关控制水流出的大小,让下游水流始终维持在一个平稳的量。

image-20240620111528990

2.线程隔离(降低独占资源数量)

==限制分给其他服务的线程数,保证不会因为一个服务挂了导致其他服务消耗完自己资源也挂了==

为了避免某个接口故障或压力过大导致整个服务不可用,我们可以限定每个接口可以使用的资源范围,也就是将其“隔离”起来。

image-20240620112057241

举例子说明:

image-20240620112134291

如图所示,我们给查询购物车业务限定可用线程数量上限为20,这样即便查询购物车的请求因为查询商品服务而出现故障,也不会导致服务器的线程资源被耗尽,不会影响到其它接口。

3.快速失败(fallback后备方案)

快速失败:给业务编写一个调用失败时的处理的逻辑,称为fallback。当调用出现故障(比如无线程可用)时,按照失败处理逻辑执行业务并返回,而不是直接抛出异常。

image-20240620112448590

4.服务熔断(提前预测,不对劲就fallback)

==【相当于一个提前预判】设定一个断路器(开关),统计请求的异常比例和慢调用比例,超过阈值我就拒绝不让你用。熔断了去走服务的后备fallback逻辑(备份方案)==

image-20240620133335898

思路是由断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务,即拦截访问该服务的一切请求。而当设定熔断时间结束后它会尝试放行一次请求测试,如果成功就是服务恢复时,断路器会放行访问该服务的请求; 如果放行不通过继续走熔断状态,所有请求走fallback快速失败。

image-20240620144802785

2. 常见服务保护技术

目前我们常见的就是以下两种,我们都推荐使用Sentinel:【实习公司用Hystrix】

image-20240619172500972

==Sentinel==

==服务保护方案–Sentinel基础使用==

1.介绍

Sentinel是阿里巴巴开源的一款服务保护框架,目前已经加入SpringCloudAlibaba中。官方网站:

https://security.feishu.cn/link/safety?target=https%3A%2F%2Fsentinelguard.io%2Fzh-cn%2F&scene=ccm&logParams=%7B%22location%22%3A%22ccm_docs%22%7D&lang=zh-CN

Sentinel 的使用可以分为两个部分:

  • 核心库(Jar包):不依赖任何框架/库,能够运行于 Java 8 及以上的版本的运行时环境,同时对 Dubbo / Spring Cloud 等框架也有较好的支持。在项目中引入依赖即可实现服务限流、隔离、熔断等功能。
  • 控制台(Dashboard):Dashboard 主要负责管理推送规则、监控、管理机器信息等。

2.安装

为了方便监控微服务,我们先把Sentinel的控制台搭建出来

2.1 下载jar包

下载地址:Releases · alibaba/Sentinel · GitHub

2.2 启动测试

  • 1.存放jar:将jar包放在任意非中文、不包含特殊字符的目录下,重命名为sentinel-dashboard.jar

image-20240620100439761

  • 2.启动:在当前目录下cmd打开命令行输入指令启动:
1
2
3
4
5
输入以下命令:
java -Dserver.port=8090 -Dcsp.sentinel.dashboard.server=localhost:8090 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard.jar

#其余配置项可以参考官方文档:
https://github.com/alibaba/Sentinel/wiki/%E5%90%AF%E5%8A%A8%E9%85%8D%E7%BD%AE%E9%A1%B9

运行结果:

image-20240620100605675

image-20240620100945847

需要输入账号和密码,默认都是:sentinel

登录后,即可看到控制台,默认会监控sentinel-dashboard服务本身

image-20240620101024738

3.微服务整合

我们以微服务-黑马商城中的cart-service购物车模块为例:

3.1 引入依赖

我们在cart-service服务pom.xml文件引入依赖

1
2
3
4
5
<!--sentinel-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

因为整合到springalibaba,所以依赖名也是spring-cloud-starter前缀

image-20240620101220519

3.2 yml配置控制台

我们在cart-service服务yml文件引入依赖

1
2
3
4
5
6
7
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8090 # sentinel控制台地址
# 因为restful风格,如果不设置的话同一个controller下的接口都是一个资源
http-method-specify: true #是否设置请求方式作为资源名称

image-20240620101330326

3.3 测试

重启cart-service,然后访问查询购物车接口,sentinel的客户端就会将服务访问的信息提交到sentinel-dashboard控制台。并展示出统计信息:

image-20240620101510077

点击cart-service的簇点链路菜单,会看到下面的页面:

image-20240620101637577

所谓簇点链路,就是单机调用链路,是一次请求进入服务后经过的每一个被Sentinel监控的资源。【默认情况下,Sentinel会监控SpringMVC的每一个Endpoint(接口)】

因此,我们看到/carts这个接口路径就是其中一个簇点,我们可以对其进行限流、熔断、隔离等保护措施。

3.4 簇点链路

【默认情况下】Sentinel会把路径作为簇点资源的名称,无法区分路径相同但请求方式不同的接口,查询、删除、修改等都被识别为一个簇点资源(我们的SpringMVC接口是按照Restful风格设计,因此购物车的查询、删除、修改等接口全部都是/carts路径)

image-20240620101946062

解决方案:我们可以选择打开Sentinel的请求方式前缀,把请求方式 + 请求路径作为簇点资源名

image-20240620102041605

然后,重启服务,通过页面访问购物车的相关接口,可以看到sentinel控制台的簇点链路发生了变化:

image-20240620102128531

==服务保护方案–Sentinel四大解决方案==

我们以微服务-黑马商城中的cart-service购物车模块为例:

1.请求限流

前提:我们已经将cart-service购物车模块和sentinel建立连接,我们就可以通过控制台进行操作

1.1 控制台设置限流QPS

把查询购物车列表这个簇点资源的流量限制在了每秒6个,也就是最大QPS为6.

image-20240620102556908

1.2 Jmeter测试

【可参考笔记-jmeter快速入门】

我们利用Jemeter做限流测试,我们每秒发出10个请求:

image-20240620102852440

最终在sentinel监控结果如下:

image-20240620103024899

可以看出GET:/carts这个接口的通过QPS稳定在6附近,而拒绝的QPS在4附近,符合我们的预

2.线程隔离

2.1 控制台设置并发线程数

image-20240620104405158

2.2 Jmeter测试

【可参考笔记-jmeter快速入门】

我们利用Jemeter测试,每秒发送100个请求:

image-20240620104448371

最终在sentinel监控结果如下:

进入查询购物车的请求每秒大概在100,而在查询商品时却只剩下每秒10左右,符合我们的预期。

image-20240620104557342

此时如果我们通过页面访问购物车的其它接口,例如添加购物车、修改购物车商品数量,发现不受影响:

image-20240620104632824

利用线程隔离对查询购物车业务进行隔离,保护了购物车服务的其它接口。由于查询商品的功能耗时较高(我们模拟了500毫秒延时),再加上线程隔离限定了线程数为5,导致接口吞吐能力有限,最终QPS只有10左右。这就导致了几个问题:

第一,超出的QPS上限的请求就只能抛出异常,从而导致购物车的查询失败。但从业务角度来说,即便没有查询到最新的商品信息,购物车也应该展示给用户,用户体验更好。也就是给查询失败设置一个降级处理逻辑【fallback快速失败】。

第二,由于查询商品的延迟较高(模拟的500ms),从而导致查询购物车的响应时间也变的很长。这样不仅拖慢了购物车服务,消耗了购物车服务的更多资源,而且用户体验也很差。对于商品服务这种不太健康的接口,我们应该直接停止调用,直接走降级逻辑,避免影响到当前服务。也就是将商品查询接口熔断

3.快速失败-Fallback(后备方案)

触发限流或熔断后的请求不一定要直接报错,也可以返回一些默认数据或者友好提示,用户体验会更好。

3.1 两种配置方式

image-20240620134907425

3.2 举例-以方式二为例:

3.2.1 yml导入依赖

修改cart-service模块的application.yml文件,开启Feign的sentinel功能:

1
2
3
feign:
sentinel:
enabled: true # 开启feign对sentinel的支持
image-20240620104846227

3.2.2 编写降级处理类

在hm-api模块中给ItemClient定义降级处理类,实现FallbackFactory

image-20240620144115712

3.2.3 注入bean

hm-api模块中的com.hmall.api.config.DefaultFeignConfig类中将ItemClientFallback注册为一个Bean

image-20240620144206407

3.2.4 给对应Openfeign添加属性

hm-api模块中的ItemClient接口中使用ItemClientFallbackFactory

image-20240620144229206

3.2.5 Jmeter测试

image-20240620144351522

但是未被限流的请求延时依然很高:

image-20240620144417302

导致最终的平均响应时间较长。

4.服务熔断

查询商品的RT较高(模拟的500ms),从而导致查询购物车的RT也变的很长。这样不仅拖慢了购物车服务,消耗了购物车服务的更多资源,而且用户体验也很差。

对于商品服务这种不太健康的接口,我们应该停止调用,直接走降级逻辑,避免影响到当前服务。也就是将商品查询接口熔断。当商品服务接口恢复正常后,再允许调用。这其实就是断路器的工作模式了。

Sentinel中的断路器不仅可以统计某个接口的慢请求比例,还可以统计异常请求比例。当这些比例超出阈值时,就会熔断该接口,即拦截访问该接口的一切请求,降级处理;当该接口恢复正常时,再放行对于该接口的请求。

断路器的工作状态切换有一个状态机来控制:

image-20240620144802785

状态机包括三个状态:

  • closed:关闭状态,断路器放行所有请求,并开始统计异常比例、慢请求比例。超过阈值则切换到open状态
  • open:打开状态,服务调用被熔断,访问被熔断服务的请求会被拒绝,快速失败,直接走降级逻辑。Open状态持续一段时间后会进入half-open状态
  • half-open:半开状态,放行一次请求,根据执行结果来判断接下来的操作。
    • 请求成功:则切换到closed状态【正常状态】
    • 请求失败:则切换到open状态【熔断状态】

4.1 控制台设置熔断规则

image-20240620142613725

这种是按照慢调用比例来做熔断,上述配置的含义是:

  • RT超过200毫秒的请求调用就是慢调用
  • 统计最近1000ms内的最少5次请求,如果慢调用比例不低于0.5,则触发熔断
  • 熔断持续时长20s

4.2 Jmeter测试

image-20240620145038189

此时整个购物车查询服务的平均RT影响不大:

image-20240620145054930

==Sentinel服务保护总结==

1.具体使用操作

  • 1.启动sentinel控制台(jar包)

到这已经可以打开sentinel控制台

  • 2.具体使用的微服务pom.xml文件导入sentinel依赖
  • 3.具体使用的微服务yml文件配置sentinel控制台信息

到这已经可以在sentinel控制台查看

具体的四大解决方案【1请求限流,2线程隔离,4服务熔断在平台设置就行】

  • 1和2.控制台通过具体微服务-簇点链路的流控按钮设置请求限流(设置QPS)和线程隔离(设置并发线程数)

  • 3.fallback快速失败需要编写代码(Openfeign所在微服务进行编写)

  • ①在Openfeign所在微服务编写降级处理类实现FallbackFactory重写create方法里面将ItemClient被其他微服务调用的方法都重写处理(后备方案,可以是信息提示等)

  • ②将降级处理类加入bean

  • ③在微服务ItemClient的FeignClient注解添加属性fallbackFactory=①降级处理类

  • 4.控制台通过具体微服务-簇点链路的熔断按钮设置熔断策略

2.四大解决方案总结

1.请求限流(限制访问量)和线程隔离(限制线程数,不让此服务占用所有资源)相当于对单个微服务调用做保护

2.快速失败(如果请求限流/线程隔离资源没了,那就可以走这个fallback后备方案,不至于直接抛出异常)和服务熔断(提前对这个服务判断如果经常出异常,那就熔断)是在微服务调用其他微服务情况时候做保护

如果比喻成买火车票乘车:

1.请求限流就是限制购买人数

2.线程隔离就是车次分到A候车厅(六个通道)和B候车厅(四个通道),如果B满了或者速度太慢可以去A候车厅的通道,不至于直接瘫痪。

3.服务熔断就是我提前预测哪里经常会出问题,我可以一段时间内不让大家去那候车可以去备用候车厅;如果一段时间后,我可以测试走一个人如果通那就打开去候车,如果不行那就继续用备用候车厅。

4.快速失败就是如果这个通道走不了,那你不能让乘客不上车啊,所有可以大喇叭提示等待或者提供备用候车厅上车

==底层原理==

无论是①Hystix还是②Sentinel都支持线程隔离,实现方式不同。

①Hystix:线程池隔离[默认],信号量隔离

②Sentinel:信号量隔离

1.线程隔离

线程隔离有两种方式实现:

  • 线程池隔离:给每个微服务调用业务分配一个线程池【利用线程池本身实现隔离效果】
  • 信号量隔离:给每个业务设定线程数量,达到信号量上限时就禁止新的请求【不创建线程池,而是使用计数器模式】
image-20241009100448925

两者的优缺点:

信号量隔离 线程池隔离
优点 轻量级,无额外开销 支持主动超时,支持异步调用
缺点 不支持主动超时,不支持异步调用 线程的额外开销比较大
场景 高频调用,高扇出 低扇出

2.四种算法

在熔断功能中,需要统计异常请求或慢请求比例,也就是计数。在限流的时候,要统计每秒钟的QPS,同样是计数。可见计数算法在熔断限流中的应用非常多。sentinel中采用的计数器算法就是滑动窗口计数算法。

2.1 固定窗口计数

image-20241009102754788

  • 每个窗口维护1个计数器,每有1次请求就将计数器+1。限流就是设置计数器阈值,本例为3,图中红线标记
  • 如果计数器超过了限流阈值,则超出阈值的请求都被丢弃。

image-20241009102955454

说明:

  • 第1、2秒,请求数量都小于3,没问题
  • 第3秒,请求数量为5,超过阈值,超出的请求被拒绝

特殊情况:【无法结合前后的时间窗口的数据做综合统计】—只能统计当前某1个时间窗的请求数量是否到达阈值

image-20241009190539989

说明:

  • 假如在第5、6秒,请求数量都为3,没有超过阈值,全部放行
  • 但是,如果第5秒的三次请求都是在4.5-5s之间进来;第6秒的请求是在5-5.5s之间,那么4.5-5s之间就有6次请求!也就是说每秒的QPS达到了6,远超阈值。

2.2 滑动窗口计数

固定时间窗口算法中窗口有很多,其跨度和位置是与时间区间绑定,因此是很多固定不动的窗口。而滑动时间窗口算法中只包含1个固定跨度的窗口,但窗口是可移动动的,与时间区间无关。

具体规则如下:

  • 窗口时间跨度Interval大小固定,例如1秒
  • 时间区间跨度为Interval / n ,例如n=2,则时间区间跨度为500ms
  • 窗口会随着当前请求所在时间currentTime移动,窗口范围从currentTime-Interval时刻之后的第一个时区开始,到currentTime所在时区结束。

image-20241009194901251

限流阈值依然为3,绿色小块就是请求,上面的数字是其currentTime值。

  • 在第1300ms时接收到一个请求,其所在时区就是1000~1500
  • 按照规则,currentTime-Interval值为300ms,300ms之后的第一个时区是5001000,因此窗口范围包含两个时区:5001000、1000~1500,也就是粉红色方框部分
  • 统计窗口内的请求总数,发现是3,未达到上限。

若第1400ms又来一个请求,会落在1000~1500时区,虽然该时区请求总数是3,但滑动窗口内总数已经达到4,因此该请求会被拒绝:

image-20241009194936726

假如第1600ms又来的一个请求,处于15002000时区,根据算法,滑动窗口位置应该是10001500和1500~2000这两个时区,也就是向后移动:

image-20241009194947384

这就是滑动窗口计数的原理,解决了我们之前所说的问题。而且滑动窗口内划分的时区越多,这种统计就越准确。

2.3 令牌桶算法(Sentinel的热点参数)

其基本思路如图:【Sentinel中的热点参数(一段时间内频繁访问的用户id)限流

image-20241009201027208

说明:

  • 生成令牌:以固定的速率生成令牌,存入令牌桶【令牌桶满了以后,多余令牌丢弃】
  • 进入请求:必须先尝试从桶中获取令牌,①获取到令牌后才可以被处理②令牌桶中没有令牌,则请求等待或丢弃

基于令牌桶算法,每秒产生的令牌数量==QPS上限

当然也有例外情况,例如:

  • 某一秒令牌桶中产生了很多令牌,达到令牌桶上限N,缓存在令牌桶中,但是这一秒没有请求进入。
  • 下一秒的前半秒涌入了超过2N个请求,之前缓存的令牌桶的令牌耗尽,同时这一秒又生成了N个令牌,于是总共放行了2N个请求。超出了我们设定的QPS阈值。

因此,在使用令牌桶算法时,尽量不要将令牌上限设定到服务能承受的QPS上限。而是预留一定的波动空间,这样我们才能应对突发流量。

2.4 漏桶算法(Sentinel的排队等待)

漏桶算法与令牌桶相似,但在设计上更适合应对并发波动较大的场景,解决令牌桶中的问题。

简单来说就是请求到达后不是直接处理,①放入一个队列。②固定的速率从队列中取出并处理请求。[叫漏桶算法,就是把请求看做水,队列看做是一个漏了的桶]

image-20241009203016735

漏桶的优势就是流量整型,桶就像是一个大坝,请求就是水。并发量不断波动,就如图水流时大时小,但都会被大坝拦住。而后大坝按照固定的速度放水,避免下游被洪水淹没。

因此,不管并发量如何波动,经过漏桶处理后的请求一定是相对平滑的曲线:

image-20241009203304507

MybatisPlus

1.Mybatis介绍

在日常开发中应该能发现,单表的CRUD功能代码重复度很高,也没有什么难度。而这部分代码量往往比较大,开发起来比较费时。

因此,目前企业中都会使用一些组件来简化或省略单表的CRUD开发工作。目前在国内使用较多的一个组件就是MybatisPlus。

官方网站如下:

当然,MybatisPlus不仅仅可以简化单表操作,而且还对Mybatis的功能有很多的增强。

==Mybatis——-基础使用==

1.1 pom.xml引入依赖

MybatisPlus提供了starter,实现了自动Mybatis以及MybatisPlus的自动装配功能,坐标如下:

image-20240425162339464

如图所示,由于这个starter包含对mybatis的自动装配,因此完全可以替换掉Mybatis的starter

1.2 定义Mapper层

为了简化单表CRUD,MybatisPlus提供了一个基础的BaseMapper接口,其中已经实现了单表的CRUD:

我们直接==实现BaseMapper接口==即可

image-20240425173015132

1.3 对比

我们可以看出这样直接调用简单的CRUD方法即可,就不用自己去mapper层写方法和对应xml文件了。==只需要继承BaseMapper就能省去所有的单表CRUD==。

image-20240425173132151

1.4 底层实现原理

刚才①引入依赖和②mapper层继承BaseMapper接口就可以进行CRUD,那MP怎么知道是哪张表?表中有哪些字段?

这也是因为UserMapper在继承BaseMapper的时候指定了一个泛型和数据库对应的实体类

image-20240425174306575

MybatisPlus就是根据PO实体的信息来推断出表的信息,从而生成SQL的。默认情况下:

  • MybatisPlus会把PO实体的类名驼峰转下划线作为表名
  • MybatisPlus会把PO实体的所有变量名驼峰转下划线作为表的字段名,并根据变量类型推断字段类型
  • MybatisPlus会把名为id的字段作为主键

image-20240425174818635

但很多情况下,默认的实现与实际场景不符,因此MybatisPlus提供一些注解便于我们声明表信息

2.常见注解==解决po和mysql字段映射==

==如果不按照约定的话,需要使用以下三种注解来解决:==

MybatisPlus中比较常用的几个注解如下:

@TableName:用来指定表名

@TableId:用来指定表中的主键字段信息

@TableField:用来指定表中的普通字段信息

image-20240425175502493

其中,具体的细节如图所示:==使用查看==

image-20240425175807030

2.1 @TableName

  • 描述:表名注解,标识实体类对应的表
  • 使用位置:实体类

所有属性:

image-20240425180531600

2.2 @TableId

  • 描述:主键注解,标识实体类中的主键字段
  • 使用位置:实体类的主键字段

image-20240425180555143

其中type=IdType.xxxx取值范围:

image-20240425180718677

2.3 @TableField

描述:普通字段注解

image-20240425180842453

3.yml常见配置

在application.yml文件配置:

image-20240425171943982

==Mybatis——-核心功能==

刚才都是以id为条件的简单CRUD,一些复杂的SQL语句就需要用到一些高级功能。

1.条件构造器==提供复杂where语句==

修改、删除、查询的SQL语句都需要指定where条件

因此BaseMapper中提供的相关方法除了以id作为where条件以外,还支持更加复杂的where条件。

image-20240425181511109

参数中的Wrapper就是条件构造的抽象类,其下有很多默认实现,继承关系如图:

image-20240425181331409

其中,Wrapper的子类AbstractWrapper提供了where中包含的所有条件构造方法:

image-20240425181538417

而QueryWrapper在AbstractWrapper的基础上拓展了一个select方法,允许指定查询字段:

image-20240425181553298

而UpdateWrapper在AbstractWrapper的基础上拓展了一个set方法,允许指定SQL中的SET部分:

image-20240425181601512

1.1 QueryWrapper

==主要对where语句的条件进行设置==

对于查询:

1
2
3
select id,username,info,balance
from user
where name like "%o%" AND balance >= 1000

image-20240426152253324

对于修改:

1
2
3
update 
set balance=2000
where username='Jack'

image-20240426152523969

1.2 UpdateWrapper

==弥补BaseMapper中update()只能写 set Xxx==,提出的updatewrapper可以写成set balance=balance-xx这种形式

以更新多个id为例:

1
2
3
update user
set balance=balance-200
where id in(1,2,3)

这个set的赋值结果是基于字段现有值,这时候需要使用UpdateWrapper中的==setSql功能:==

image-20240426153602036

1.3 LambdaQueryWrapper

==1.1和1.2会在构造条件时候写死字段名称==,现在1.3就可以通过变量的getter方法结合反射获取

image-20240426154148926

2.自定义SQL

1.2中演示了一个修改余额-200的时候将sql维护应该放在持久层,而不是业务层:
image-20240426154345876

==利用Wrapper生成查询条件,然后再结合mapper自定义xml文件编写sql==

2.1 原位置变化

以刚才案例为例:

image-20240426154550302

2.2 Mapper层方法定义

image-20240426154750747

2.3 写sql语句

方式一:直接在mapper的方法上写@Select方法

方式二:在mapper.xml文件中写动态sql

和以往的区别就是:==where语句直接用${ew.customSqlSegment}替换==

image-20240426154956395

总结如下

与以往的变化就是我传入参数和where判断条件,mapper方法加一个@Param(“ew”)标志,然后sql里面就直接用${ew.customSqlSegment}替换

3.Service接口

通用接口为==Iservice==,默认实现为==ServiceImpl==。其中封装方法可以分为:

  • save:新增
  • remove:删除
  • update:更新
  • get:查询单个结果
  • list:查询集合结果
  • count:计数
  • page:分页查询

3.1 五大类方法解释

3.1.1 新增(save)

image-20240426155746820

3.1.2 删除(remove)

image-20240426155808475

3.1.3 修改(update)

image-20240426155852795

3.1.4 查询

3.4.1 查询一条(get)

image-20240426155954622

3.4.2 查询多条(list)

image-20240426160001166

3.4.3 计数(count)

image-20240426160025902

3.1.5调用mapper层自定义sql

通过getBaseMapper获取Mapper,然后就mapper.自定义sql()

image-20240426160126219

3.6 基本用法

现在的变化就是,==拿现成的直接用==:

image-20240426160527425

具体操作就是:

image-20240426160603797

1
--保证自定义mapper继承basemapper 【底层使用时候直接还是调用basemapper的方法】

3.7 快速搭建(直接看)

==1.业务简单的话直接调用mp方法;==

==2.业务复杂的话就跟原来方式一样,controller调用service方法,然后在mapper层写具体sql==

3.7.1 简单业务-直接调用mp方法

image-20240426170341983

3.7.2 复杂业务-原始模式优化

image-20240426171113039

之后调用mapper层的sql:

image-20240426170626547

3.8 Lambda查询[添加属性]

就是在基本的方法上(属性,最新值)再多使用一个属性(==判断条件==,属性,最新值)

这样就可以把动态sql里面标签这种麻烦的操作放在serviceImpl类上进行操作

image-20240426175243646

3.9 批量新增

三种方案:

image-20240427230857329

最推荐第三种我们在yml配置文件中添加&rewriteBatchedStatements=true

==Mybatis——-扩展功能==

1.代码生成

在使用MybatisPlus以后,基础的MapperServicePO代码相对固定,重复编写也比较麻烦

==为了方便生成基本固定的代码==

1.1 下载插件

image-20240428181825670

1.2 配置数据库

image-20240428182003520

1.3 配置信息生成代码

image-20240428182342159

1.4查看代码

image-20240428182458664

2.静态工具—-Db

有一种可能就是有AService用来查询用户和BService用来查询地址,他们都实现了Iservice可以实现一些简单的CRUD。现在需要查询用户和对应的地址,就可能AService调用BService,然后BService也要调用AService就会导致@Autowired时候循环依赖

MybatisPlus提供一个静态工具类:==Db==,==就是用来解决多个service层互相调用导致的循环依赖==,其中一些静态方法与IService中的方法签名基本一致,也可以帮助我们实现CRUD的功能

image-20240428183527812

在使用的时候,就可以直接像平时书写习惯直接调用

3.逻辑删除

多表查询时删除A表的数据同时也会删除B数据,但是B里面有一些比较重要的数据我们不想删除。因此,我们采用==逻辑删除==的方案:

可以考虑在表中添加一个字段flag(标记数据是否被删除),这样我们在删除数据的时候还需要将flag设置为true,如果在查询数据的时候还需要添加一个and flag=xxx的条件。这样的话就会让之前的查询和删除逻辑都要跟着变化,非常麻烦。

因此,MybatisPlus就添加了对逻辑删除的支持。

只有MybatisPlus生成的SQL语句才支持自动的逻辑删除【就是直接拿来用的哪些CRUD方法】

自定义SQL就需要自己手动处理逻辑删除

3.1 配置逻辑删除

我们对于Address表添加一个字段deleted用于判断是否删除:

image-20240429170953086

3.2 底层实现

我们在使用MybatisPlus自己的CRUD方法时候支持自动逻辑删除:

image-20240429121132491

具体的两个语法操作:

1.删除的时候我们就会将delete更改为一个update语句拼接一个deleted=false未被删除的判断

image-20240429171532251

2.查询的时候我们就会在where语句拼接一个deleted=false未被删除的判断

image-20240429171544562

3.3 注意事项

开启逻辑删除功能之后,可以像普通删除一样做CRUD,基本不用考虑代码逻辑功能问题。

但是,逻辑删除本身也有缺点:

  • 会导致数据库表垃圾数据越来越多,从而影响查询效率
  • sql中全都需要对逻辑删除字段做判断,影响查询效率

==因此,不太建议采用逻辑删除功能,如果数据不能删除,可以采用数据迁移到其他表的办法==

4.枚举处理器(字段有多个值)

对某个字段(0是正常,1是不正常)判断时候如果写==1这样很不美观,并且如果0和1的含义修改了要修改很多地方,因此我们可以使用枚举(很像c语言的参数宏定义)来处理

针对于之前案例User类的status属性,就可以这样修改:

image-20240429163735826

在原始的mybatis底层帮我们把Java中的类型和数据库的类型一一对应,但是对于枚举类型和Json类型无法解决。因此mybatisplus针对枚举和Json类型提出了新的处理器:

image-20240429164303308

4.1 配置枚举处理器

image-20240429164034948

4.2 定义枚举类

这样就可以将1和2分别代表正常和冻结,我们在使用的时候只需要调用UserStatus.NORMAL就可以对比了

此外,@EnumValue可以保证我们可以按照value的类型和数据库一一对应;而@JsonValue可以保证我们输出给前端的时候可以将描述词/对应值返回(而不是返回NORMAL/FORZEN这种类型)

image-20240429165234689

4.3 修改PO和VO类型

主要是将类型Integer改为UserStatus枚举类

image-20240429165517353

4.4 修改具体逻辑位置

原来位置是用数字比对,可读性太差,现在就可以优雅地使用枚举类

image-20240429165653581

5.Json类型处理器(字段是Json类型)

如果实体类有一个属性是Json类型,那么Java中的Json类型和数据库中的匹配就有问题:
就跟4枚举处理器里面将的,MybatisPlus在Myabtis的基础上提供了Json类型处理器

image-20240429172137951

5.1 配置Json类型处理器

image-20240430142629145

因为没有提供在application.yml配置的方式,只能通过给实体类属性添加注解

image-20240430142453316

5.2 测试查看

info字段已经改成了一个Json类型

image-20240430142717732

6.配置加密

目前我们配置文件中很多参数都是明文存储,如果开发人员跑路很容易导致敏感信息泄露。

MyBatisPlus从3.3.2版本开始提供了一个==基于AES算法的加密工具==,帮助我们对配置中的敏感信息做加密处理。

6.1 生成秘钥

以数据库的账户密码为例:

image-20240430143223630

6.2 配置秘钥

在application.yml文件中修改:

image-20240430143508047

6.3 测试

测试类:在测试类的注解上配置:

image-20240430143859062

启动项目:

image-20240430144043661

==Mybatis——-插件功能==

其实MybatisPlus提供了多个插件,而我们重点关注分页插件

image-20240430164735500

1.分页插件

1.1 配置分页功能

image-20240430152339416

1.2 测试简单分页

image-20240430152253190

1.3 测试复杂分页

针对于1.2的话其实就是更针对业务逻辑:

image-20240430164354320

==Mybatis——-使用操作==

1.可以创建好数据库表

2.根据mybatis插件生成:

​ po(可以添加注解保证数据库和Java实体类对应,对于枚举和Json类型都有新推出的处理器解决),

​ service(extends IService),

​ serviceImpl( extends ServiceImpl<XxxMapper, Xxx> implements IAddressService), [引入mapper方法:①注入xxxMapper ②直接getBaseMapper]

​ controller,

​ mapper(extends BaseMapper)

3.按照原有的设计思路写代码:

​ 3.1 简单的就直接调用service的CRUD方法【service接口默认也有实现类ServiceImpl<XXXMapper,实体类>,这样也说明底层还是直接调用BaseMapper方法】

​ 3.2 复杂的话,①xml文件按照原来的动态sql书写

​ 3.3 复杂的话,②使用xxxMapper.条件构造器[创建复杂where语句]

​ 3.4 复杂的话,③使用lambdaQuery()/lambdaUpdate()添加一些where语句 –新特性【好用】

【只不过在书写过程中有很多好用的扩展功能】

Elasticsearch

0.使用场景

数据库的模糊搜索功能单一,匹配条件非常苛刻,必须恰好包含用户搜索的关键字。

而在搜索引擎中,用户输入出现个别错字,或者用拼音搜索、同义词搜索都能正确匹配到数据。

综上,在面临海量数据的搜索,或者有一些复杂搜索需求的时候,推荐使用专门的搜索引擎来实现搜索功能。

image-20240517164744539

0.1 全文搜索

Elasticsearch 凭借其强大、可扩展和快速的搜索功能,在全文搜索场景中表现出色。

它通常用于支持大型网站和应用程序的搜索功能,允许用户执行复杂的查询并获得近乎实时的响应

0.2 实时分析

Elasticsearch 能够实时执行分析,因此适用于跟踪实时数据(如用户活动、交易或传感器输出)的仪表盘。这种能力使企业能够根据最新信息及时做出决策。

0.3 机器学习

通过在 X-Pack 中添加机器学习功能,Elasticsearch 可以自动检测数据中的异常、模式和趋势。这对于预测分析、个性化和提高运营效率非常有用。

0.4 地理数据应用

Elasticsearch 通过地理空间索引和搜索功能支持地理数据。这对于需要管理和可视化地理信息(如地图和基于位置的服务)的应用非常有用,使执行邻近搜索和基于位置的数据可视化成为可能。

0.5 日志和事件数据分析

企业使用 Elasticsearch 聚合、监控和分析各种来源的日志和事件数据。它是 ELK 堆栈(Elasticsearch、Logstash、Kibana)的关键组件,该堆栈常用于管理系统和应用程序日志,以发现问题并监控系统健康状况。

0.6 安全信息和事件管理(SIEM)

Elasticsearch 可用作 SIEM 的工具,帮助企业实时分析安全事件。这对于检测、分析和响应安全事件和漏洞至关重要。

上述每个用例都利用了 Elasticsearch 的优势(如可扩展性、速度和灵活性)来处理不同的数据类型和复杂的查询,为数据驱动型应用提供了重要价值。

1.Elasticsearch(ES)

ES是一款非常强大的开源搜索引擎,可以帮助我们从海量数据中快速找到需要的内容。

Elasticsearch结合Kibana,Logstash,beats是一整套技术栈,被叫做==ELK==。经常用来做日志收集、系统监控和状态分析等等:

image-20240430171212212

1.1 安装

1.1.1 安装elasticsearch

通过下面的Docker命令即可安装单机版本的elasticsearch:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#先在tar所在目录下打开cmd
docker load -i es.tar

#创建一个网络【不然kibana不能连接es,踩坑了!!】
docker network create elastic

#黑马安装:
docker run -d \
--name es \
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \ #配置jvm的内存
-e "discovery.type=single-node" \ #配置运行模式【单点模式/集群模式】
-v es-data:/usr/share/elasticsearch/data \ #挂载
-v es-plugins:/usr/share/elasticsearch/plugins \
--privileged \
--network hm-net \
-p 9200:9200 \ #访问http端口
-p 9300:9300 \ #集群使用
elasticsearch:7.12.1

#csdn安装:
docker run -d --name es -e ES_JAVA_OPTS="-Xms512m -Xmx512m" -e "discovery.type=single-node" --privileged --network elastic -p 9200:9200 -p 9300:9300 elasticsearch:7.12.1

启动之后访问http://localhost:9200/就可以看到elasticsearch信息:

image-20240507204417602

1.1.2 安装Kibana

通过下面的Docker命令,即可部署Kibana:

1
2
3
4
5
6
7
8
9
10
11
12
13
#先在tar所在目录下打开cmd
docker load -i kibana.tar

#黑马安装:
docker run -d \
--name kibana \
-e ELASTICSEARCH_HOSTS=http://es:9200 \ #es的地址,这里的es要和es配置docker的时候--name一致
--network=hm-net \ #网络和es一个网络
-p 5601:5601 \
kibana:7.12.1 #要保证和es版本一致!!!

#csdn安装:
docker run -d --name kibana -e ELASTICSEARCH_HOSTS=http://es:9200 --network elastic -p 5601:5601 kibana:7.12.1

启动之后访问http://localhost:5601/就可以通过kibana数据化访问elasticsearch:

image-20240507204635028

可以点击右上角Dev tools,进入开发工具页面:

image-20240507204914788

点击之后:

image-20240507205135009

2.倒排索引

elasticsearch的高性能搜索表现,因为底层的倒排索引技术解决的就是根据==部分词条模糊匹配==的问题。【Innodb底层就是用倒排索引做的全文索引】

2.1 正向索引(精准匹配)

我们有一张名为tb_goods的表:

image-20240507210036497

其中,id字段已经创建了索引(底层使用b+树)所以根据id搜索的速度会非常快。但是其他字段例如title只在叶子结点上存在。

比如用户的sql语句为:

1
2
3
select *
from tb_goods
where title like '%手机%';

搜索大概流程如图:

image-20240507212400849

说明:

  • 1)检查到搜索条件为like '%手机%',需要找到title中包含手机的数据
  • 2)逐条遍历每行数据(每个叶子节点),比如第1次拿到id为1的数据
  • 3)判断数据中的title字段值是否符合条件
  • 4)如果符合则放入结果集,不符合则丢弃
  • 5)回到步骤1

综上,根据id搜索条件为精确匹配时,可以走索引,查询效率较高。而当搜索条件为模糊匹配时,由于索引无法生效,导致从索引查询退化为全表扫描,效率很差。

因此,正向索引适合于根据索引字段的精确搜索,不适合基于部分词条的模糊匹配。

而倒排索引恰好解决的就是根据部分词条模糊匹配的问题。

2.2 倒排索引(模糊匹配)

2.2.1 基本概念

倒排索引中两个重要的概念:

  • 文档(Document):用来搜索的数据,每一条数据就是一个文档【一个网页,一个商品信息】

  • 词条(Term):对文档数据/用户搜索数据,利用某种算法分词,得到的具备含义的词语就是词条【我是中国人,就可以分为:我,是,中国人,国人,人这几个词条】

2.2.2 创建流程

创建倒排索引是对正向索引的一种特殊处理和应用,流程如下:

  • 将每个文档的数据利用分词算法根据语义拆分得到一个个词条

  • 创建表,表中每行数据:{词条,词条所在文档id,词条位置}

  • 因为词条唯一性,可以给词条创建正向索引(唯一索引)

此时形成的这张以词条为索引的表就是倒排索引表:

image-20240507214322180

2.2.3 搜索流程

以搜索”华为手机”为例,如图:

image-20240507215033610

流程描述:

1)用户输入条件"华为手机"进行搜索。

2)对用户输入条件分词,得到词条:华为手机

3)拿着词条在倒排索引中查找(由于词条有唯一索引,查询效率很高),即可得到包含词条的文档id:1、2、3

4)拿着文档id到正向索引中查找具体文档即可(由于id也有索引,查询效率也很高)

==根据条件先分词,每个词条去倒排索引查询【词条有唯一索引】找到对应文档id,根据文档id到正向索引【id有索引】查询具体文档(一条数据)==

2.2.4 两者对比

  • 正向索引是最传统的,根据id索引的方式。但根据词条查询时,必须先逐条获取每个文档,然后判断文档中是否包含所需要的词条,是根据文档找词条的过程
  • 倒排索引则相反,是先找到用户要搜索的词条,根据词条得到保护词条的文档的id,然后根据id获取文档。是根据词条找文档的过程

两者优缺点:

正向索引 倒排索引
优点 1.可以给多个字段创建索引
2.根据索引字段搜索和排序速度非常快
部分词条查询效率高【创建唯一索引】
缺点 部分词条查询效率不高,只能全表扫描 1.只能给词条创建索引,而不是字段
2.无法根据字段做排序

3.基础概念

3.1 文档(一行数据)和字段(一个列)

elasticsearch是面向文档(Document)存储的,可以是数据库中的一条商品数据,一个订单信息。文档数据会被序列化为json格式存储在elasticsearch中:

image-20240507230531294

因此, 数据库中一行数据 <==> ES中一个JSON文档;

而数据库中每行数据都包含很多列,这些列就转换为JSON文档中的字段(Field)

3.2 索引(数据库的表)和映射(数据库表结构约束)

随着业务发展,需要在es中存储的文档也会越来越多,比如有商品的文档,用户的文档,订单的文档等等;

image-20240508223520776

所有文档都散乱存放显然非常混乱,也不方便管理。

因此,我们要将==类型相同的文档==(一行数据)集中在一起管理,称为索引(Index)

image-20240508223847616

因此,==索引(类型相同的很多行文档) <—->数据库中的表==

数据库的表会有约束信息,用来定义表的结构、字段的名称、类型等信息。

因此,索引库中就有==映射(mapping),是索引中文档的字段约束信息,类似表的结构约束==

3.3 Mysql和Elasticsearch对比

image-20240508225648423

注意:mysql的语法就是sql,而es的语法是dsl【提供json风格的请求语句,用来操作es进行crud】

  • Mysql:擅长事务类型操作,可以确保数据的安全和一致性
  • Elasticsearch:擅长海量数据的搜索、分析、计算

因此在企业中,往往是两者结合使用:

  • 对安全性要求较高的写操作,使用mysql实现
  • 对查询性能要求较高的搜索需求,使用elasticsearch实现
  • 两者再基于某种方式,实现数据的同步,保证一致性

image-20240508231244479

3.4 数据一致性

1
2
3
4
5
方法一:同步双写,课程上架的时候数据写入Mysql,同步也写入ES
方法二:异步双写,课程上架的时候数据写入Mysql,发送消息给MQ,MQ通知ES更新 【项目使用】
方法三:定时同步,对于数据库新增的时候,定时批量/全量同步到ES
方法四:基于Logstash输入输出插件
方法五:基于cancal数据库增量日志解析工具,伪装主从数据库进行同步
策略 优点 缺点
同步双写 - 简单易实现
- 实时性高
- 代码侵入性强
- 存在不一致的风险
- 可能影响系统性能
异步双写(MQ方式) - 解耦数据写入操作
- 通过消息队列提升性能和扩展性
- 系统复杂度增加
- 可能存在消息丢失的风险
- 引入了消息中间件的依赖
定期同步 - 实现简单
- 无需改变现有业务逻辑
- 实时性差
- 可能给数据库带来额外压力
基于Binlog实时同步 - 无代码侵入
- 实时性较好
- 业务逻辑与数据同步解耦
- 构建Binlog系统复杂
- 可能存在MQ延时风险
使用Canal监听Binlog同步数据到ES - 基于MySQL的Binlog,实现数据的实时同步
- 减少系统耦合
- 需要维护额外的Canal服务

4.IK分词器(ikun)

Elasticsearch的关键就是倒排索引,而倒排索引依赖于对文档内容的分词情况(分词好那就效率高),而分词则需要高效、精准的分词算法,IK分词器就是这样一个中文分词算法

4.1 安装IK分词器

方案一:在线安装

1
docker exec -it es ./bin/elasticsearch-plugin  install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elasticsearch-analysis-ik-7.12.1.zip

方案二:离线安装

首先,查看之前安装的elasticsearch容器的plugins数据卷目录:

1
docker volume inspect es-plugins

image-20240507221107758

可以看到elasticsearch的插件挂载到了/var/lib/docker/volumes/es-plugins/_data这个目录。我们需要把IK分词器上传至这个目录

image-20240507221628934

4.2 使用IK分词器

4.2.1 官方标准分词器(standard)

我们在Kibana的DevTools上来测试分词器,首先测试Elasticsearch官方提供的标准分词器:

image-20240507221852165

我们可以看到,标准分词器只能1个字作为一个1个词条,无法正取对中文做分词

4.2.2 IK分词器(ik_smart智能语义切分)

这种情况下,可以智能的将词语切分。但是像程序员这种词可以拆分为程序员,程序,员。这个分词器无法实现

image-20240507222246345

4.2.3 IK分词器(ik_max_word最细粒度切分)

这种情况下,可以在4.2.2的前提下继续细分【程序员这种词可以拆分为程序员,程序,员】

image-20240507222545813

4.3 扩展词典

随着互联网的发展,“造词运动”也越发的频繁。出现了很多新的词语,在原有的词汇列表中并不存在。比如:“泰裤辣”,“传智播客” 等

image-20240507222659451

所以想要正确分词,IK分词器的词库也需要不断地更新,IK分词器提供了扩展词汇的功能:

我们在ik-config文件夹下的IkAnalyzer.cfg.xml文件添加拓展词典和停用词典,这样我们再调用的时候,宋亚翔和传智播客就可以被认为是一个词语作为词条

image-20240507224408412

==基础操作(对索引库和文档基础操作)==

==方式一:通过ES手动创建–很繁琐==

1.索引库操作(数据库表)

index类似数据库表,映射类似表的结构。我们要向es中存储数据,必须先创建索引(数据库表)和映射(数据库定义)

1.1 Mapping映射属性

Mapping是对索引库的文档设置约束,常见的mapping属性包括:

image-20240520150601809

1
2
3
4
5
6
7
8
9
10
11
12
{
"age": 21,
"weight": 52.1,
"isMarried": false,
"info": "黑马程序员Java讲师",
"email": "zy@itcast.cn",
"score": [99.1, 99.5, 98.9],
"name": {
"firstName": "云",
"lastName": "赵"
}
}

对应的每个字段映射(Mapping):

image-20240520151047898

1.2 索引库的CRUD

由于Elasticsearch采用的是Restful风格的API,因此其请求方式和路径相对都比较规范,而且请求参数也都采用JSON风格。

我们直接基于Kibana的DevTools来编写请求做测试,由于有语法提示,会非常方便。

1.2.1 创建索引库和映射

基本语法

  • 请求方式:PUT
  • 请求路径:/索引库名,可以自定义
  • 请求参数:mapping映射

格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
PUT /索引库名称
{
"mappings": {
"properties": {
"字段名":{
"type": "text",
"analyzer": "ik_smart"
},
"字段名2":{
"type": "keyword",
"index": "false"
},
"字段名3":{
"properties": {
"子字段": {
"type": "keyword"
}
}
},
// ...略
}
}
}

image-20240520151047898

1.2.2 查询索引库

基本语法

  • 请求方式:GET
  • 请求路径:/索引库名
  • 请求参数:无

image-20240520151340133

1.2.3 修改索引库(只能修改新字段)

倒排索引结构虽然不复杂,但是一旦数据结构改变(比如改变了分词器),就需要重新创建倒排索引,这简直是灾难。因此索引库一旦创建,无法修改mapping

虽然==无法修改mapping中已有字段,却允许添加新的字段到mapping==,因为不会对倒排索引产生影响。因此修改索引库能做的就是向索引库中添加新字段,或者更新索引库的基础属性。

基本语法

  • 请求方式:PUT
  • 请求路径:/索引库名
  • 请求参数:/_mapping

image-20240520151647929

1.2.4 删除索引库

基本语法:

  • 请求方式:DELETE
  • 请求路径:/索引库名
  • 请求参数:无

image-20240520151416751

==1.2.5 索引库操作总结==

索引库操作:

  • 创建索引库:PUT /索引库名{“mappings”:{“properties”:{部分新字段信息}}}}
  • 查询索引库:GET /索引库名
  • 删除索引库:DELETE /索引库名
  • 修改索引库【添加字段】:PUT /索引库名/_mapping{“properties”:{部分新字段信息}}

可以看到,对索引库的操作基本遵循的Restful的风格,因此API接口非常统一,方便记忆。

2.文档操作(一行数据)

有了索引库,接下来就可以向索引库中添加数据。而ElasticSearch数据就是JSON风格的文档。

2.1 新增文档

语法:

1
2
3
4
5
6
7
8
9
POST /索引库名/_doc/文档id
{
"字段1": "值1",
"字段2": "值2",
"字段3": {
"子属性1": "值3",
"子属性2": "值4"
},
}

例如,目前要新增id=1的文档:

image-20240520153641479

2.2 查询文档

语法:

1
GET /{索引库名称}/_doc/{id}

例如,查询id=1的文档:

image-20240520153813733

2.3 删除文档

语法:

1
DELETE /{索引库名}/_doc/id值

例如,删除id=1的文档:

image-20240520153911052

2.4 修改文档

修改有两种方式:

  • 全量修改:直接覆盖原来的文档【会删除旧文档,添加新文档(如果没有就直接删除)】
  • 局部修改:修改文档中的部分字段

2.4.1 全量修改

全量修改是覆盖原来的文档,其本质是两步操作:

  • 根据指定的id删除文档
  • 新增一个相同id的文档

注意:如果根据id删除时,id不存在,第二步的新增也会执行,也就从修改变成了新增操作了。

语法:

1
2
3
4
5
6
PUT /{索引库名}/_doc/文档id
{
"字段1": "值1",
"字段2": "值2",
// ... 略
}

image-20240520154216989

2.4.2 局部修改

局部修改是只修改指定id匹配的文档中的部分字段。【注意:局部修改是POST

语法:

1
2
3
4
5
6
POST /{索引库名}/_update/文档id
{
"doc": {
"字段名": "新的值",
}
}

image-20240520154237114

2.5 批处理

类似于Mysql数据库,可以进行多条数据一次性操作【感觉很麻烦,主要是可读性很差】

批处理采用==POST请求==,基本语法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
POST _bulk
# 1.修改 --如果文档id存在就覆盖,不存在就创建
# index代表新增操作 _index表示索引库名 _id表示要操作的文档id
{ "index" : { "_index" : "test", "_id" : "1" } }
# 代表新增的文档内容
{ "field1" : "value1" }

# 2.删除
{ "delete" : { "_index" : "test", "_id" : "2" } }

# 3.新增 --如果文档id存在就报错
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }

# 4.更新
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

==2.6 文档操作总结==

相对于索引库创建,大致就是中间多了一个_doc路径,修改文档类似于修改索引库比较特殊。

  • 创建文档:POST /{索引库名}/_doc/文档id { json文档 }
  • 查询文档:GET /{索引库名}/_doc/文档id
  • 删除文档:DELETE /{索引库名}/_doc/文档id
  • 修改文档:
    • 全量修改:PUT /{索引库名}/_doc/文档id { json文档 }
    • 局部修改:POST /{索引库名}/_update/文档id { "doc": {字段}}

==索引库操作和文档操作对比:==

image-20240521150017482

5和6步骤主要是在网页端进行设置,因此提出了一个Java的客户端—==JavaRestClient==

==方式二:通过Java实现—不用繁琐的手动创建==

1.JavaRestClient

提供了各种不同语言的客户端,用来操作ES。这些客户端的本质就是==组装DSL语句==,通过http请求发送给ES。

由于ES目前最新版本是8.8,提供了全新版本的客户端,老版本的客户端已经被标记为过时。而我们采用的是7.12版本,因此只能使用老版本客户端:

image-20240520155549341

然后选择7.12版本,HighLevelRestClient版本:

image-20240520155613826

==1.1 初始化RestClient==

在Elasticsearch提供的API中,与Elasticsearch一切交互都封装在一个名为RestHighLevelClient的类中,必须先完成这个对象的初始化,建立与elasticsearch的连接。

1.1.1 引入RestHighLevelClient依赖

item-service模块中引入esRestHighLevelClient依赖:

1
2
3
4
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>

1.1.2 覆盖ES版本

因为SpringBoot默认的ES版本是7.17.10,所以我们需要覆盖默认的ES版本:

1
2
3
4
5
6
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<!--覆盖成7.12.1-->
<elasticsearch.version>7.12.1</elasticsearch.version>
</properties>

1.1.3 初始化RestHighLevelClient

初始化的代码如下:

1
2
3
4
5
6
RestHighLevelClient client = new RestHighLevelClient(
//使用RestClient的builder方法创建
RestClient.builder(
HttpHost.create("http://192.168.xxx.xxx:9200")
)
);

==1.2 商品Mapping映射==

我们针对购物车数据库进行分析:

image-20240520172813812

我们可以对购物车的所有字段进行分析,判断哪些字段必须添加到ElasticSearch中,判断哪些字段必须添加搜索功能。从而进行新建索引库和映射:

image-20240520171754450

在网页上的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
PUT /items
{
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"name":{
"type": "text",
"analyzer": "ik_max_word"
},
"price":{
"type": "integer"
},
"stock":{
"type": "integer"
},
"image":{
"type": "keyword",
"index": false
},
"category":{
"type": "keyword"
},
"brand":{
"type": "keyword"
},
"sold":{
"type": "integer"
},
"commentCount":{
"type": "integer",
"index": false
},
"isAD":{
"type": "boolean"
},
"updateTime":{
"type": "date"
}
}
}
}

1.3 索引库操作

创建索引库的JavaAPI和Restful接口API对比:

1.3.1 创建索引库

image-20240520173351287

具体代码如下:

image-20240521135017531

创建索引库:

image-20240521135105629

1.3.2 删除索引库

具体代码如下:

image-20240521135115905

1.3.3 查询索引库

具体代码如下:

==1.3.4 索引库操作总结==

JavaRestClient操作elasticsearch的流程基本类似。核心是client.indices()方法来获取索引库的操作对象。

索引库操作的基本步骤:

  • 1.初始化RestHighLevelClient类对象client【创建客户端】

  • 2.创建XxxIndexRequest对象request【XXX是CreateGetDelete

  • 3.准备请求参数request.source()方法【Create时需要参数,其他情况不需要】

  • 4.发送请求client.indices().xxx()方法【xxx是createexistsdelete

1.4 文档操作

1.4.1 新增文档

  • 1.创建Request对象,这里是IndexRequest,因为添加文档就是创建倒排索引的过程
  • 2.准备请求参数,本例中就是Json文档
  • 3.发送请求【client.index()方法就好了】

image-20240521142712455

1.4.2 查询文档

与之前的流程类似,代码大概分2步:

  • 创建Request对象
  • 准备请求参数,这里是无参,【直接省略】
  • 发送请求
  • 解析结果【因为结果在_source部分内】

image-20240521142844007

可以看到,响应结果是一个JSON,其中文档放在一个_source属性中,因此解析就是拿到_source,反序列化为Java对象即可

1.4.3 删除文档

与查询相比,仅仅是请求方式从DELETE变成GET,可以想象Java代码应该依然是2步走:

  • 1)准备Request对象,因为是删除,这次是DeleteRequest对象。要指定索引库名和id
  • 2)准备参数,无参,直接省略
  • 3)发送请求。因为是删除,所以是client.delete()方法

image-20240521143043972

1.4.4 修改文档

修改我们讲过两种方式:

  • 全量修改:本质是先根据id删除,再新增【与1.4.1一致】
  • 局部修改:修改文档中的指定字段值

在RestClient的API中,全量修改与新增的API完全一致,判断依据是ID:

  • 如果新增时,ID已经存在,则修改
  • 如果新增时,ID不存在,则新增

这里不再赘述,我们主要关注局部修改的API即可

image-20240521143147541

与之前类似,也是三步走:

  • 1)准备Request对象。【这次是修改,所以是UpdateRequest
  • 2)准备参数。【也就是JSON文档,里面包含要修改的字段】
  • 3)更新文档。【这里调用client.update()方法】

1.4.5 批量导入文档

7.4.1-7.4.4的单条处理通过BulkRequest解决。因此BulkRequest中提供了add方法,用以添加其它CRUD的请求:

image-20240521144140401

具体代码:

image-20240521143955532

==1.4.6 文档操作总结==

文档操作的基本步骤:

  • 1.初始化RestHighLevelClient类对象client【创建客户端】
  • 2.创建XxxRequest对象request【Xxx是IndexUpdateDeleteBulk
  • 3.准备请求参数request.source()方法(IndexUpdateBulk时需要)
  • 4.发送请求client.Xxx()方法【Xxx是indexgetupdatedeletebulk
  • 5.解析结果(Get查询时需要,数据在_source内部)

上述的操作都是围绕id来进行的,只能进行简单查询不符合我们所预期的搜索

==进阶操作(DSL查询,更高级的查询)==

Elasticsearch提供基于JSON的DSL(Domain Specific Language)语句来定义查询条件,其JavaAPI就是在组织DSL条件。

Elasticsearch的查询可以分为两大类:

  • 叶子查询(Leaf query clauses):一般是在特定的字段里查询特定值,属于简单查询,很少单独使用。
  • 复合查询(Compound query clauses):以逻辑方式组合多个叶子查询/更改叶子查询的行为方式。

在查询以后,还可以对查询的结果做处理,包括:

排序:按照1个或多个字段值做排序

分页:根据from和size做分页,类似MySQL

高亮:对搜索结果中的关键字添加特殊样式,使其更加醒目

聚合:对搜索结果做数据统计以形成报表

==后续内容总结图:==

image-20240522164945976

==方式一:通过手动创建–DSL查询==

1.快速入门

查询的语法结构:

1
2
3
4
5
6
7
8
GET /{索引库名}/_search   #_search是固定路径,不能修改
{
"query": {
"查询类型": {
// .. 查询条件
}
}
}

例如,我们以最简单的无条件查询为例【查询类型=match_all】:

1
2
3
4
5
6
7
8
GET /items/_search  #_search是固定路径,不能修改
{
"query": {
"match_all": { #查询类型=match_all
#match_all无条件,所以条件位置不写即可
}
}
}

image-20240521154302865

虽然是match_all,但是响应结果中并不会包含索引库中的所有文档,而是仅有10条。这是因为处于安全考虑,elasticsearch设置了默认的查询页数

2.查询—-①叶子查询

image-20240521154435190

2.1 全文检索–(分词)

全文检索的种类也很多,详情可以参考官方文档

2.1.1 match–检索一个字段

1
2
3
4
5
6
7
8
GET /{索引库名}/_search
{
"query": {
"match": {
"字段名": "搜索条件"
}
}
}

举例:

image-20240521162042810

2.1.2 multi_match–检索多个字段

1
2
3
4
5
6
7
8
9
GET /{索引库名}/_search
{
"query": {
"multi_match": {
"query": "搜索条件",
"fields": ["字段1", "字段2"]
}
}
}

举例:

image-20240521162042810

2.2 精确查询–(不分词)

精确查询,英文是Term-level query,顾名思义,词条级别的查询。也就是说不会对用户输入的搜索条件再分词,而是作为一个词条,与搜索的字段内容精确值匹配。因此推荐查找keyword、数值、日期、boolean类型的字段。例如:

  • id
  • price
  • 城市
  • 地名
  • 人名

等等,作为一个整体才有含义的字段。

详情可以查看官方文档

2.2.1 term–根据词条精确匹配

1
2
3
4
5
6
7
8
9
10
GET /{索引库名}/_search
{
"query": {
"term": {
"字段名": {
"value": "搜索条件"
}
}
}
}

举例:

image-20240521163826141

2.2.2 range–根据数值范围查询

1
2
3
4
5
6
7
8
9
10
11
GET /{索引库名}/_search
{
"query": {
"range": {
"字段名": {
"gte": {最小值},
"lte": {最大值}
}
}
}
}

举例:

image-20240521163921559

2.3 地理(geo)查询

未涉及,用到了补充

3.查询—-②复合查询

image-20240521164256532

其他符合查询和相关语法可以参考官方文档

3.1 布尔查询– 与/或/非

bool查询,即布尔查询。就是利用逻辑运算来组合一个或多个查询子句的组合。bool查询支持的逻辑运算有:

  • must:必须匹配每个子查询,类似“与” 【输入框的搜索条件肯定要参与相关性算分】
  • should:选择性匹配子查询,类似“或”
  • must_not:必须不匹配,不参与算分,类似“非”
  • filter:必须匹配,不参与算分 【其他的过滤条件就可以不参与算分】

bool查询的语法如下:

image-20240521170512412

出于性能考虑,与搜索关键字无关的查询尽量采用must_not或filter逻辑运算,避免参与相关性算分

3.2 function_score查询–人为修改相关性算分

3.2.1 相关性算分介绍

当我们利用match进行叶子查询,文档结果会根据与搜索词条的关联度打分_score),返回结果时按照分值降序排列

例如,我们搜索”手机”,结果如下:

image-20240521171322014

从Elasticsearch5.1开始,采用的相关性打分算法是BM25算法,其公式如下:

image-20240521171415660

基于这套公式,就可以判断出某个文档用户搜索的关键字之间的关联度,还是比较准确的。但是,在实际业务需求中,常常会有竞价排名的功能。不是相关度越高排名越靠前,而是掏的钱多的排名靠前。

例如在百度中搜索Java培训,排名靠前的就是广告推广

image-20240521171538540

要想人为控制相关性算法【添加一个过滤条件,增加一个算分函数得到一个值,然后和原始相关性算分运算一下得到新的】,就需要利用Elasticsearch的function_score查询

3.2.2 function_score介绍

function score 查询中包含四部分内容:

  • 1.原始查询条件:query部分,基于这个条件搜索文档,并且基于BM25算法给文档打分,原始算分(query score)
  • 2.过滤条件:filter部分,符合该条件的文档才会重新算分
  • 3.算分函数:符合filter条件的文档要根据这个函数做运算,得到的函数算分(function score),有四种函数
    • weight:函数结果=常量
    • field_value_factor:函数结果=文档中的某个字段值
    • random_score:函数结果=随机数
    • script_score:自定义算分函数算法
  • 4.运算模式:算分函数的结果、原始查询的相关性算分,两者之间的运算方式,包括:
    • multiply:相乘
    • replace:用function score原始算分替换query score
    • 其它,例如:sum、avg、max、min

==【大致就是在原有BM25算法得到相关性算分,然后根据符合filter条件的文档根据算分函数得到一个值,最后两者进行一些运算方式得到最终值】==

function score的运行流程如下:

  • 1)根据原始条件查询搜索文档,并且计算相关性算分,称为原始算分(query score)
  • 2)根据过滤条件,过滤文档
  • 3)符合过滤条件的文档,基于算分函数,得到函数算分(function score)–算分函数=①常量②文档中某字段值③随机数④自定义
  • 4)将原始算分(query score)和函数算分(function score)基于运算模式[各种]做运算,得到最终结果,作为相关性算分。

举例:给IPhone这个品牌的手机算分提高十倍,分析如下:

  • 过滤条件:品牌必须为IPhone
  • 算分函数:常量weight,值为10
  • 算分模式:相乘multiply

对应代码:

image-20240521172952514

4.排序和分页

1.默认排序:Elasticsearch支持对搜索结果根据相关度算分(_score)进行排序,按照分值降序排列。

2.指定字段排序:Elasticsearch支持对keyword类型,数值类型,地理坐标类型,日期类型等。

4.1 指定排序

基本语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /indexName/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"排序字段": {
"order": "排序方式asc和desc"
}
}
]
}

image-20240522160121221

4.2 分页

elasticsearch 【默认】只返回==top10数据==。而如果要查询更多数据就需要修改分页参数了

elasticsearch中通过修改fromsize参数来控制要返回的分页结果:

  • from:从第几个文档开始
  • size:总共查询几个文档

类似于mysql中的limit ?, ?

基本语法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
GET /items/_search
{
"query": {
"match_all": {}
},
"from": 0, // 分页开始的位置,默认为0
"size": 10, // 每页文档数量,默认10
"sort": [
{
"price": {
"order": "desc"
}
}
]
}

image-20240522160315919

elasticsearch的数据一般会采用分片存储,也就是把一个索引中的数据分成N份,存储到不同节点上。这种存储方式比较有利于数据扩展,但给分页带来了一些麻烦。

举例如下:

如果一个索引库有100000条数据,分别存储到4个分片中,每个分片有25000条数据。现在每页查询10条,查询第99页。那分页查询条件如下:

1
2
3
4
5
6
7
8
9
10
GET /items/_search
{
"from": 990, // 从第990条开始查询
"size": 10, // 每页查询10条
"sort": [
{
"price": "asc"
}
]
}

从语句分析来讲:要查询的是第990-1000名数据。

从实现思路分析来讲:①将所有数据排序,找出前1000名②取出其中990-1000的部分

这样来看操作很复杂,因为每个片的数据不是顺序存储的,只能所有拿到一起再重新排序,才能找到最终前1000名截取出990-1000数据

image-20240522160808390

因此,Elasticsearch对普通分页会有一个设置:(from+size)<10000

4.3 深度分页(解决普通分页)

针对深度分页,elasticsearch提供了两种解决方案:

  • search after:分页时需要排序,原理是从上一次的排序值开始,查询下一页数据。官方推荐使用的方式。
  • scroll:原理将排序后的文档id形成快照,保存下来,基于快照做分页。官方已经不推荐使用。

以search after为例:

image-20240522161437320

优点:没有查询上限,支持深度分页【更智能,无上限】

缺点:只能向后逐页查询,不能随机翻页【一页一页查询】

场景:数据大规模顺序迁移、手机滚动查询【一页一页】

适用建议:

  • 大多数情况下,我们采用普通分页就可以了。查看百度、京东等网站,会发现其分页都有限制。例如百度最多支持77页,每页不足20条。京东最多100页,每页最多60条。

  • 因此,一般我们采用限制分页深度的方式即可,无需实现深度分页。

5.高亮显示

5.1 高亮原理

我们在百度,京东搜索时,关键字会变成红色,比较醒目,这叫高亮显示:

image-20240522162022778

css样式肯定是前端实现页面的时候写好的,但是前端编写页面的时候是不知道页面要展示什么数据的,不可能给数据加标签。而服务端实现搜索功能,要是有elasticsearch做分词搜索,是知道哪些词条需要高亮的。

因此词条的高亮标签肯定是由服务端提供数据的时候已经加上的

5.2 高亮操作

高亮的思路就是:

  • 用户输入搜索关键字搜索数据
  • 服务端根据搜索关键字到Elasticsearch搜索,并给搜索结果中的关键字词条添加html标签
  • 前端提前给约定好的html标签添加CSS样式

基本语法:

image-20240522162432699

注意:

  • 搜索必须有查询条件,而且是叶子查询的全文检索类型的查询条件(有分词),例如match
  • 参与高亮的字段必须是text类型的字段
  • 默认情况下参与高亮的字段要与搜索字段一致,除非添加:required_field_match=false

6.聚合

聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算。例如:

  • 什么品牌的手机最受欢迎?
  • 这些手机的平均价格、最高价格、最低价格?
  • 这些手机每月的销售情况如何?

实现这些统计功能的比数据库的sql要方便的多,而且查询速度非常快,可以实现≈实时搜索效果。

聚合分类

image-20240523143246188

==【注意】:参与聚合的字段必须是Keyword、数值、日期、布尔的类型的字段(这些字段一般不分词)==

举例:

具体位置解释:

image-20240523145507262

==DSL手写规则总结==

image-20240522164945976

==方式二:通过Java实现—RestClient查询==

==0.总体对照分析==

查询数据

我们可以分三步拼凑DSL语句和发起请求获取相应结果:

image-20240522172046658

其中2.组织DSL参数的步骤中source()方法下面对应的查询/高亮/分页/排序/聚合:
image-20240522172832347

在查询方面我们直接可以通过QueryBuilders类调用对应的叶子查询/复杂查询

image-20240522172921305

解析数据

我们可以通过响应结果和Elasticsearch页面返回结果获取具体细节: 【可以扩展很多,但其实就是对照DSL查询结果写

image-20240522173851593

黑马的图:

image-20240522173920457

整体步骤

文档搜索的基本步骤是:

  1. 创建SearchRequest对象实例request
  2. 准备request.source(),也就是DSL语句【这个位置可以创建查询,分页,排序,聚合,高亮等操作】
    1. QueryBuilders来构建查询条件
    2. 传入request.source()query()方法
  3. 发送请求,得到结果
  4. 解析结果(参考DSL查询得到的JSON结果,从外到内,逐层解析)

1.查询

上述手动创建DSL查询的时候讲过查询的分类:
image-20240523135626365

1.1 叶子查询

1.1.1 全文检索

image-20240523135752868

1.1.2 精确查询

image-20240523135825592

1.2 复合查询

image-20240523135928266

1.3 举例

举例:

image-20240523141521261

具体代码如下:

image-20240523141510018

2.分页和排序

2.1 基础API

image-20240523142727445

3.高亮

  • 条件同样是在request.source()中指定,只不过高亮条件要基于HighlightBuilder来构造
  • 高亮响应结果与搜索的文档结果不在一起,需要单独解析

3.1 基础API

image-20240523142500909

3.2 获取高亮值

每一条hits信息原始数据在_source部分,而高亮部分在同级highlight内部:

image-20240523142428052

在整体代码的位置:

image-20240523142651360

代码解读:

  • 从结果中获取_sourcehit.getSourceAsString(),这部分是非高亮结果,json字符串。还需要反序列为ItemDoc对象
  • 获取高亮结果。hit.getHighlightFields(),返回值是一个Map,key是高亮字段名称,值是HighlightField对象,代表高亮值
  • Map中根据高亮字段名称,获取高亮字段值对象HighlightField
  • HighlightField中获取Fragments,并且转为字符串。这部分就是真正的高亮字符串了
  • 最后:用高亮的结果替换ItemDoc中的非高亮结果

4.聚合

我们以品牌聚合为例:

4.1 基础API

image-20240523150059473

在Java代码中位置:

image-20240523150224095

4.2 获取桶结果

image-20240523150857573

在Java代码中位置:

image-20240523150938582

==Elasticsearch学习总结==

==1.基本使用思路:==

1.创建索引库和映射 –有了类似于数据库的表和表定义

2.对文档进行CRUD –有了类似于数据库的一行行数据

3.在对应位置进行复杂的DSL查询 –我们可以进行高级的查询,分页,排序,高亮,聚合操作

==2.如果写Java代码的话:==

1.pom.xml导入RestHighLevelClient依赖,然后在父工程覆盖ES版本,初始化RestHighLevelClient依赖

image-20240523155157771

2.分析Mysql哪些字段需要搜索和必须存在,然后Elasticsearch在网页上进行手动设定索引库映射

image-20240523155251252

3.对索引库进行操作【不过我认为网页上更方便】 —到这一步类似于完成了数据库的表和表定义

image-20240523155356214

4.对文档操作【不过我认为网页上更方便】 —到这一步类似于有了数据库的数据

image-20240523155503652

5.在具体位置就可以进行复杂的DSL查询【可以进行查询,分页,排序,高亮,聚合等操作】

image-20240523155632499

RabbitMQ

1.同步调用和异步调用

1.1 同步调用

image-20240319100914998

综上,同步调用的方式存在下列问题:

  • 拓展性差(新增业务和逻辑就要修改,不符合开闭原则)
  • 性能下降
  • 级联失败

而要解决这些问题,我们就必须用异步调用的方式来代替同步调用

1.2 异步调用

异步调用方式其实就是==基于消息通知的方式==,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方
  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器
  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

image-20240319143703635

异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker(消息代理)。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理 —> 发送消息的人和接收消息的人就完全解耦了

如图所示:

image-20240319144436498

综上,异步调用的优势包括:

  • 耦合度更低
  • 性能更好
  • 业务拓展性强
  • 故障隔离,避免级联失败

当然,异步通信也并非完美无缺,它存在下列缺点:

  • 完全依赖于Broker的可靠性、安全性和性能

  • 架构复杂,后期维护和调试麻烦

1.3 MQ技术选型

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ.
目比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

几种常见MQ的对比:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP XMPP SMTP STOMP OpenWire STOMP REST XMPP AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka

据统计,目前国内消息队列使用最多的还是RabbitMQ,再加上其各方面都比较均衡,稳定性也好

2.RabbitMQ

image-20240319093425166

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:Messaging that just works — RabbitMQ

2.1 RabbitMQ安装

基于Docker来安装RabbitMQ,命令如下:

1
2
3
4
5
6
7
8
9
10
11
docker run 
-e RABBITMQ_DEFAULT_USER=larkkkkkkk #设置默认用户名
-e RABBITMQ_DEFAULT_PASS=123456 #设置默认密码
-v mq-plugins:/plugins #将本地主机上的mq-plugins目录挂载到容器内部的/plugins目录,可以存放插件
--name mq #指定容器名
--hostname mq #指定容器的主机名
-p 15672:15672 #RabbitMQ管理页面登录的端口号 [浏览器输入http://localhost:15672/即可进入]
-p 5672:5672 #RabbitMQ用于AMQP协议通信 [SpringAMQP配置时候用]
--network heima #将容器连接到名字为heima的网络中 [如果没有就使用命令创建hmall网络 docker network create heima]
-d #在后台运行容器
rabbitmq:3.8-management #使用RabbitMQ 3.8版本带有管理界面的镜像来创建容器

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口

通过访问 http://localhost:15672即可看到管理控制台。首次访问登录,需要配置文件中设定的用户名和密码

image-20240319192803935

2.2 RabbitMQ架构

image-20240319193424489

其中包含几个概念:

  • **publisher**:生产者,也就是发送消息的一方
  • **consumer**:消费者,也就是消费消息的一方
  • **queue**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • **exchange**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • **virtual host**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

上述这些东西都可以在RabbitMQ的管理控制台来管理

3.SpringAMQP

RabbitMQ采用AMQP协议,因此具有跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互【RabbitMQ官方提供了各种不同语言的客户端】

但是RabbitMQ官方提供的Java客户端编码复杂,一般生产环境下我们更多会结合Spring来使用,Spring提供模板工具和SpringBoot自动装配 –> ==SpringAMQP==

提供三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息 -rabbitTemplate.convertAndSend(队列名,发送信息);

3.1 生产者-消费者(1-1)

3.1.1 导入Demo工程

image-20240319195614981

3.1.2 导入maven坐标

image-20240319195708883

3.1.3 新建队列

image-20240319200226308

3.1.4 每个子工程配置RabbitMQ信息

image-20240319200405081

3.1.5 生产者发送消息

1
2
1.注入RabbitTemplate对象
2.对象调用convertAndSend(队列名,信息)方法
image-20240319200946446

3.1.6 消费者接收消息

1
2
3
1.使用注解RabbitListener(队列名=“xxx”)
2.启动当前消费者子工程(SpringBoot工程)
3.生产者发送一次消息,消费者就会接收到一次消息

image-20240319201418364

3.2 生产者-消费者(1-n) -WorkQueues任务模型

Work queues任务模型 –> 让多个消费者绑定到一个队列,共同消费队列中的消息 –> ==解决消息堆积太多==

image-20240319202054225

3.2.1 新建队列

image-20240320091336394

3.2.2 生产者发送消息

一个发送者,循环发送50次消息

image-20240320091703762

3.2.3 消费者接收消息

两个消费者接收消息,一个休眠20ms(每秒钟处理50个消息),一个休眠200ms(每秒钟处理5个消息)

image-20240320092251196

3.2.4 均匀分配

启动消费者子工程项目,再发送消息就可以接受消息:

image-20240320092647584

可以看到消费者1和消费者2竟然每人消费了25条消息:

  • 消费者1很快完成了自己的25条消息
  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是==平均分配==给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的

3.2.5 能者多劳(yml配置prefetch)

修改==listener.simple.prefetch:1==可以保证==能者多劳==,每个消费者每次只能获取一条消息,处理完成才能获取下一条消息

image-20240320093106144

更改之后重新发送消息:

image-20240320093225443

可以发现,由于消费者1处理速度较快,所以处理了更多的消息;消费者2处理速度较慢,只处理了6条消息。而最终总的执行耗时也在1秒左右,大大提升。
正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题

4.交换机(Exchange)

在3.1和3.2部分没有添加交换机,生产者直接发送消息到队列。但是引入交换机之后消息发送的模式会有很大的变化:

image-20240320093715426

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息(递交给某个特别队列、递交给所有队列、或是将消息丢弃)
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,【没有变化】

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

Exchange(交换机)的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。【最早在控制台使用】
  • Direct:订阅,基于RoutingKey(写死的路由key)发送给订阅了消息的队列
  • Topic:通配符订阅,基于RoutingKey(符合通配符的路由key)发送给订阅了消息的队列
  • Headers:头匹配,基于MQ的消息头匹配,【用的较少】

4.1 Fanout交换机(广播)

在广播(Fanout)模式下,消息发送流程是这样的

image-20240320094830771

4.1.1 声明交换机和队列

image-20240320095902006

4.1.2 消息发送

image-20240320095942536

4.1.3 消息接收

image-20240320100056347

启动消费者子工程之后发送消息

image-20240320100134844

4.2 Direct交换机(订阅)

4.2.1 声明交换机和队列

image-20240320200427090

官网在线创建:

image-20240320200530874

4.2.2 消息发送

image-20240320200725027

4.2.3 消息接收

image-20240320200746903

4.3 Topic交换机(通配符订阅)

4.3.1 声明交换机和队列

image-20240320200956679

官网在线创建:

image-20240622214037809

4.3.2 消息发送

image-20240320200802564

4.3.3 消息接收

image-20240320200816573

4.3.4 总结

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用==通配符==!

BindingKey一般是一个/多个单词组成,多个单词之间用.分割

通配符规则:

通配符规则:

  • #:匹配0个或者多个词
  • *:匹配1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu 或者 item (#可以是0个词到多个词)
  • item.*:只能匹配item.spu (*只能是一个词)

5. API-队列和交换机(替换网页手动创建)

SpringAMQP提供了声明队列,交换机和绑定关系的API:

  • Queue:队列
  • Exchange:交换机
  • Binding:绑定关系

image-20240320203603754

5.1 @Bean方式声明(不推荐)

image-20240320215605011

这样创建很繁琐,因此提供了基于注解的方式

5.2 注解方式声明(推荐)

image-20240320220328866

其实就是@RabbitListener注解里面配置关系(@QueueBinding),然后里面具体的就是交换机(@Exchange),队列(@Queue)以及路由key(key)

image-20240320221056179

6.消息转换器[解决发送消息的JDK序列化]

Spring的convertAndSend()方法接收的是一个Object类型:

image-20240320221749744

而在数据传输时,可能会因为默认的==JDK序列化==导致数据体积过大(乱码一样的序列化结果),安全漏洞,可读性差等问题。

image-20240320222347892

因此可以考虑使用==Json序列化和反序列化==:

6.1 配置JSON转换器

  • 1.在生产者和消费者两个服务中都要引入依赖

image-20240321092511719

注意:如果项目中引入了Spring-boot-starter-web依赖,则无需再次引入Jackson依赖

  • 2.在生产者和消费者两个服务的启动类中添加一个Bean:配置消息转换器

    image-20240321092839447

==7.RabbitMQ使用总结==【直接看这里写代码】

==可以参考<RabbitMQ-黑马商城为例>这篇文章,有详细的操作介绍和步骤==

7.1 maven引入maven坐标

在生产者和消费者的pom.xml文件中配置:

1
2
3
4
5
<!--消息发送-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

7.2 子工程配置RabbitMQ信息

在生产者和消费者的application.yml文件中配置:

1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
host: 192.168.92.129 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123456 # 密码

//消费者和生产者会在对应位置添加配置 【例如:生产者消费者的确认机制,重试机制等】

7.3 配置消息转换器[解决发送消息的JDK序列化]

可以在公共模块添加bean

image-20240321095355698

7.4 生产者-发送消息

将原始的同步修改订单信息更改为异步修改

image-20240321095847784

7.5 消费者-接收消息

消费者可以添加消息监听,添加好交换机和路由key和队列

image-20240321100301473

==总结如下:==

image-20240403110457790

大致就是导入maven,子工程配置一些属性,然后生产者调用rabbitMQ的rabbitTemplate.convertAndSend()方法发送消息【里面可以添加各种】,在消费者方面可以①使用bean进行声明交换机,队列和关系②使用@RabbitListener注解进行声明【里面可以添加一些属性,例如持久化的,lazyqueue的,延迟消息的】


==高级进阶 –保证消息可靠性(三个方面)==

异步结构可能会在发送者,MQ,消费者三个地方出现问题!!!因此要考虑这三个位置的可靠性和兜底方案(延迟消息)

消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:

image-20240321120455090

消息从生产者到消费者的每一步都可能导致消息丢失:

  • 发送消息时丢失:
    • 生产者发送消息时连接MQ失败
    • 生产者发送消息到达MQ后未找到Exchange
    • 生产者发送消息到达MQ的Exchange后,未找到合适的Queue
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失:
    • 消息到达MQ,保存到队列后,尚未消费就突然宕机
  • 消费者处理消息时:
    • 消息接收后尚未处理突然宕机
    • 消息接收后处理过程中抛出异常

综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

  • 确保生产者一定把消息发送到MQ —> ==生产者的可靠性(生产者重试机制,生产者确认机制)==

  • 确保MQ不会将消息弄丢 —> ==MQ的可靠性(数据持久化,lazy queue)==

  • 确保消费者一定要处理消息 —> ==消费者的可靠性(消费者确认机制,失败重传机制,失败处理策略,业务幂等性)==

    ==总汇总(复习图)==

image-20240325153049616

8.可靠性——-发送者

8.1 生产者重试机制(建议禁用)

生产者发送消息时,出现网络故障,导致与MQ连接中断 ———-> SpringAMQP提供的消息发送时的==重试机制==

image-20240321110716739

注意:
当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是==阻塞式重试==(也就是说多次重试等待的过程中,当前线程是被阻塞的)

如果对业务性能有要求的,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

8.2 生产者确认机制(默认不开启)

一般情况下,只要生产者与MQ之间的网络连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

  • MQ内部处理消息的进程发生了异常
  • 生产者发送消息到达MQ后未找到交换机
  • 生产者发送消息到达MQ的交换机之后,未找到合适的队列,因此无法路由

针对上述三种情况,RabbitMQ提供了==生产者消息确认机制==,包括了Publisher ConfirmPublisher Return两种方式。

在开启确认机制的情况下,当生产者发送消息给MQ之后,MQ会根据消息处理的情况返回不同的回执:

image-20240321160450585

总结如下:

  • 1.只要消息投递到MQ,就返回ACK,告知投递成功(基本上这三种ack我们可以考虑不处理,直接只关注nack的情况)

​ 1.1 当消息投递到MQ,但是路由失败时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功

​ 1.2 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功

​ 1.3 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功

  • 2.其它情况都会返回NACK,告知投递失败

【其中ack(投递成功)和nack(投递失败)都属于Publisher Confirm机制;return是属于Publisher Return机制】

【(默认)两种机制都是关闭状态,需要通过配置文件来开启,因为是需要额外的网络和系统资源开销】

【一定要使用的话,无需开启Publisher-Return机制(一般路由失败是自己业务问题)】

8.2.1 配置文件添加

image-20240321161135899

这里publisher-confirm-type有三种模式可选:

  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

一般我们推荐使用correlated,回调机制。

8.2.2 定义ReturnCallback(返回信息)

每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置

我们在publisher模块定义一个配置类:rabbitTemplate对象调用setReturnsCallback()方法,方法参数是一个匿名内部类(重写returnedMessage方法)

image-20240321161749402

8.2.3 定义ConfirmCallback(确定ack/nack)

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发送消息时定义。

就是在发送消息时,调用RabbitTemplate.convertAndSend()时多传递一个参数:

image-20240321162745583

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象

将来MQ的回执就会通过这个Future来返回,我们可以提前给CorrelationData中的Future添加回调函数来处理消息回执:

image-20240321194022895

发送者位置发送消息(新增字段为了获取MQ给的结果):

image-20240321194656371

注意
开启生产者确认比较消耗MQ性能,一般不建议开启。而且大家思考一下触发确认的几种情况:

  • 路由失败:一般是因为RoutingKey错误导致,往往是编程导致
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。因此只有对消息可靠性要求非常高的业务才需要开启,而且仅仅需要开启ConfirmCallback处理nack就可以

9.可靠性——–RabbitMQ

在默认情况下,RabbitMQ会将接收到的信息保存在==内存==中(降低消息收发延迟)。这样会导致两个问题:

  • 一旦RabbitMQ宕机,内存中的消息会丢失 –> ==交换机持久化,队列持久化,消息持久化==
  • 内存空间有限,当消费者故障或者处理过慢,会导致消息积压,引发RabbitMQ阻塞 –> ==Lazy Queue==

image-20240321200321687

9.1 三种持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置数据持久化,包括:

  • 1.交换机持久化
  • 2.队列持久化
  • 3.消息持久化

其中以控制台界面为例:

  • 1.交换机持久化:

image-20240321202630567

  • 2.队列持久化:

image-20240321202726506

  • 3.消息持久化:

image-20240321202757851

说明:

在开启持久化机制以后,如果同时还开启了生产者确认机制,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性

不过出于性能考虑,为了减少IO次数,发送到RabbitMQ的消息是每隔一段时间(100ms左右)批量持久化,这会导致后续的ACK回执有一定的延迟,因此建议生产者确认全部采用异步方式

9.2 LazyQueue惰性队列

在默认情况下,RabbitMQ会将接收到的信息保存在内存中以降低消息收发的延迟。但在某些特殊情况下,这会导致消息积压,比如:

  • 消费者宕机或出现网络故障(后续崩了)
  • 消息发送量激增,超过了消费者处理速度(前面发的太快了,后面接不住)
  • 消费者处理业务发生阻塞(后续阻塞)

一旦出现消息堆积问题,RabbitMQ的内存占用会越来越高 —> 触发内存预警上限,此时RabbitMQ会将内存消息 –刷–> 磁盘,这个行为叫==PageOut==,PageOut会耗费一段时间,并且会阻塞队列进程。因此在这个过程中RabbitMQ不会再处理新的消息,生产者的所有请求都会被阻塞

为了解决这个问题,从3.6.0版本开始,增加了Lazy Queues(惰性队列)。惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存 (直接存磁盘,就不会刷盘造成阻塞队列进程)
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载,需要了我才加载内存)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为了所有队列的默认格式。因此官方推荐升级RabbitMQ为3.12版本/所有队列都设置为LazyQueue模式

9.2.1方式一— 控制台配置Lazy模式

image-20240321204855176

9.2.2 方式二—代码配置Lazy模式

基本原理都是设置属性:==x-queue-mode=lazy==

  • 基于@Bean注解(配置类)

image-20240321204954868

QueueBuilder底层源码为:

image-20240321205034258

  • 基于@RabbitListener注解(消费者子工程)

image-20240321205132950

9.2.3 更新已有队列为Lazy模式

  • 基于控制台:
image-20240321205402456
  • 基于命令行:

可以基于命令行设置policy:

1
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

  • rabbitmqctl :RabbitMQ的命令行工具
  • set_policy :添加一个策略
  • Lazy :策略名称,可以自定义
  • "^lazy-queue$" :用正则表达式匹配队列的名字
  • '{"queue-mode":"lazy"}' :设置队列模式为lazy模式
  • --apply-to queues:策略的作用对象,是所有的队列

10.可靠性——-消费者

10.1 消费者确认机制

为了确定消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)

就是说当消费者处理消息结束后,应该向+RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。这时候回执有三种可选值:

  • ack成功处理消息,RabbitMQ从队列中删除该消息
  • nack消息处理失败,RabbitMQ再次投递消息
  • reject(很少使用)消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般第三种reject方式比较少,除非是消息格式问题(那就是开发问题),因此大多数情况下我们需要将消息处理的代码通过try-catch机制捕获,消息处理成功就返回ack,处理失败就返回nack。

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并且允许我们通过配置文件(yml)设置ACK处理方式,有三种模式:

  • none:不处理。就是将消息投递给消费者后立刻回调ack,消息会立刻从MQ中删除。【非常不安全,不建议使用】
  • manual:手动模式。需要自己在业务代码中调用api,回调发送ack/reject【存在业务入侵,但更灵活】
  • auto:自动模式。SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强。
    • 当业务正常执行时则自动返回ack(RabbitMQ删除消息)
    • 当业务出现异常时根据异常判断返回不同的结果

​ - 如果是业务异常,自动返回nack(RabbitMQ再次投递消息) —> 可能会出现不停重复投递(导致消息堆积)

​ - 如果是消息处理/校验异常,自动返回reject(RabbitMQ删除消息)

配置消费者的xml文件可以修改SpringAMQP的ack处理方式:

image-20240401141236210

但是,如果auto模式下是业务异常回执给nack,那就会不断从MQ中投递消息,会导致MQ消息处理飙升带来不必要的压力(这种极端情况就是消费者一直无法执行成功,发生概率很低,但是不怕万一就怕一万)

10.2 消费者失败重试机制

因为10.1如果是收到nack回执,那么就会不断从MQ中投递消息,可能会导致消息堆积,导致mq的消息处理飙升,带来不必要的压力

我们可以利用Spring的retry机制—>==当消费者异常就利用本地重试(×无限制重试)==

配置消费者的xml文件可以修改SpringAMQP的本地重试机制:

image-20240401154838741

在开启重试机制后,重试次数耗尽之后,如果消息依然失效,则会默认直接丢弃消息!!!!!!!!!!!!!

image-20240401155907410

可以发现:

  • 消费者在失败后消息并没有重新回到MQ无限重新投递,而是重试3次

  • 本地重试3次之后,抛出了AmqpRejectAndDontRequeueException异常(说明直接reject丢弃了)

10.3 失败处理策略[解决10.2重试后reject丢弃消息情况]

image-20240401163030885

因为10.2本地重试之后如果消息失效就直接丢弃,因此我们可以考虑==加上自定义重试次数之后的策略==。只需要==MessageRecoverer接口==来处理,它包含了三种不同的实现:

  • RejectAndDontRequeueRecoverer(默认):重试耗尽后,直接reject,丢弃消息
  • *ImmediateRequeueMessageRecoverer *:重试耗尽后,返回nack,消息重新入队 【减缓重试的速度,就还是要重新投递到前一步】
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(最后人工校验/特殊校验)

10.3.1 第三种策略为例

①定义接受失败消息的交换机,队列和绑定关系

②定义RepublishMessageRecoverer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.itheima.consumer.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
//当消费者重试机制属性enabled=true的时候生效
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
//交换机
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
//队列
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
//交换机和队列绑定
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
//第三种!!!! 返回一个MessageRecoverer类型
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}

③在10.2基础上会将消息直接传递到对应交换机上最后进行人工处理

image-20240401163012833

  • 总结:

    自定义的三种方式,第一种默认直接拒绝(不好),第二种相当于往前重试了一个环节(减缓了重试速度,但是也不好),第三种相当于交给专门队列和交换机,最后交给人工处理(也不是很好)

10.4 业务幂等性[解决10.2多次重新投递消息情况]

  • 幂等性

    用函数表达式来描述是这样的:f(x) = f(f(x)),例如求绝对值函数。

    在程序开发中,则是指==同一个业务,执行一次或多次对业务状态的影响是一致的==。例如:

    • 根据id删除数据
    • 查询数据
    • 新增数据

    但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

    • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
    • 退款业务。重复退款对商家而言会有经济损失。

    所以,我们要尽可能避免业务被重复执行。然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况,例如:

    • 页面卡顿时频繁刷新导致表单重复提交
    • 服务间调用的重试
    • MQ消息的重复投递

    我们在用户支付成功后会发送MQ消息到交易服务,修改订单状态为已支付,就可能出现消息回

    复投递的情况。如果消费者不做判断,很有可能导致消息被消费多次,出现业务故障。

    举例:

    1. 假如用户刚刚支付完成,并且投递消息到交易服务,交易服务更改订单为已支付状态。
    2. 由于某种原因,例如网络故障导致生产者没有得到确认,隔了一段时间后重新投递给交易服务。
    3. 但是,在新投递的消息被消费之前,用户选择了退款,将订单状态改为了已退款状态。
    4. 退款完成后,新投递的消息才被消费,那么订单状态会被再次改为已支付。业务异常。

    因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

    • 唯一消息ID
    • 业务状态判断

10.4.1 唯一消息ID(存在业务侵入)

这个思路非常简单:

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
  • 1.SpringAMQP的MeesageConverter自带MessageID的功能
1
2
3
4
5
6
7
8
9
10
11
//以Jackson的消息转换器为例:
//在消费者和生产者的config配置里面添加
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);
// 3.返回jjmc
return jjmc;
}

我们在生产者位置添加:

image-20240402102220873

打开源码可以看到,生成随机id的底层源码:

image-20240402095327684

最终发送一条消息如下:

image-20240402102055345

  • 2.使用Redis缓存(tk使用)

    在调用接口的时候+调用生成随机数的接口生成id(全局唯一)两者合二为一,然后判断是否第一次调用,第一次调用的话业务处理完成之后将{key:id,value:操作结果}+过期时间存入redis数据库;之后每次进行的时候判断是否key存在,存在的话说明重复提交返回错误

10.4.2 业务状态判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。

相比较而言,消息ID的方案需要改造原有的数据库(会存在业务侵入问题),所以我更推荐使用业务判断的方案。

以支付修改订单的业务为例,我们需要修改OrderServiceImpl中的markOrderPaySuccess方法:处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行业务时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//在原有基础上添加判断订单状态 ---如果不符合直接消息reject了!!!!!!!

@Override
public void markOrderPaySuccess(Long orderId) {
// 1.查询订单
Order old = getById(orderId);
// 2.判断订单状态
if (old == null || old.getStatus() != 1) {
// 订单不存在或者订单状态不是1,放弃处理
return;
}
// 3.尝试更新订单
Order order = new Order();
order.setId(orderId);
order.setStatus(2);
order.setPayTime(LocalDateTime.now());
updateById(order);
}

根据上述代码逻辑可以完成幂等判断需求,但是由于判断和更新是两步动作,可能会在极小概率下可能存在线程安全问题 –> 可以考虑使用==乐观锁(CAS机制)==

1
2
3
4
5
6
7
8
9
10
11
12
//可以将上述三步直接合并为一条sql语句
@Override
public void markOrderPaySuccess(Long orderId) {
// UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
//mybatisplus的方式
lambdaUpdate()
.set(Order::getStatus, 2)
.set(Order::getPayTime, LocalDateTime.now())
.eq(Order::getId, orderId)
.eq(Order::getStatus, 1)
.update();
}

注意看,上述代码等同于这样的SQL语句:

1
UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。

10.5 兜底方案[消费者定时主动询问]

上述机制可能增加了消息的可靠性,但是也不好说能保证消息100%的可靠。

其实思想很简单:既然MQ通知不一定发送到交易服务(消费者),那么交易服务(消费者)就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致

image-20240623151453867

图中黄色线圈起来的部分就是MQ通知失败后的兜底处理方案,由交易服务自己主动去查询支付状态。

什么时候去查询是无法确定的,因此我们通常采用的措施是利用定时任务(例如:SpringTask框架)定期查询。

==可靠性总结图==

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

image-20240402164059822

==高级进阶 –延迟消息(两种方式)==

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

因此,电商中通常的做法就是:对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存

例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第30分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

11.延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而在指定时间之后才收到消息

延迟任务:设置一定时间之后才执行的任务,(最简单的方案就是利用MQ的延迟消息)

在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

11.1 死信交换机和延迟消息

11.1.1 死信交换机

死信(dead letter)

  • 消费者使用basic.rejectbasic.nack声明消费失败,并且消息的requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息满了,无法投递

如果队列设置属性dead-letter-exchange指定交换机 –>该队列的死信就会投递到这个交换机。

这个交换机就叫做死信交换机(Dead letter Exchange,简称DLX)

死信交换机的作用

  1. 收集那些因处理失败而被拒绝的消息

  2. 收集那些因队列满了而被拒绝的消息

  3. 收集因TTL(有效期)到期的消息

11.1.2 延迟消息

总结来说:宏观上看到就是内部做了一个延迟一样

image-20240402113915144

进一步说:变相的让发送消息到消费多了5s

image-20240402114329589

11.2 延迟消息插件(DelayExchange)

RabbitMQ官方提供一款插件,==原生支持延迟消息功能==。

插件原理就是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以存放一定时间,到期后再投递到队列。

11.2.1 下载

插件下载地址:
GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

由于我们安装的MQ是3.8版本,因此这里下载3.8.17版本:

image-20240402133333466

11.2.2 安装

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷

1
docker volume inspect mq-plugins

结果如下:

image-20240402133421439

插件目录被挂载到了/var/lib/docker/volumes/mq-plugins/_data这个目录,我们上传插件到该目录下。

接下来执行命令,安装插件:

1
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

结果如下:

image-20240402134224233

11.2.3 声明延迟交换机

  • 方式一:基于注解方式

image-20240402134800279

  • 方式二:基于@Bean方式

    image-20240402135010167

11.2.4 发送延迟消息

发送消息时:只需要通过设定x-delay属性设定延迟时间:

image-20240402135204953

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。
因此,不建议设置延迟时间过长的延迟消息

12. 实际操作(日后补充)

==可以参考<RabbitMQ-黑马商城为例>这篇文章,有详细的操作介绍和步骤==

,