首页
书籍
统计
友链
Search
1
wlop 4K 壁纸 4k8k 动态 壁纸
548 阅读
2
Docker搭建Typecho博客
483 阅读
3
keytool证书导入
333 阅读
4
JAVA IO
326 阅读
5
SpringBoot整合SpringCache
307 阅读
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
开发工具
百度网盘资源
天翼网盘资源
阿里网盘资源
登录
Search
标签搜索
java
javase
docker
java8
springboot
thread
spring
分布式
锁
mysql
linux
redis
源码
typecho
centos
map
lambda
stream
nginx
software
少年
累计撰写
178
篇文章
累计收到
9
条评论
首页
栏目
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
开发工具
百度网盘资源
天翼网盘资源
阿里网盘资源
页面
书籍
统计
友链
搜索到
14
篇与
分布式
的结果
2022-04-14
MyCat的安装及使用
MyCat的安装及使用一、MyCat的安装1、环境准备 本次课程使用的虚拟机环境是centos6.5,首先准备四台虚拟机,安装好mysql,方便后续做读写分离和主从复制。192.168.85.111 node01 192.168.85.112 node02 192.168.85.113 node03 192.168.85.114 node04 安装jdk 使用rpm的方式直接安装jdk,配置好具体的环境变量2、安装 从官网下载需要的安装包,并且上传到具体的虚拟机中,我们在使用的时候将包上传到node01这台虚拟机,由node01充当MyCat。 下载地址为:http://dl.mycat.org.cn/1.6.7.5/2020-4-10/解压文件到/usr/local文件夹下tar -zxvf Mycat-server-1.6.7.5-release-20200422133810-linux.tar.gz -C /usr/local配置环境变量vi /etc/profile 添加如下配置信息: export MYCAT_HOME=/usr/local/mycat export PATH=$MYCAT_HOME/bin:$PATH:$JAVA_HOME/bin当执行到这步的时候,其实就可以启动了,但是为了能正确显示出效果,最好修改下MyCat的具体配置,让我们能够正常进行访问。3、配置MyCat 进入到/usr/local/mycat/conf目录下,修改该文件夹下的配置文件1、修改server.xml文件<?xml version="1.0" encoding="UTF-8"?> <!-- - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. --> <!DOCTYPE mycat:server SYSTEM "server.dtd"> <mycat:server xmlns:mycat="http://io.mycat/"> <user name="root" defaultAccount="true"> <property name="password">123456</property> <property name="schemas">TESTDB</property> <property name="defaultSchema">TESTDB</property> </user> </mycat:server>2、修改schema.xml文件<?xml version="1.0"?> <!DOCTYPE mycat:schema SYSTEM "schema.dtd"> <mycat:schema xmlns:mycat="http://io.mycat/"> <schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100" dataNode="dn1"> </schema> <dataNode name="dn1" dataHost="host1" database="msb" /> <dataHost name="host1" maxCon="1000" minCon="10" balance="0" writeType="0" dbType="mysql" dbDriver="native" switchType="1" slaveThreshold="100"> <heartbeat>select user()</heartbeat> <writeHost host="hostM1" url="192.168.85.111:3306" user="root" password="123456"> <readHost host="hostS1" url="192.168.85.112:3306" user="root" password="123456"></readHost> </writeHost> </dataHost> </mycat:schema>4、启动mycat mycat的启动有两种方式,一种是控制台启动,一种是后台启动,在初学的时候建议大家使用控制台启动的方式,当配置文件写错之后,可以方便的看到错误,及时修改,但是在生产环境中,使用后台启动的方式比较稳妥。 控制台启动:去mycat/bin目录下执行 ./mycat console 后台启动:去mycat/bin目录下执行 ./mycat start 按照如上配置在安装的时候应该不会报错,如果出现错误,根据错误的提示解决即可。5、登录验证 管理窗口的登录 从另外的虚拟机去登录访问当前MyCat,输入如下命令即可mysql -uroot -p123456 -P 9066 -h 192.168.85.111 此时访问的是MyCat的管理窗口,可以通过show @@help查看可以执行的命令 数据窗口的登录 从另外的虚拟机去登录访问MyCat,输入命令如下:mysql -uroot -p123456 -P8066 -h 192.168.85.111 当都能够成功的时候以为着mycat已经搭建完成。二、读写分离 通过MyCat和MySQL的主从复制配合搭建数据库的读写分离,可以实现MySQL的高可用性,下面我们来搭建MySQl的读写分离。1、一主一从(主从复制的原理之前讲解过了,需要的同学自行参阅文档)1、在node01上修改/etc/my.cnf的文件#mysql服务唯一id,不同的mysql服务必须拥有全局唯一的id server-id=1 #启动二进制日期 log-bin=mysql-bin #设置不要复制的数据库 binlog-ignore-db=mysql binlog-ignore-db=information-schema #设置需要复制的数据库 binlog-do-db=msb #设置binlog的格式 binlog_format=statement2、在node02上修改/etc/my.cnf文件#服务器唯一id server-id=2 #启动中继日志 relay-log=mysql-relay3、重新启动mysql服务4、在node01上创建账户并授权slavegrant replication slave on *.* to 'root'@'%' identified by '123456'; --在进行授权的时候,如果提示密码的问题,把密码验证取消 set global validate_password_policy=0; set global validate_password_length=1;5、查看master的状态show master status6、在node02上配置需要复制的主机CHANGE MASTER TO MASTER_HOST='192.168.85.111',MASTER_USER='root',MASTER_PASSWORD='123456',MASTER_LOG_FILE='mysql-bin.000001',MASTER_LOG_POS=437;7、启动从服务器复制功能start slave;8、查看从服务器状态show slave status\G 当执行完成之后,会看到两个关键的属性Slave_IO_Running,Slave_SQL_Running,当这两个属性都是yes的时候,表示主从复制已经准备好了,可以进行具体的操作了2、一主一从验证 下面我们通过实际的操作来验证主从复制是否完成。--在node01上创建数据库 create database msb; --在node01上创建具体的表 create table mytbl(id int,name varchar(20)); --在node01上插入数据 insert into mytbl values(1,'zhangsan'); --在node02上验证发现数据已经同步成功,表示主从复制完成 通过MyCat实现读写分离 在node01上插入如下sql语句,-- 把主机名插入数据库中 insert into mytbl values(2,@@hostname); -- 然后通过mycat进行数据的访问,这个时候大家发现无论怎么查询数据,最终返回的都是node01的数据,为什么呢? select * from mytbl; 在之前的mycat基本配置中,其实我们已经配置了读写分离,大家还记得readHost和writeHost两个标签吗?<writeHost host="hostM1" url="192.168.85.111:3306" user="root" password="123456"> <readHost host="hostS1" url="192.168.85.112:3306" user="root" password="123456"></readHost> </writeHost> 其实我们已经配置过了这两个标签,默认情况下node01是用来完成写入操作的,node02是用来完成读取操作的,但是刚刚通过我们的验证发现所有的读取都是node01完成的,这是什么原因呢? 原因很简单,就是因为我们在进行配置的时候在 dataHost 标签中缺失了一个非常重要的属性balance,此属性有四个值,用来做负载均衡的,下面我们来详细介绍 1、balance=0 :不开启读写分离机制,所有读操作都发送到当前可用的writehost上 2、balance=1:全部的readhost和stand by writehost参与select 语句的负载均衡,简单的说,当双主双从模式下,其他的节点都参与select语句的负载均衡 3、balance=2:所有读操作都随机的在writehost,readhost上分发 4、balance=3:所有读请求随机的分发到readhost执行,writehost不负担读压力 当了解了这个参数的含义之后,我们可以将此参数设置为2,就能够看到在两个主机上切换执行了。3、双主双从 在上述的一主一从的架构设计中,很容易出现单点的问题,所以我们要想让生产环境中的配置足够稳定,可以配置双主双从,解决单点的问题。 架构图如下所示: 在此架构中,可以让一台主机用来处理所有写请求,此时,它的从机和备机,以及备机的从机复制所有读请求,当主机宕机之后,另一台主机负责写请求,两台主机互为备机。 主机分布如下:编号角色ip主机名1master1192.168.85.111node012slave1192.168.85.112node023master2192.168.85.113node034slave2192.168.85.114node04 下面开始搭建双主双从。 1、修改node01上的/etc/my.cnf文件#主服务器唯一ID server-id=1 #启用二进制日志 log-bin=mysql-bin # 设置不要复制的数据库(可设置多个) binlog-ignore-db=mysql binlog-ignore-db=information_schema #设置需要复制的数据库 binlog-do-db=msb #设置logbin格式 binlog_format=STATEMENT # 在作为从数据库的时候, 有写入操作也要更新二进制日志文件 log-slave-updates #表示自增长字段每次递增的量,指自增字段的起始值,其默认值是1, 取值范围是1 .. 65535 auto-increment-increment=2 # 表示自增长字段从哪个数开始,指字段一次递增多少,他的取值范围是1 .. 65535 auto-increment-offset=1 2、修改node03上的/etc/my.cnf文件#主服务器唯一ID server-id=3 #启用二进制日志 log-bin=mysql-bin # 设置不要复制的数据库(可设置多个) binlog-ignore-db=mysql binlog-ignore-db=information_schema #设置需要复制的数据库 binlog-do-db=msb #设置logbin格式 binlog_format=STATEMENT # 在作为从数据库的时候,有写入操作也要更新二进制日志文件 log-slave-updates #表示自增长字段每次递增的量,指自增字段的起始值,其默认值是1,取值范围是1 .. 65535 auto-increment-increment=2 # 表示自增长字段从哪个数开始,指字段一次递增多少,他的取值范围是1 .. 65535 auto-increment-offset=2 3、修改node02上的/etc/my.cnf文件#从服务器唯一ID server-id=2 #启用中继日志 relay-log=mysql-relay 4、修改node04上的/etc/my.cnf文件#从服务器唯一ID server-id=4 #启用中继日志 relay-log=mysql-relay 5、所有主机重新启动mysql服务 6、在两台主机node01,node03上授权同步命令GRANT REPLICATION SLAVE ON *.* TO 'root'@'%' IDENTIFIED BY '123456'; 7、查看两台主机的状态show master status; 8、在node02上执行要复制的主机CHANGE MASTER TO MASTER_HOST='192.168.85.111',MASTER_USER='root',MASTER_PASSWORD='123456',MASTER_LOG_FILE='mysql-bin.000001',MASTER_LOG_POS=154; 9、在node04上执行要复制的主机CHANGE MASTER TO MASTER_HOST='192.168.85.113',MASTER_USER='root',MASTER_PASSWORD='123456',MASTER_LOG_FILE='mysql-bin.000001',MASTER_LOG_POS=154; 10、启动两个从机的slave并且查看状态,当看到两个参数都是yes的时候表示成功start slave; show slave status; 11、完成node01跟node03的相互复制--在node01上执行 CHANGE MASTER TO MASTER_HOST='192.168.85.113',MASTER_USER='root',MASTER_PASSWORD='123456',MASTER_LOG_FILE='mysql-bin.000001',MASTER_LOG_POS=442; --开启slave start slave --查看状态 show slave status\G --在node03上执行 CHANGE MASTER TO MASTER_HOST='192.168.85.111',MASTER_USER='root',MASTER_PASSWORD='123456',MASTER_LOG_FILE='mysql-bin.000002',MASTER_LOG_POS=442; --开启slave start slave --查看状态 show slave status\G4、双主双从验证在node01上执行如下sql语句:create database msb; create table mytbl(id int,name varchar(20)); insert into mytbl values(1,'zhangsan'); --完成上述命令之后可以去其他机器验证是否同步成功 当上述操作完成之后,我们可以验证mycat的读写分离,此时我们需要进行重新的配置,修改schema.xml文件。 在当前mysql架构中,我们使用的是双主双从的架构,因此可以将balance设置为1 除此之外我们需要注意,还需要了解一些参数: 参数writeType,表示写操作发送到哪台机器,此参数有两个值可以进行设置: writeType=0:所有写操作都发送到配置的第一个writeHost,第一个挂了切换到还生存的第二个 writeType=1:所有写操作都随机的发送到配置的writehost中,1.5之后废弃, 需要注意的是:writehost重新启动之后以切换后的为准,切换记录在配置文件dnindex.properties中 参数switchType:表示如何进行切换: switchType=1:默认值,自动切换 switchType=-1:表示不自动切换 switchType=2:基于mysql主从同步的状态决定是否切换<?xml version="1.0"?> <!DOCTYPE mycat:schema SYSTEM "schema.dtd"> <mycat:schema xmlns:mycat="http://io.mycat/"> <schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100" dataNode="dn1"> </schema> <dataNode name="dn1" dataHost="host1" database="msb" /> <dataHost name="host1" maxCon="1000" minCon="10" balance="1" writeType="0" dbType="mysql" dbDriver="native" switchType="1" slaveThreshold="100"> <heartbeat>select user()</heartbeat> <writeHost host="hostM1" url="192.168.85.111:3306" user="root" password="123456"> <readHost host="hostS1" url="192.168.85.112:3306" user="root" password="123456"></readHost> </writeHost> <writeHost host="hostM2" url="192.168.85.113:3306" user="root" password="123456"> <readHost host="hostS2" url="192.168.85.114:3306" user="root" password="123456"></readHost> </writeHost> </dataHost> </mycat:schema> 下面开始进行读写分离的验证--插入以下语句,使数据不一致 insert into mytbl values(2,@@hostname); --通过查询mycat表中的数据,发现查询到的结果在node02,node03,node04之间切换,符合正常情况 select * from mytbl; --停止node01的mysql服务 service mysqld stop --重新插入语句 insert into mytbl values(3,@@hostname); --开启node01的mysql服务 service mysqld start --执行相同的查询语句,此时发现在noede01,node02,node04之间切换,符合情况 通过上述的验证,我们可以得到一个结论,node01,node03互做备机,负责写的宕机切换,其他机器充作读请求的响应。 做到此处,希望大家能够思考一个问题,在上述我们做的读写分离操作,其实都是基于主从复制的,也就是数据同步,但是在生产环境中会存在很多种情况造成主从复制延迟问题,那么我们应该如何解决延迟问题,这是一个值得思考的问题,到底如何解决呢?三、数据切分 数据切分指的是通过某种特定的条件,将我们存放在同一个数据库中的数据分散存放到多个数据库上面,以达到分散单台设备负载的效果。 数据的切分根据其切分规则的类型,可以分为两种切分模式。一种是按照不同的表来切分到不同的数据库之上,这种切可以称之为数据的垂直切分或者纵向切分,另外一种则是根据表中的数据的逻辑关系,将同一个表中的数据按照某种条件拆分到多台数据库上面,这种切分称之为数据的水平切分或者横向切分。 垂直切分的最大特点就是 规则简单,实施也更为方便,尤其适合各业务之间的耦合度非常低,相互影响很小,业务逻辑非常清晰的系统。在这种系统中,可以很容易做到将不同业务模块所使用的表分拆到不同的数据库中。根据不同的表来进行拆分,对应用程序的影响也很小,拆分规则也会比较简单清晰。 水平切分与垂直切分相比,相对来说稍微复杂一些。因为要将同一个表中的不同数据拆分到不同的数据库中,对于应用程序来说,拆分规则本身就较根据表明来拆分更为复杂,后期的数据维护也会更为复杂一些。1、垂直切分 一个数据库由很多表的构成,每个表对应着不同的业务,垂直切分是指按照业务将表进行分类,分布到不同的数据库上面,这样也就将数据或者压力分担到不同的库上面。 如上图所示,一个系统被切分成了用户系统、订单交易、支付系统等多个库。 一个架构设计较好的应用系统,其总体功能肯定是又多个功能模块所组成的。而每一个功能模块所需要的数据对应到数据库中就是一个或者多个表。而在架构设计中,各个功能模块相关质检的交互点越统一越少,系统的耦合度就越低,系统各个模块的维护性以及扩展性也就越好。这样的系统,实现数据的垂直切分也就越容易。 但是往往系统中有些表难以做到完全的独立,存在着跨库join的情况,对于这类的表,就需要去做平衡,是数据让步业务,共用一个数据源还是分成多个库,业务之间通过接口来做调用。在系统初期,数据量比较少,或者资源有限的情况下,会选择共用数据源,但是当数据发展到一定规模,负载很大的情况下就必须要做分割。 一般来讲业务存在着复杂join的场景是难以切分的,往往业务独立的易于切分。如何切分,切分到何种程度是考验技术架构的一个难题。下面来分析下垂直切分的优缺点: 优点: 1、拆分后业务清晰,拆分规则明确 2、系统之间整合或扩展容易 3、数据维护简单 缺点: 1、部分业务表无法实现join,只能通过接口方式解决,提高了系统复杂度 2、受每种业务不同的限制存在单库性能瓶颈,不易数据扩展跟性能提高 3、事务处理复杂2、水平切分 相对于垂直拆分,水平拆分不是将表做分类,而是按照某个字段的某种规则来分散到多个库中,每个表中包含一部分数据。简单来说,我们可以将数据的水平切分理解为是按照数据行切分,就是将表中的某些行切分到一个数据库,而另外的某些行又切分到其他的数据库中, 拆分数据就需要定义分片规则。关系型数据库是行列的二维模型,拆分的第一原则是找到拆分维度。比如从会员的角度来分析,商户订单交易类系统中查询会员某天某月某个订单,那么就需要按照会员结合日期来拆分,不同的数据按照会员id做分组,这样所有的数据查询join都会在单库内解决;如果从商户的角度来讲,要查询某个商家某天所有的订单数,就需要按照商户id做拆分;但是如果系统既想按照会员拆分,又想按照商家数据拆分,就会有一定的困难,需要综合考虑找到合适的分片。 几种典型的分片规则包括: 1、按照用户id取模,将数据分散到不同的数据库,具有相同数据用户的数据都被分散到一个库中; 2、按照日期,将不同月甚至日的数据分散到不同的库中; 3、按照某个特定的字段求模,或者根据特定范围段分散到不同的库中。 如图,切分原则都是根据业务找到适合的切分规则分散到不同的库,下图是用用户id求模的案例: 数据做完了水平拆分之后也是有优缺点的。 优点: 1、拆分规则抽象好,join操作基本可以数据库做; 2、不存在单库大数据,高并发的性能瓶颈; 3、应用端改造较少; 4、提高了系统的稳定性跟负载能力 缺点: 1、拆分规则难以抽象 2、分片事务一致性难以解决 3、数据多次扩展跟维护量极大 4、跨库join性能较差3、总结 数据切分的两种方式,会发现每种方式都有自己的缺点,但是他们之间有共同的缺点,分别是: 1、引入了分布式事务的问题 2、跨节点join的问题 3、跨节点合并排序分页的问题 4、多数据源管理的问题 针对数据源管理,目前主要有两种思路: 1、客户端模式,在每个应用程序模块中配置管理自己需要的一个或多个数据源,直接访问各个数据库,在模块内完成数据的整合 2、通过中间代理层来统一管理所有的数据源,后端数据库集群对前端应用程序透明; 在实际的生产环境中,我们都会选择第二种方案来解决问题,尤其是系统不断变得庞大复杂的时候,其实这是非常正确的,虽然短期内付出的成本可能会比较大,但是对整个系统的扩展性来说,是非常有帮助的。 MyCat通过数据切分解决传统数据库的缺陷,又有了nosql易于扩展的优点。通过中间代理层规避了多数据源的数据问题,对应用完全透明,同时对数据切分后存在的问题,也做了解决方案。 MyCat在做数据切分的时候应该尽可能的遵循以下原则,当然这也是经验之谈,最终的落地实现还是要看具体的应用场景在做具体的分析 第一原则:能不切分尽量不要切分 第二原则:如果要切分一定要选择合适的切分规则,提前规划好 第三原则:数据切分尽量通过数据冗余或表分组来降低跨库join的可能 第四原则:由于数据库中间件对数据join实现的优劣难以把握,而且实现高性能难度极大,业务读取尽量少使用多表join。四、MyCat的配置文件讲解 在之前的操作中,我们已经做了部分文件的配置,但是具体的属性并没有讲解,下面我们讲解下每一个配置文件具体的属性以及相关的基本配置。1、搞定schema.xml文件 schema.xml作为MyCat中重要地配置文件之一,管理着MyCat的逻辑库、表、分片规则、DataNode以及DataSource。1、schema标签<schema name="TESTDB" checkSQLschema="false" sqlMaxLimit="100" dataNode="dn1"></schema> schema标签用于定义MyCat实例中的逻辑库,MyCat可以有多个逻辑库,每个逻辑库都有自己相关的配置,可以使用schema标签来划分这些不同的逻辑库。如果不配置schame,那么所有的表配置都将属于同一个默认的逻辑库。 dataNode:该属性用于绑定逻辑库到某个具体的database上。 checkSQLschema:当该值为true时,如果执行select from TESTDB.user,那么MyCat会将语句修改为select from user,即把表示schema的字符去掉,避免发送到后端数据库执行时报错。 sqlMaxLimit:当该值设置为某个数值的时候,每次执行的sql语句,如果没有加上limit语句,MyCat也会自动加上对应的值。例如,当设置值为100的时候,那么select from user的效果跟执行select from user limit 100相同。如果不设置该值的话,MyCat默认会把所有的数据信息都查询出来,造成过多的输出,所以,还是建议设置一个具体的值,以减少过多的数据返回。当然sql语句中可以显式的制定limit的大小,不受该属性的约束。2、table标签<table name="travelrecord" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" ></table> table标签定义了MyCat中的逻辑表,所有需要拆分的表都需要在这个标签中定义。 name:定义逻辑表的表名,这个名字就如同创建表的时候指定的表名一样,同个schema标签中定义的名字必须唯一。 dataNode:定义这个逻辑表所属的dataNode,该属性的值需要和dataNode标签中的name属性值对应,如果需要定义的dn过多,可以使用如下方法减少配置:<table name="travelrecord" dataNode="multipleDn$0-99,multipleDn2$100-199" rule="auto-shardinglong" ></table> <dataNode name="multipleDn$0-99" dataHost="localhost1" database="db$0-99" ></dataNode> <dataNode name="multipleDn2$100-199" dataHost="localhost1" database=" db$100-199" ></dataNode> rule:该属性用于指定逻辑表要使用的规则名字,规则名字在rule.xml中定义,必须与tableRule标签中的name属性值一一对应 ruleRequired:该属性用于指定表是否绑定分片规则,如果配置为true,但没有配置具体rule的话,程序会报错。 primaryKey:该逻辑表对应真实表的主键,例如:分片的规则是使用非主键进行分片的,那么在使用主键查询的时候,就会发送查询语句到所有配置的DN上,如果使用改属性配置真实表的主键。那么MyCat会缓存主键与具体DN的信息,那么再次使用非主键进行查询的时候就不会进行广播式的查询,就会直接发送语句到具体的DN,但是尽管配置改属性,如果缓存没有命中的话,还是会发送语句给具体的DN来获得数据 type:该属性定义了逻辑表的类型,目前逻辑表只有全局表和普通表两种类型。对应的配置: 全局表:global 普通表:不指定该值为global的所有表 autoIncrement:MySQL 对非自增长主键,使用 last_insert_id()是不会返回结果的,只会返回 0。所以,只有定义了自增长主键的表才可以用 last_insert_id()返回主键值。MyCat目前提供了自增长主键功能,但是如果对应的MySQL节点上数据表,没有定义 auto_increment,那么在MyCat层调用 last_insert_id()也是不会返回结果的。由于 insert 操作的时候没有带入分片键, MyCat 会先取下这个表对应的全局序列,然后赋值给分片键。 这样才能正常的插入到数据库中,最后使用 last_insert_id()才会返回插入的分片键值。如果要使用这个功能最好配合使用数据库模式的全局序列。使用 autoIncrement=“true” 指定这个表有使用自增长主键,这样 MyCat才会不抛出分片键找不到的异常。使用 autoIncrement=“false” 来禁用这个功能,当然你也可以直接删除掉这个属性。默认就是禁用的。 needAddLimit:指定表是否需要自动的在每个语句后面加上limit限制。由于使用了分库分表,数据量有时会特别巨大。这时候执行查询语句,如果恰巧又忘记了加上数量限制的话,那么查询所有的数据出来,就会执行很久的时间,所以mycat自动为我们加上了limit 100。当前如果语句中又limit,那么就不会添加了。3、childTable标签 childTable标签用于定义ER分片的子表。通过标签上的属性与父表进行关联。 name:定义子表的表名 joinKey:插入子表的时候会使用这个列的值查找父表存储的数据节点 parentKey:属性指定的值一般为与父表建立关联关系的列名。程序首先获取joinkey的值,再通过parentKey属性指定的列名产生查询语句,通过执行该语句得到父表存储再哪个分片上,从而确定子表存储的位置。 primaryKey:跟table标签所描述相同 needAddLimit:跟table标签所描述相同4、dataNode标签<dataNode name="dn1" dataHost="lch3307" database="db1" ></dataNode> dataNode标签定义了mycat中的数据节点,也就是我们通常说的数据分片,一个dataNode标签就是一个独立的数据分片。 name:定义数据节点的名字,这个名字需要是唯一的,我们需要再table标签上应用这个名字,来建立表与分片对应的关系 dataHost:该属性用于定义该分片属于哪个数据库实例,属性值是引用dataHost标签上定义的name属性。 database:该属性用于定义该分片属性哪个具体数据库实力上的具体库,5、dataHost标签 该标签定义了具体的数据库实例、读写分离配置和心跳语句<dataHost name="localhost1" maxCon="1000" minCon="10" balance="0" writeType="0" dbType="mysql" dbDriver="native"> <heartbeat>select user()</heartbeat> <!-- can have multi write hosts --> <writeHost host="hostM1" url="localhost:3306" user="root" password="123456"> <!-- can have multi read hosts --> <!-- <readHost host="hostS1" url="localhost:3306" user="root" password="123456" /> --> </writeHost> <!-- <writeHost host="hostM2" url="localhost:3316" user="root" password="123456"/> --> </dataHost> name:唯一标识dataHost标签,供上层的标签使用 maxcon:指定每个读写实例连接池的最大连接 mincon:指定每个读写实例连接连接池的最小链接,初始化连接池的大小 balance:负载均衡类型: 0:不开启读写分离机制,所有读操作都发送到当前可用的writeHost上 1:全部的readHost和stand by writeHost参与select语句的负载均衡,简单的说,当双主双从模式(M1->S1,M2->S2,并且M1与M2互为主备),正常情况下,M2,S1,S2都参与select语句的负载均衡 2:所有读操作都随机的再writeHost、readHost上分发 3:所有读请求随机的分发到writeHost对应readHost执行,writeHost不负担读压力,在之后的版本中失效。 writeType:写类型 writeType=0:所有的写操作发送到配置的第一个writeHost,第一个挂了切换到还生存的第二个writeHost,重启之后以切换后的为准,切换记录保存在配置文件 dnindex.properties writeType=1:所有写操作都随机的发送到配置的writeHost,1.5以后废弃不推荐 dbType:指定后端连接的数据库类型,如MySQL,mongodb,oracle dbDriver:指定连接后端数据库使用的Driver,目前可选的值有native和JDBC。使用native的话,因为这个值执行的是二进制的mysql协议,所以可以使用mysql和maridb,其他类型的数据库则需要使用JDBC驱动来支持。 switchType:是否进行主从切换 -1:表示不自动切换 1:默认值,自动切换 2:基于mysql主从同步的状态决定是否切换6、heartbeat标签 这个标签指明用于和后端数据库进行心跳检测的语句。2、搞定server.xml文件 server.xml几乎保存了所有mycat需要的系统配置信息。<user name="test"> <property name="password">test</property> <property name="schemas">TESTDB</property> <property name="schemas">TESTDB</property> <property name="schemas">TESTDB</property> <property name="schemas">TESTDB</property> <privileges check="false"> <schema name="TESTDB" dml="0010" showTables="custome/mysql"> <table name="tbl_user" dml="0110"></table> <table name="tbl_dynamic" dml="1111"></table> </schema> </privileges> </user> server.xml中的标签本就不多,这个标签主要用于定义登录mycat的用户和权限。 property标签用来声明具体的属性值,name用来指定用户名,password用来修改密码,readonly用来限制用户是否是只读的,schemas用来控制用户课访问的schema,如果有多个的话,用逗号分隔 privileges标签是对用户的schema及下级的table进行精细化的DML控制,privileges节点的check属性适用于标识是否开启DML权限检查,默认false标识不检查,当然不配置等同于不检查 在进行检查的时候,是通过四个二进制位来标识的,insert,update,select,delete按照顺序标识,0表示未检查,1表示要检查 system标签表示系统的相关属性:属性含义备注charset字符集设置utf8,utf8mb4defaultSqlParser指定的默认解析器druidparser,fdbparser(1.4之后作废)processors系统可用的线程数,默认为机器CPU核心线程数processorBufferChunk每次分配socket direct buffer的大小默认是4096个字节processorExecutor指定NIOProcessor共享的businessExecutor固定线程池大小,mycat在处理异步逻辑的时候会把任务提交到这个线程池中 sequnceHandlerTypemycat全局序列的类型0为本地文件,1为数据库方式,2为时间戳方式,3为分布式ZK ID生成器,4为zk递增id生成3、rule.xml rule.xml里面就定义了我们对表进行拆分所涉及到的规则定义。我们可以灵活的对表使用不同的分片算法,或者对表使用相同的算法但具体的参数不同,这个文件里面主要有tableRule和function这两个标签。1、tableRule 这个标签被用来定义表规则,定义的表规则在schema.xml文件中<tableRule name="rule1"> <rule> <columns>id</columns> <algorithm>func1</algorithm> </rule> </tableRule> name属性指定唯一的名字,用来标识不同的表规则 内嵌的rule标签则指定对物理表中的哪一列进行拆分和使用什么路由算法 columns内指定要拆分的列的名字 algorithm使用function标签中的那么属性,连接表规则和具体路由算法。当然,多个表规则可以连接到同一个路由算法上。2、function<function name="hash-int" class="io.mycat.route.function.PartitionByFileMap"> <property name="mapFile">partition-hash-int.txt</property> </function> name指定算法的名字 class指定路由算法具体的类名字 property为具体算法需要用到的一些属性
2022年04月14日
118 阅读
0 评论
4 点赞
2022-04-14
RestTemplate使用
RestTemplate使用一、依赖注入 @Bean // 开启负载均衡 @LoadBalanced RestTemplate restTemplate() { return new RestTemplate(); }二、调用服务String url ="http://provider/getHi"; String respStr = restTemplate.getForObject(url, String.class);三、get 请求1、getForEntitygetForEntity方法的返回值是一个ResponseEntity,ResponseEntity是Spring对HTTP请求响应的封装,包括了几个重要的元素,如响应码、contentType、contentLength、响应消息体等。<200,Hi,[Content-Type:"text/plain;charset=UTF-8", Content-Length:"8", Date:"Fri, 10 Apr 2020 09:58:44 GMT", Keep-Alive:"timeout=60", Connection:"keep-alive"]>2、返回Map调用方 String url ="http://provider/getMap"; ResponseEntity<Map> entity = restTemplate.getForEntity(url, Map.class); System.out.println("respStr: " + entity.getBody() );生产方 @GetMapping("/getMap") public Map<String, String> getMap() { HashMap<String, String> map = new HashMap<>(); map.put("name", "500"); return map; }3、返回对象(getForEntity)调用方 ResponseEntity<Person> entity = restTemplate.getForEntity(url, Person.class); System.out.println("respStr: " + ToStringBuilder.reflectionToString(entity.getBody() ));生产方 @GetMapping("/getObj") public Person getObj() { Person person = new Person(); person.setId(100); person.setName("xiaoming"); return person; }Person类 private int id; private String name;4、传参调用4.1、使用占位符 String url ="http://provider/getObjParam?name={1}"; ResponseEntity<Person> entity = restTemplate.getForEntity(url, Person.class,"hehehe...");4.2、使用map String url ="http://provider/getObjParam?name={name}"; Map<String, String> map = Collections.singletonMap("name", " memeda"); ResponseEntity<Person> entity = restTemplate.getForEntity(url, Person.class,map);5、返回对象(getForObject)Person person = restTemplate.getForObject(url, Person.class,map);四、post 请求1、postForEntity调用方 String url ="http://provider/postParam"; Map<String, String> map = Collections.singletonMap("name", " memeda"); ResponseEntity<Person> entity = restTemplate.postForEntity(url, map, Person.class);生产方 @PostMapping("/postParam") public Person postParam(@RequestBody String name) { System.out.println("name:" + name); Person person = new Person(); person.setId(100); person.setName("xiaoming" + name); return person; }2、postForLocation调用方 String url ="http://provider/postParam"; Map<String, String> map = Collections.singletonMap("name", " memeda"); URI location = restTemplate.postForLocation(url, map, Person.class); System.out.println(location);生产方需要设置头信息,不然返回的是null public URI postParam(@RequestBody Person person,HttpServletResponse response) throws Exception { URI uri = new URI("https://www.baidu.com/s?wd="+person.getName()); response.addHeader("Location", uri.toString());五、exchange可以自定义http请求的头信息,同时保护get和post方法第一步:自定义拦截器需要实现ClientHttpRequestInterceptor接口public class LoggingClientHttpRequestInterceptor implements ClientHttpRequestInterceptor { @Override public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException { System.out.println("拦截啦!!!"); System.out.println(request.getURI()); ClientHttpResponse response = execution.execute(request, body); System.out.println(response.getHeaders()); return response; }第二步:添加到RestTemplate中 @Bean @LoadBalanced RestTemplate restTemplate() { RestTemplate restTemplate = new RestTemplate(); restTemplate.getInterceptors().add(new LoggingClientHttpRequestInterceptor()); return restTemplate; }
2022年04月14日
74 阅读
0 评论
5 点赞
2022-04-01
本地事务与分布式事务
本地事务一、事务的基本性质数据库事务的几个特性:原子性(Atomicity )、一致性( Consistency )、隔离性或独立性( Isolation)和持久性(Durabilily),简称就是ACID;原子性:一系列的操作整体不可拆分,要么同时成功,要么同时失败一致性:数据在事务的前后,业务整体一致。(A向B转账,转账前后AB总金额一致)隔离性:事务之间互相隔离。持久性:一旦事务成功,数据一定会落盘在数据库。单体应用中,我们多个业务操作使用同一条连接操作不同的数据表,一旦有异常,我们可以很容易的整体回滚;Business:我们具体的业务代码Storage:库存业务代码;扣库存Order:订单业务代码;保存订单Account:账号业务代码;减账户余额比如买东西业务,扣库存,下订单,账户扣款,是一个整体;必须同时成功或者失败一个事务开始,代表以下的所有操作都在同一个连接里面;二、事务的隔离级别READ UNCOMMITTED(读未提交)该隔离级别的事务会读到其它未提交事务的数据,此现象也称之为脏读。READ COMMITTED(读提交)一个事务可以读取另一个已提交的事务,多次读取会造成不一样的结果,此现象称为不可重 复读问题,Oracle 和SQL Server 的默认隔离级别。REPEATABLE READ(可重复读)该隔离级别是MySQL 默认的隔离级别,在同一个事务里,select 的结果是事务开始时时间 点的状态,因此,同样的select 操作读到的结果会是一致的,但是,会有幻读现象。MySQL 的InnoDB 引擎可以通过next-key locks 机制(行锁的算法)来避免幻读。SERIALIZABLE(序列化)在该隔离级别下事务都是串行顺序执行的,MySQL 数据库的InnoDB 引擎会给读操作隐式 加一把读共享锁,从而避免了脏读、不可重读复读和幻读问题。三、事务的传播行为PROPAGATION_REQUIRED如果当前没有事务,就创建一个新事务,如果当前存在事务,就加入该事务,该设置是最常用的设置。PROPAGATION_SUPPORTS支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就以非事务执行。PROPAGATION_MANDATORY支持当前事务,如果当前存在事务,就加入该事务,如果当前不存在事务,就抛出异常。PROPAGATION_REQUIRES_NEW创建新事务,无论当前存不存在事务,都创建新事务。PROPAGATION_NOT_SUPPORTED以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。PROPAGATION_NEVER以非事务方式执行,如果当前存在事务,则抛出异常。PROPAGATION_NESTED如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED 类似的操作。四、SpringBoot 事务关键点事务的自动配置TransactionAutoConfiguration事务的坑在同一个类里面,编写两个方法,内部调用的时候,会导致事务设置失效。原因是没有用到 代理对象的缘故。解决0)、导入spring-boot-starter-aop 1)、@EnableTransactionManagement(proxyTargetClass = true) 2)、@EnableAspectJAutoProxy(exposeProxy=true) 3)、AopContext.currentProxy() 调用方法分布式事务一、为什么有分布式事务分布式系统经常出现的异常机器宕机、网络异常、消息丢失、消息乱序、数据错误、不可靠的TCP、存储数据丢失...二、CAP 定理与BASE 理论1、CAP 定理CAP 原则又称CAP 定理,指的是在一个分布式系统中一致性(Consistency)在分布式系统中的所有数据备份,在同一时刻是否同样的值。(等同于所有节点访问同一份最新的数据副本)可用性(Availability)在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求。(对数据更新具备高可用性)分区容错性(Partition tolerance)大多数分布式系统都分布在多个子网络。每个子网络就叫做一个区(partition)。分区容错的意思是,区间通信可能失败。比如,一台服务器放在中国,另一台服务器放在美国,这就是两个区,它们之间可能无法通信。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。一般来说,分区容错无法避免,因此可以认为CAP 的P 总是成立。CAP 定理告诉我们,剩下的C 和A 无法同时做到。分布式系统中实现一致性的raft 算法、paxos http://thesecretlivesofdata.com/raft/面临的问题对于多数大型互联网应用的场景,主机众多、部署分散,而且现在的集群规模越来越大,所以节点故障、网络故障是常态,而且要保证服务可用性达到99.99999%(N 个9),即保证P 和A,舍弃C。2、BASE 理论是对CAP 理论的延伸,思想是即使无法做到强一致性(CAP 的一致性就是强一致性),但可以采用适当的采取弱一致性,即最终一致性。什么是BASE :基本可用(Basically Available)基本可用是指分布式系统在出现故障的时候,允许损失部分可用性(例如响应时间、功能上的可用性),允许损失部分可用性。需要注意的是,基本可用绝不等价于系统不可用。响应时间上的损失:正常情况下搜索引擎需要在0.5 秒之内返回给用户相应的查询结果,但由于出现故障(比如系统部分机房发生断电或断网故障),查询结果的响应时间增加到了1~2 秒。功能上的损失:购物网站在购物高峰(如双十一)时,为了保护系统的稳定性,部分消费者可能会被引导到一个降级页面。软状态( Soft State)软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布式存储中一般一份数据会有多个副本,允许不同副本同步的延时就是软状态的体现。mysql replication 的异步复制也是一种体现。最终一致性( Eventual Consistency)最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。强一致性、弱一致性、最终一致性从客户端角度,多进程并发访问时,更新过的数据在不同进程如何获取的不同策略,决定了不同的一致性。对于关系型数据库,要求更新过的数据能被后续的访问都能看到,这是强一致性。如果能容忍后续的部分或者全部访问不到,则是弱一致性。如果经过一段时间后要求能访问到更新后的数据,则是最终一致性。3、分布式事务几种方案2PC 模式数据库支持的2PC【2 phase commit 二阶提交】,又叫做XA Transactions。MySQL 从5.5 版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交。第二阶段:事务协调器要求每个数据库提交数据。其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。XA 协议比较简单,而且一旦商业数据库实现了XA 协议,使用分布式事务的成本也比较低。XA 性能不理想,特别是在交易下单链路,往往并发量很高,XA 无法满足高并发场景XA 目前在商业数据库支持的比较理想,在mysql 数据库中支持的不太理想,mysql 的XA 实现,没有记录prepare 阶段日志,主备切换回导致主库与备库数据不一致。许多nosql 也没有支持XA,这让XA 的应用场景变得非常狭隘。也有3PC,引入了超时机制(无论协调者还是参与者,在向对方发送请求后,若长时间未收到回应则做出相应处理)。柔性事务-TCC 事务补偿型方案刚性事务:遵循ACID 原则,强一致性。柔性事务:遵循BASE 理论,最终一致性;与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。一阶段prepare 行为:调用自定义的prepare 逻辑。二阶段commit 行为:调用自定义的commit 逻辑。二阶段rollback 行为:调用自定义的rollback 逻辑。所谓TCC 模式,是指支持把自定义的分支事务纳入到全局事务的管理中。柔性事务-最大努力通知型方案按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。这种方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。这种方案也是结合MQ 进行实现,例如:通过MQ 发送http 请求,设置最大通知次数。达到通知次数后即不再通知。案例:银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对账文件),支付宝的支付成功异步回调。柔性事务-可靠消息+最终一致性方案(异步确保型)实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。防止消息丢失:/** * 1、做好消息确认机制(pulisher,consumer【手动ack】) * 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再次发送一 遍 */CREATE TABLE `mq_message` ( `message_id` char(32) NOT NULL, `content` text, `to_exchane` varchar(255) DEFAULT NULL, `routing_key` varchar(255) DEFAULT NULL, `class_type` varchar(255) DEFAULT NULL, `message_status` int(1) DEFAULT '0' COMMENT '0-新建1-已发送2-错误抵达3-已抵达', `create_time` datetime DEFAULT NULL, `update_time` datetime DEFAULT NULL, PRIMARY KEY (`message_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
2022年04月01日
113 阅读
0 评论
4 点赞
2022-03-11
接口幂等性
接口幂等性1、什么是幂等性接口幂等性就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用;比如说支付场景,用户购买了商品支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额返发现多扣钱了,流水记录也变成了两条...,这就没有保证接口的幂等性。2、哪些情况需要防止用户多次点击按钮用户页面回退再次提交微服务互相调用,由于网络问题,导致请求失败。feign触发重试机制其他业务情况3、什么情况下需要幂等天然幂等的:以SQL为例,有些操作是。SELECT*FROM table WHERid=?无论执行多少次都不会改变状态,是天然的幂等。UPDATE tab1 SET col1=1WHERE col2=2 无论执行成功多少次状态都是一致的,也是幂等操作。delete from user where userid=1多次操作,结果一样,具备幂等性insert into user(userid,name)values(1,a)如userid为唯一主键,即重复操作上面的业务,只会插入一条用户数据,具备幂等性。不是幂等的UPDATE tab1 SET col1=col1+1WHERE col2=2每次执行的结果都会发生变化,不是幂等的。insert into user(userid,name)values(1,a)如userid不是主键,可以重复,那上面业务多次操作,数据都会新增多条,不具备幂等性。4、幂等解决方案一、token机制1、服务端提供了发送token的接口。我们在分析业务的时候,哪些业务是存在幂等问题的,就必须在执行业务前,先去获取token,服务器会把token保存到redis中。2、然后调用业务接口请求时,把token携带过去,一般放在请求头部。3、服务器判断token是否存在redis中,存在表示第一次请求,然后删除token,继续执行业务。4、如果判断token不存在redis中,就表示是重复操作,直接返回重复标记给client,这样就保证了业务代码,不被重复执行。危险性:1、先删除token 还是后删除token;(1)先删除可能导致,业务确实没有执行,重试还带上之前token,由于防重设计导致,请求还是不能执行。(2)后删除可能导致,业务处理成功,但是服务闪断,出现超时,没有删除 token,别人继续重试,导致业务被执行两边(3)我们最好设计为先删除token,如果业务调用失败,就重新获取token 再次请求。2、Token获取、比较和删除必须是原子性(1)redis.get(token)、token.equals、redis.delltoken)如果这两个操作不是原子,可能导致,高并发下,都get 到同样的数据,判断都成功,继续业务并发执行(2)可以在redis 使用lua脚本完成这个操作if redis. call("get', KEYS[1])==ARGV[1] then return redis. call(' del", KEYS[1]) else return 0 end二、各种锁机制1、数据库悲观锁select*from xxxx where id=1 for update;悲观锁使用时一般伴随事务一起使用,数据锁定时间可能会很长,需要根据实际情况选用。另外要注意的是,id字段一定是主键或者唯一索引,不然可能造成锁表的结果,处理起来会非常麻烦。2、数据库乐观锁这种方法适合在更新的场景中update t_goods set count=count-1,version =version+1 where good_id=2 and version=1根据 version版本,也就是在操作库存前先获取当前商品的 version 版本号,然后操作的时候带上此version号。我们梳理下,我们第一次操作库存时,得到version为1,调用库存服务version变成了2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传如的version还是1,再执行上面的sql语句时,就不会执行;因为version已经变为2了,where条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。乐观锁主要使用于处理读多写少的问题。3、业务层分怖式锁如果多个机器可能在同一时间同时处理相同的数据,比如多台机器定时任务都拿到了相同数据处理,我们就可以加分布式锁,锁定此数据,处理完成后释放锁。获取到锁的必须先判断这个数据是否被处理过。3、各种唯一约束1、数据库唯一约束插入数据,应该按照唯一索引进行插入,比如订单号,相同的订单就不可能有两条记录插入。我们在数据库层面防止重复。这个机制是利用了数据库的主键唯一约束的特性,解决了在insert场景时幂等问题。但主键的要求不是自增的主键,这样就需要业务生成全局唯一的主键。如果是分库分表场景下,路由规则要保证相同请求下,落地在同一个数据库和同一表中,要不然数据库主键约束就不起效果了,因为是不同的数据库和表主键不相关。2、redis set防重很多数据需要处理,只能被处理一次,比如我们可以计算数据的MD5将其放入redis的set,每次处理数据,先看这个MD5是否已经存在,存在就不处理。4、防重表使用订单号orderNo做为去重表的唯一索引,把唯一索引插入去重表,再进行业务操作,且他们在同一个事务中。这个保证了重复请求时,因为去重表有唯一约束,导致请求失败,避免了幂等问题。这里要注意的是,去重表和业务表应该在同一库中,这样就保证了在同一个事务,即使业务操作失败了,也会把去重表的数据回滚。这个很好的保证了数据一致性。redis防重也算。5、全局请求唯一id调用接口时,生成一个唯一id,redis 将数据保存到集合中(去重),存在即处理过。可以使用nginx设置每一个请求的唯一id;proxy_set_header X-Request-ld Srequest_id;
2022年03月11日
86 阅读
0 评论
10 点赞
2022-03-10
Feign远程调用丢失请求头问题
openFeign远程调用丢失请求头feign在远程调用之前要构造请求,调用会经过很多拦截器。丢失请求头原因,是远程调用重新构建了request请求,新的request没有请求头headr。解决方案:定义一个feign的拦截器先来看一下feign远程调用类SynchronousMethodHandler// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package feign; import feign.InvocationHandlerFactory.MethodHandler; import feign.Logger.Level; import feign.Request.Options; import feign.codec.Decoder; import feign.codec.ErrorDecoder; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; final class SynchronousMethodHandler implements MethodHandler { private static final long MAX_RESPONSE_BUFFER_SIZE = 8192L; private final MethodMetadata metadata; private final Target<?> target; private final Client client; private final Retryer retryer; private final List<RequestInterceptor> requestInterceptors; private final Logger logger; private final Level logLevel; private final feign.RequestTemplate.Factory buildTemplateFromArgs; private final Options options; private final ExceptionPropagationPolicy propagationPolicy; private final Decoder decoder; private final AsyncResponseHandler asyncResponseHandler; private SynchronousMethodHandler(Target<?> target, Client client, Retryer retryer, List<RequestInterceptor> requestInterceptors, Logger logger, Level logLevel, MethodMetadata metadata, feign.RequestTemplate.Factory buildTemplateFromArgs, Options options, Decoder decoder, ErrorDecoder errorDecoder, boolean decode404, boolean closeAfterDecode, ExceptionPropagationPolicy propagationPolicy, boolean forceDecoding) { this.target = (Target)Util.checkNotNull(target, "target", new Object[0]); this.client = (Client)Util.checkNotNull(client, "client for %s", new Object[]{target}); this.retryer = (Retryer)Util.checkNotNull(retryer, "retryer for %s", new Object[]{target}); this.requestInterceptors = (List)Util.checkNotNull(requestInterceptors, "requestInterceptors for %s", new Object[]{target}); this.logger = (Logger)Util.checkNotNull(logger, "logger for %s", new Object[]{target}); this.logLevel = (Level)Util.checkNotNull(logLevel, "logLevel for %s", new Object[]{target}); this.metadata = (MethodMetadata)Util.checkNotNull(metadata, "metadata for %s", new Object[]{target}); this.buildTemplateFromArgs = (feign.RequestTemplate.Factory)Util.checkNotNull(buildTemplateFromArgs, "metadata for %s", new Object[]{target}); this.options = (Options)Util.checkNotNull(options, "options for %s", new Object[]{target}); this.propagationPolicy = propagationPolicy; if (forceDecoding) { this.decoder = decoder; this.asyncResponseHandler = null; } else { this.decoder = null; this.asyncResponseHandler = new AsyncResponseHandler(logLevel, logger, decoder, errorDecoder, decode404, closeAfterDecode); } } public Object invoke(Object[] argv) throws Throwable { RequestTemplate template = this.buildTemplateFromArgs.create(argv); Options options = this.findOptions(argv); Retryer retryer = this.retryer.clone(); while(true) { try { return this.executeAndDecode(template, options); } catch (RetryableException var9) { RetryableException e = var9; try { retryer.continueOrPropagate(e); } catch (RetryableException var8) { Throwable cause = var8.getCause(); if (this.propagationPolicy == ExceptionPropagationPolicy.UNWRAP && cause != null) { throw cause; } throw var8; } if (this.logLevel != Level.NONE) { this.logger.logRetry(this.metadata.configKey(), this.logLevel); } } } } Object executeAndDecode(RequestTemplate template, Options options) throws Throwable { Request request = this.targetRequest(template); if (this.logLevel != Level.NONE) { this.logger.logRequest(this.metadata.configKey(), this.logLevel, request); } long start = System.nanoTime(); Response response; try { response = this.client.execute(request, options); response = response.toBuilder().request(request).requestTemplate(template).build(); } catch (IOException var12) { if (this.logLevel != Level.NONE) { this.logger.logIOException(this.metadata.configKey(), this.logLevel, var12, this.elapsedTime(start)); } throw FeignException.errorExecuting(request, var12); } long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); if (this.decoder != null) { return this.decoder.decode(response, this.metadata.returnType()); } else { CompletableFuture<Object> resultFuture = new CompletableFuture(); this.asyncResponseHandler.handleResponse(resultFuture, this.metadata.configKey(), response, this.metadata.returnType(), elapsedTime); try { if (!resultFuture.isDone()) { throw new IllegalStateException("Response handling not done"); } else { return resultFuture.join(); } } catch (CompletionException var13) { Throwable cause = var13.getCause(); if (cause != null) { throw cause; } else { throw var13; } } } } long elapsedTime(long start) { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); } Request targetRequest(RequestTemplate template) { Iterator var2 = this.requestInterceptors.iterator(); while(var2.hasNext()) { RequestInterceptor interceptor = (RequestInterceptor)var2.next(); interceptor.apply(template); } return this.target.apply(template); } Options findOptions(Object[] argv) { if (argv != null && argv.length != 0) { Stream var10000 = Stream.of(argv); Options.class.getClass(); var10000 = var10000.filter(Options.class::isInstance); Options.class.getClass(); return (Options)var10000.map(Options.class::cast).findFirst().orElse(this.options); } else { return this.options; } } static class Factory { private final Client client; private final Retryer retryer; private final List<RequestInterceptor> requestInterceptors; private final Logger logger; private final Level logLevel; private final boolean decode404; private final boolean closeAfterDecode; private final ExceptionPropagationPolicy propagationPolicy; private final boolean forceDecoding; Factory(Client client, Retryer retryer, List<RequestInterceptor> requestInterceptors, Logger logger, Level logLevel, boolean decode404, boolean closeAfterDecode, ExceptionPropagationPolicy propagationPolicy, boolean forceDecoding) { this.client = (Client)Util.checkNotNull(client, "client", new Object[0]); this.retryer = (Retryer)Util.checkNotNull(retryer, "retryer", new Object[0]); this.requestInterceptors = (List)Util.checkNotNull(requestInterceptors, "requestInterceptors", new Object[0]); this.logger = (Logger)Util.checkNotNull(logger, "logger", new Object[0]); this.logLevel = (Level)Util.checkNotNull(logLevel, "logLevel", new Object[0]); this.decode404 = decode404; this.closeAfterDecode = closeAfterDecode; this.propagationPolicy = propagationPolicy; this.forceDecoding = forceDecoding; } public MethodHandler create(Target<?> target, MethodMetadata md, feign.RequestTemplate.Factory buildTemplateFromArgs, Options options, Decoder decoder, ErrorDecoder errorDecoder) { return new SynchronousMethodHandler(target, this.client, this.retryer, this.requestInterceptors, this.logger, this.logLevel, md, buildTemplateFromArgs, options, decoder, errorDecoder, this.decode404, this.closeAfterDecode, this.propagationPolicy, this.forceDecoding); } } } 远程调用时,构建了RequestInterceptor。自定义feign拦截器import feign.RequestInterceptor; import feign.RequestTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.web.context.request.ServletRequestAttributes; import javax.servlet.http.HttpServletRequest; /** * @description: TODO * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/10 23:03 * @version: 1.0 */ @Configuration public class FamilyFegin { @Bean(name ="requestInterceptor") public RequestInterceptor requestInterceptor(){ return new RequestInterceptor() { @Override public void apply(RequestTemplate requestTemplate) { //RequestContextHolder拿到刚请求来的数据 ServletRequestAttributes requestAttributes = (ServletRequestAttributes)RequestContextHolder.getRequestAttributes(); HttpServletRequest request = requestAttributes.getRequest(); System.out.println("调用feign之前"); //同步请求头数据 String cookie = request.getHeader("Cookie");//老数据 //给新请求同步老请求的cookie requestTemplate.header("Cookie", cookie); } }; } }拦截器说明RequestContextHolder.getRequestAttributes()获取到RequestAttributes类,RequestAttributes源码:// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.web.context.request; import org.springframework.lang.Nullable; public interface RequestAttributes { int SCOPE_REQUEST = 0; int SCOPE_SESSION = 1; String REFERENCE_REQUEST = "request"; String REFERENCE_SESSION = "session"; @Nullable Object getAttribute(String var1, int var2); void setAttribute(String var1, Object var2, int var3); void removeAttribute(String var1, int var2); String[] getAttributeNames(int var1); void registerDestructionCallback(String var1, Runnable var2, int var3); @Nullable Object resolveReference(String var1); String getSessionId(); Object getSessionMutex(); } ServletRequestAttributes继承了AbstractRequestAttributes,AbstractRequestAttributes实现了RequestAttributes,因此可以强转。 public abstract class AbstractRequestAttributes implements RequestAttributes { protected final Map<String, Runnable> requestDestructionCallbacks = new LinkedHashMap(8); private volatile boolean requestActive = true; public AbstractRequestAttributes() { } public void requestCompleted() {注意:该处理是在同步一个线程种处理的,才能获取之前的heard中的cookie信息。异步多线程该处理方式不同。
2022年03月10日
150 阅读
0 评论
5 点赞
1
2
3