首页
关于
友链
Search
1
wlop 4K 壁纸 4k8k 动态 壁纸
1,006 阅读
2
Docker搭建Typecho博客
644 阅读
3
Nacos持久化MySQL问题-解决方案
634 阅读
4
滑动时间窗口算法
515 阅读
5
keytool证书导入
496 阅读
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
开发工具
百度网盘资源
天翼网盘资源
阿里网盘资源
登录
Search
标签搜索
java
javase
docker
java8
springboot
thread
spring
分布式
mysql
锁
linux
redis
源码
typecho
centos
git
map
lambda
stream
nginx
少年
累计撰写
184
篇文章
累计收到
14
条评论
首页
栏目
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
开发工具
百度网盘资源
天翼网盘资源
阿里网盘资源
页面
关于
友链
搜索到
8
篇与
的结果
2022-03-24
SpringBoot整合SpringCache
SpringBoot整合SpringCache1、引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-cache</artifactId> </dependency>2、引入redis依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>配置redis地址密码等等,可参考springboo整合redis3、默认配置自动配置// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.cache; import java.util.List; import java.util.stream.Collectors; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.data.couchbase.CouchbaseDataAutoConfiguration; import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.boot.autoconfigure.hazelcast.HazelcastAutoConfiguration; import org.springframework.boot.autoconfigure.orm.jpa.EntityManagerFactoryDependsOnPostProcessor; import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cache.CacheManager; import org.springframework.cache.interceptor.CacheAspectSupport; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.ImportSelector; import org.springframework.core.type.AnnotationMetadata; import org.springframework.orm.jpa.AbstractEntityManagerFactoryBean; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.util.Assert; @Configuration( proxyBeanMethods = false ) @ConditionalOnClass({CacheManager.class}) @ConditionalOnBean({CacheAspectSupport.class}) @ConditionalOnMissingBean( value = {CacheManager.class}, name = {"cacheResolver"} ) @EnableConfigurationProperties({CacheProperties.class}) @AutoConfigureAfter({CouchbaseDataAutoConfiguration.class, HazelcastAutoConfiguration.class, HibernateJpaAutoConfiguration.class, RedisAutoConfiguration.class}) @Import({CacheAutoConfiguration.CacheConfigurationImportSelector.class, CacheAutoConfiguration.CacheManagerEntityManagerFactoryDependsOnPostProcessor.class}) public class CacheAutoConfiguration { public CacheAutoConfiguration() { } @Bean @ConditionalOnMissingBean public CacheManagerCustomizers cacheManagerCustomizers(ObjectProvider<CacheManagerCustomizer<?>> customizers) { return new CacheManagerCustomizers((List)customizers.orderedStream().collect(Collectors.toList())); } @Bean public CacheAutoConfiguration.CacheManagerValidator cacheAutoConfigurationValidator(CacheProperties cacheProperties, ObjectProvider<CacheManager> cacheManager) { return new CacheAutoConfiguration.CacheManagerValidator(cacheProperties, cacheManager); } static class CacheConfigurationImportSelector implements ImportSelector { CacheConfigurationImportSelector() { } public String[] selectImports(AnnotationMetadata importingClassMetadata) { CacheType[] types = CacheType.values(); String[] imports = new String[types.length]; for(int i = 0; i < types.length; ++i) { imports[i] = CacheConfigurations.getConfigurationClass(types[i]); } return imports; } } static class CacheManagerValidator implements InitializingBean { private final CacheProperties cacheProperties; private final ObjectProvider<CacheManager> cacheManager; CacheManagerValidator(CacheProperties cacheProperties, ObjectProvider<CacheManager> cacheManager) { this.cacheProperties = cacheProperties; this.cacheManager = cacheManager; } public void afterPropertiesSet() { Assert.notNull(this.cacheManager.getIfAvailable(), () -> { return "No cache manager could be auto-configured, check your configuration (caching type is '" + this.cacheProperties.getType() + "')"; }); } } @ConditionalOnClass({LocalContainerEntityManagerFactoryBean.class}) @ConditionalOnBean({AbstractEntityManagerFactoryBean.class}) static class CacheManagerEntityManagerFactoryDependsOnPostProcessor extends EntityManagerFactoryDependsOnPostProcessor { CacheManagerEntityManagerFactoryDependsOnPostProcessor() { super(new String[]{"cacheManager"}); } } }所有相关配置都在CacheProperties。重点关注自动配置中的: static class CacheConfigurationImportSelector implements ImportSelector { CacheConfigurationImportSelector() { } public String[] selectImports(AnnotationMetadata importingClassMetadata) { CacheType[] types = CacheType.values(); String[] imports = new String[types.length]; for(int i = 0; i < types.length; ++i) { imports[i] = CacheConfigurations.getConfigurationClass(types[i]); } return imports; } }CacheConfigurations.getConfigurationClass(types[i]); static String getConfigurationClass(CacheType cacheType) { String configurationClassName = (String)MAPPINGS.get(cacheType); Assert.state(configurationClassName != null, () -> { return "Unknown cache type " + cacheType; }); return configurationClassName; }MAPPINGS.get(cacheType);static { Map<CacheType, String> mappings = new EnumMap(CacheType.class); mappings.put(CacheType.GENERIC, GenericCacheConfiguration.class.getName()); mappings.put(CacheType.EHCACHE, EhCacheCacheConfiguration.class.getName()); mappings.put(CacheType.HAZELCAST, HazelcastCacheConfiguration.class.getName()); mappings.put(CacheType.INFINISPAN, InfinispanCacheConfiguration.class.getName()); mappings.put(CacheType.JCACHE, JCacheCacheConfiguration.class.getName()); mappings.put(CacheType.COUCHBASE, CouchbaseCacheConfiguration.class.getName()); mappings.put(CacheType.REDIS, RedisCacheConfiguration.class.getName()); mappings.put(CacheType.CAFFEINE, CaffeineCacheConfiguration.class.getName()); mappings.put(CacheType.SIMPLE, SimpleCacheConfiguration.class.getName()); mappings.put(CacheType.NONE, NoOpCacheConfiguration.class.getName()); MAPPINGS = Collections.unmodifiableMap(mappings); }mappings.put(CacheType.REDIS, RedisCacheConfiguration.class.getName());CacheAutoConfiguration会导入RedisCacheConfiguration;// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.cache; import java.util.LinkedHashSet; import java.util.List; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.cache.CacheProperties.Redis; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.cache.CacheManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ResourceLoader; import org.springframework.data.redis.cache.RedisCacheManager; import org.springframework.data.redis.cache.RedisCacheManager.RedisCacheManagerBuilder; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair; @Configuration( proxyBeanMethods = false ) @ConditionalOnClass({RedisConnectionFactory.class}) @AutoConfigureAfter({RedisAutoConfiguration.class}) @ConditionalOnBean({RedisConnectionFactory.class}) @ConditionalOnMissingBean({CacheManager.class}) @Conditional({CacheCondition.class}) class RedisCacheConfiguration { RedisCacheConfiguration() { } @Bean RedisCacheManager cacheManager(CacheProperties cacheProperties, CacheManagerCustomizers cacheManagerCustomizers, ObjectProvider<org.springframework.data.redis.cache.RedisCacheConfiguration> redisCacheConfiguration, ObjectProvider<RedisCacheManagerBuilderCustomizer> redisCacheManagerBuilderCustomizers, RedisConnectionFactory redisConnectionFactory, ResourceLoader resourceLoader) { RedisCacheManagerBuilder builder = RedisCacheManager.builder(redisConnectionFactory).cacheDefaults(this.determineConfiguration(cacheProperties, redisCacheConfiguration, resourceLoader.getClassLoader())); List<String> cacheNames = cacheProperties.getCacheNames(); if (!cacheNames.isEmpty()) { builder.initialCacheNames(new LinkedHashSet(cacheNames)); } if (cacheProperties.getRedis().isEnableStatistics()) { builder.enableStatistics(); } redisCacheManagerBuilderCustomizers.orderedStream().forEach((customizer) -> { customizer.customize(builder); }); return (RedisCacheManager)cacheManagerCustomizers.customize(builder.build()); } private org.springframework.data.redis.cache.RedisCacheConfiguration determineConfiguration(CacheProperties cacheProperties, ObjectProvider<org.springframework.data.redis.cache.RedisCacheConfiguration> redisCacheConfiguration, ClassLoader classLoader) { return (org.springframework.data.redis.cache.RedisCacheConfiguration)redisCacheConfiguration.getIfAvailable(() -> { return this.createConfiguration(cacheProperties, classLoader); }); } private org.springframework.data.redis.cache.RedisCacheConfiguration createConfiguration(CacheProperties cacheProperties, ClassLoader classLoader) { Redis redisProperties = cacheProperties.getRedis(); org.springframework.data.redis.cache.RedisCacheConfiguration config = org.springframework.data.redis.cache.RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeValuesWith(SerializationPair.fromSerializer(new JdkSerializationRedisSerializer(classLoader))); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; } } 关注里面的方法:@Bean RedisCacheManager cacheManager(CacheProperties cacheProperties, CacheManagerCustomizers cacheManagerCustomizers, ObjectProvider<org.springframework.data.redis.cache.RedisCacheConfiguration> redisCacheConfiguration, ObjectProvider<RedisCacheManagerBuilderCustomizer> redisCacheManagerBuilderCustomizers, RedisConnectionFactory redisConnectionFactory, ResourceLoader resourceLoader) { RedisCacheManagerBuilder builder = RedisCacheManager.builder(redisConnectionFactory).cacheDefaults(this.determineConfiguration(cacheProperties, redisCacheConfiguration, resourceLoader.getClassLoader())); List<String> cacheNames = cacheProperties.getCacheNames(); if (!cacheNames.isEmpty()) { builder.initialCacheNames(new LinkedHashSet(cacheNames)); } if (cacheProperties.getRedis().isEnableStatistics()) { builder.enableStatistics(); } redisCacheManagerBuilderCustomizers.orderedStream().forEach((customizer) -> { customizer.customize(builder); }); return (RedisCacheManager)cacheManagerCustomizers.customize(builder.build()); }相当于自动配置好了缓存管理器RedisCacheManager,cacheProperties.getCacheNames();方法会读取yml里面的配置,接着初始化缓存builder.initialCacheNames(new LinkedHashSet(cacheNames)); public RedisCacheManager.RedisCacheManagerBuilder initialCacheNames(Set<String> cacheNames) { Assert.notNull(cacheNames, "CacheNames must not be null!"); cacheNames.forEach((it) -> { this.withCacheConfiguration(it, this.defaultCacheConfiguration); }); return this; }this.withCacheConfiguration(it, this.defaultCacheConfiguration);public RedisCacheManager.RedisCacheManagerBuilder withCacheConfiguration(String cacheName, RedisCacheConfiguration cacheConfiguration) { Assert.notNull(cacheName, "CacheName must not be null!"); Assert.notNull(cacheConfiguration, "CacheConfiguration must not be null!"); this.initialCaches.put(cacheName, cacheConfiguration); return this; }RedisCacheConfiguration中默认缓存定义规则:private org.springframework.data.redis.cache.RedisCacheConfiguration createConfiguration(CacheProperties cacheProperties, ClassLoader classLoader) { Redis redisProperties = cacheProperties.getRedis(); org.springframework.data.redis.cache.RedisCacheConfiguration config = org.springframework.data.redis.cache.RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeValuesWith(SerializationPair.fromSerializer(new JdkSerializationRedisSerializer(classLoader))); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; }redisProperties也就是从yml中得到的配置。4、配置需要配置的地方:配置使用redis作为缓存spring.cache.type=redis使用缓存,官方文档 https://docs.spring.io/spring-framework/docs/5.3.18-SNAPSHOT/reference/html/integration.html#cache重点关注几个注解:@Cacheable: Triggers cache population. @CacheEvict: Triggers cache eviction. @CachePut: Updates the cache without interfering with the method execution. @Caching: Regroups multiple cache operations to be applied on a method. @CacheConfig: Shares some common cache-related settings at class-level.@Cacheable: Triggers cache population.触发将数据保存到缓存的操作@CacheEvict: Triggers cache eviction.触发将数据从缓存删除的操作@CachePut: Updates the cache without interfering with the method execution.不影响方法执行更新缓存@Caching: Regroups multiple cache operations to be applied on a method.组合以上多个操作@CacheConfig: Shares some common cache-related settings at class-level.在类级别共享缓存的相同配置5、自定义规则第一步、启动类上开启缓存功能:@EnableCaching第二部、只需要注解就能完成缓存操作@Cacheable:1、代表当前方法的结果需要缓存,如果缓存中有,方法不用调用。如果缓存中没有,会调用方法,最后将方法的结果放入缓存。2、每一个需要缓存的数据我们都来指定要翻到那个名字的缓存【缓存的分区,推荐按照业务类型分】。3、key默认自动生成,value默认是使用jdk默认序列化机制。默认时间是-1,永不过期。因此需要我们自定义规则:自定义规则/** * @description: Cache redis数据格式配置 * @date: 2022/2/17 21:59 * @version: 1.0 */ import com.alibaba.fastjson.support.spring.GenericFastJsonRedisSerializer; import org.springframework.boot.autoconfigure.cache.CacheProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.cache.RedisCacheConfiguration; import org.springframework.data.redis.cache.RedisCacheManager; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.StringRedisSerializer; @EnableConfigurationProperties(CacheProperties.class) @Configuration @EnableCaching public class MyCacheConfig { @Bean RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties){ RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())); config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer())); CacheProperties.Redis redisProperties = cacheProperties.getRedis(); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } // if (redisProperties.getKeyPrefix() != null) { // config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); // } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; } }6、使用示例// 使用缓存注解方式,key:为方法名 @Override @Cacheable(value = {"record"},key = "#root.method.name") public RecordEntity getRecordAllInfoById(Long id) { //未使用注解缓存方案 // RecordEntity recordEntity = null; // // ValueOperations<String, String> forValue = redisTemplate.opsForValue(); // String recordData = forValue.get("RecordData"); // if(recordData==null){ // System.out.println("缓存没数据,执行查询数据库方法。。。。"); // recordEntity = getEntityByIdByFbsRedisson(id); // }else{ // System.out.println("从缓存中获取数据。。。。。"); // recordEntity = JSON.parseObject(recordData, new TypeReference<RecordEntity>() { // }); // } //使用注解缓存后 RecordEntity recordEntity = getEntityById(id); if(recordEntity!=null){ Long categoryId = recordEntity.getCategoryId(); Long[] catelogPath = findCatelogPath(categoryId); recordEntity.setCatelogPath(catelogPath); } return recordEntity; }key使用SPEL表达式规则可参考官网中 Using Custom Annotations使用规则。https://docs.spring.io/spring-framework/docs/5.3.18-SNAPSHOT/reference/html/integration.html#cache-annotation-stereotype
2022年03月24日
478 阅读
0 评论
8 点赞
2022-03-15
SpringBoot整合定时任务和异步任务
SpringBoot整合定时任务和异步任务一、定时任务SpringBoot整合quartz-scheduler,执行定时任务。1、开启定时任务@EnableScheduling2、开启一个定时任务@Scheduled3、编写cron表达式cron表达式格式请参考官方文档4、实例package com.yanxizhu.ifamily.booking.controller.scheduler; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * SpringBoot整合Scheduling定时任务 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/15 20:26 * @version: 1.0 */ @Slf4j @Component @EnableScheduling public class MyScheduler { /** * cron表达式 */ @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ log.info("Hello"); } }5、定时任务总结1、@EnableScheduling 开启一个定时任务2、@Scheduled 开启一个定时任务3、编写定时规则,也就是cron表达式。6、注意事项spring中6位组成,不允许第7位的年。在周几的位置,1-7代表周一到周日,MON-SUN定时任务不应该阻塞,默认时阻塞的,应该以异步任务执行。7、解决方案解决定时任务阻塞问题:使用异步+定时任务二、异步任务执行几种方式1、异步方式提交线程池可以让业务运行以异步的方式,自己提交到线程池(CompletableFuture异步编排)。@Slf4j @Component @EnableScheduling public class MyScheduler { public static ExecutorService executorService = Executors.newFixedThreadPool(10); /** * cron表达式 */ @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ CompletableFuture.runAsync(()->{ //调用server业务方法 }, executorService); } }2、定时任务线程池支持定时任务线程池(定时任务线程池)。Scheduling自动配置类,默认只有1个线程。// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.task; import java.util.concurrent.ScheduledExecutorService; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.LazyInitializationExcludeFilter; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.task.TaskSchedulingProperties.Shutdown; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.task.TaskSchedulerBuilder; import org.springframework.boot.task.TaskSchedulerCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @ConditionalOnClass({ThreadPoolTaskScheduler.class}) @Configuration( proxyBeanMethods = false ) @EnableConfigurationProperties({TaskSchedulingProperties.class}) @AutoConfigureAfter({TaskExecutionAutoConfiguration.class}) public class TaskSchedulingAutoConfiguration { public TaskSchedulingAutoConfiguration() { } @Bean @ConditionalOnBean( name = {"org.springframework.context.annotation.internalScheduledAnnotationProcessor"} ) @ConditionalOnMissingBean({SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class}) public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) { return builder.build(); } @Bean public static LazyInitializationExcludeFilter scheduledBeanLazyInitializationExcludeFilter() { return new ScheduledBeanLazyInitializationExcludeFilter(); } @Bean @ConditionalOnMissingBean public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties, ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) { TaskSchedulerBuilder builder = new TaskSchedulerBuilder(); builder = builder.poolSize(properties.getPool().getSize()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); builder = builder.customizers(taskSchedulerCustomizers); return builder; } }默认只有一个线程package org.springframework.boot.autoconfigure.task; import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("spring.task.scheduling") public class TaskSchedulingProperties { private final TaskSchedulingProperties.Pool pool = new TaskSchedulingProperties.Pool(); private final TaskSchedulingProperties.Shutdown shutdown = new TaskSchedulingProperties.Shutdown(); private String threadNamePrefix = "scheduling-"; public TaskSchedulingProperties() { } public TaskSchedulingProperties.Pool getPool() { return this.pool; } public TaskSchedulingProperties.Shutdown getShutdown() { return this.shutdown; } public String getThreadNamePrefix() { return this.threadNamePrefix; } public void setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } public static class Shutdown { private boolean awaitTermination; private Duration awaitTerminationPeriod; public Shutdown() { } public boolean isAwaitTermination() { return this.awaitTermination; } public void setAwaitTermination(boolean awaitTermination) { this.awaitTermination = awaitTermination; } public Duration getAwaitTerminationPeriod() { return this.awaitTerminationPeriod; } public void setAwaitTerminationPeriod(Duration awaitTerminationPeriod) { this.awaitTerminationPeriod = awaitTerminationPeriod; } } public static class Pool { private int size = 1; public Pool() { } public int getSize() { return this.size; } public void setSize(int size) { this.size = size; } } }配置线程池数量spring.task.scheduling.pool.size=5这样定时任务就有5个线程可以执行了,如果1个线程,定时任务就会阻塞。3、让定时任务执行@EnableAsync 开启异步任务功能@Async 在希望异步执行的方法开启异步执行注解,普通方法也可以使用。默认线程数8异步任务自动配置类源码:// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.task; import java.util.concurrent.Executor; import java.util.stream.Stream; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.task.TaskExecutionProperties.Pool; import org.springframework.boot.autoconfigure.task.TaskExecutionProperties.Shutdown; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.task.TaskExecutorBuilder; import org.springframework.boot.task.TaskExecutorCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; import org.springframework.core.task.TaskDecorator; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @ConditionalOnClass({ThreadPoolTaskExecutor.class}) @Configuration( proxyBeanMethods = false ) @EnableConfigurationProperties({TaskExecutionProperties.class}) public class TaskExecutionAutoConfiguration { public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor"; public TaskExecutionAutoConfiguration() { } @Bean @ConditionalOnMissingBean public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) { Pool pool = properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); Stream var10001 = taskExecutorCustomizers.orderedStream(); var10001.getClass(); builder = builder.customizers(var10001::iterator); builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique()); return builder; } @Lazy @Bean( name = {"applicationTaskExecutor", "taskExecutor"} ) @ConditionalOnMissingBean({Executor.class}) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); } }默认线程8package org.springframework.boot.autoconfigure.task; import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("spring.task.execution") public class TaskExecutionProperties { private final TaskExecutionProperties.Pool pool = new TaskExecutionProperties.Pool(); private final TaskExecutionProperties.Shutdown shutdown = new TaskExecutionProperties.Shutdown(); private String threadNamePrefix = "task-"; public TaskExecutionProperties() { } public TaskExecutionProperties.Pool getPool() { return this.pool; } public TaskExecutionProperties.Shutdown getShutdown() { return this.shutdown; } public String getThreadNamePrefix() { return this.threadNamePrefix; } public void setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } public static class Shutdown { private boolean awaitTermination; private Duration awaitTerminationPeriod; public Shutdown() { } public boolean isAwaitTermination() { return this.awaitTermination; } public void setAwaitTermination(boolean awaitTermination) { this.awaitTermination = awaitTermination; } public Duration getAwaitTerminationPeriod() { return this.awaitTerminationPeriod; } public void setAwaitTerminationPeriod(Duration awaitTerminationPeriod) { this.awaitTerminationPeriod = awaitTerminationPeriod; } } public static class Pool { private int queueCapacity = 2147483647; private int coreSize = 8; private int maxSize = 2147483647; private boolean allowCoreThreadTimeout = true; private Duration keepAlive = Duration.ofSeconds(60L); public Pool() { } public int getQueueCapacity() { return this.queueCapacity; } public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } public int getCoreSize() { return this.coreSize; } public void setCoreSize(int coreSize) { this.coreSize = coreSize; } public int getMaxSize() { return this.maxSize; } public void setMaxSize(int maxSize) { this.maxSize = maxSize; } public boolean isAllowCoreThreadTimeout() { return this.allowCoreThreadTimeout; } public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) { this.allowCoreThreadTimeout = allowCoreThreadTimeout; } public Duration getKeepAlive() { return this.keepAlive; } public void setKeepAlive(Duration keepAlive) { this.keepAlive = keepAlive; } } }通过配置线程池线程数spring.task.execution.pool.core-size=5 spring.task.execution.pool.max-size=50实列import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j @Component @EnableAsync @EnableScheduling public class MyScheduler { public static ExecutorService executorService = Executors.newFixedThreadPool(10); /** * cron表达式 */ @Async @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ System.out.println("Hello ...."); } }要用线程池的时候,自己注入即可用了。
2022年03月15日
197 阅读
0 评论
12 点赞
2022-03-14
SpringBoot整合Sleuth+Zipkin 服务链路追踪
SpringBoot整合Sleuth+Zipkin 服务链路追踪一、整合 Sleu1、服务提供者与消费者导入依赖<!--整合 Sleut:服务提供者与消费者导入依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-sleuth</artifactId> <version>3.1.1</version> </dependency>2、打开 debug 日志logging: level: org.springframework.cloud.openfeign: debug org.springframework.cloud.sleuth: debug3、发起一次远程调用,观察控制台DEBUG [user-service,541450f08573fff5,541450f08573fff5,false]user-service:服务名 541450f08573fff5:是 TranceId,一条链路中,只有一个 TranceId 541450f08573fff5:是 spanId,链路中的基本工作单元 id false:表示是否将数据输出到其他服务,true 则会把信息输出到其他可视化的服务上观察二、整合 zipkin 可视化观察1、docker 安装 zipkin 服务器docker run --name zipkin -d -p 9411:9411 openzipkin/zipkin设置随docker启动docker update --restart=always zipkin2、导入依赖<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zipkin</artifactId> <version>2.2.8.RELEASE</version> </dependency>因为spring-cloud-starter-zipkin中引入了spring-cloud-starter-sleuth,因此上面的spring-cloud-starter-sleuth可以不用引入。3、添加 zipkin 相关配置spring: application: name: ifamily-booking zipkin: base-url: http://127.0.0.1:9411/ # zipkin 服务器的地址 # 关闭服务发现,否则 Spring Cloud 会把 zipkin 的 url 当做服务名称 discoveryClientEnabled: false sender: type: web # 设置使用 http 的方式传输数据 sleuth: sampler: probability: 1 # 设置抽样采集率为 100%,默认为 0.1,即 10%4、查看通过http://127.0.0.1:9411即可查看链路追踪信息。三、持久化链路追踪数据到ES启动命令docker run --env STORAGE_TYPE=elasticsearch --env ES_HOSTS=192.168.56.10:9200 openzipkin/zipkin-dependencies注意:这个是openzipkin/zipkin-dependencies,改版后的。
2022年03月14日
267 阅读
0 评论
7 点赞
2022-03-13
SpringBoot整合Sentinel
SpringBoot整合Sentinel1、引入依赖<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency>2、下载安装Sentinel可视化控制台下载自己引入sentinel对应的核心包版本,下载地址:https://github.com/alibaba/Sentinel/releases运行Sentinel可视化控制台java -jar sentinel-dashboard-1.8.1.jar注意自己的版本。打开http://127.0.0.1:8080/,默认账号密码sentinel3、微服务配置sentinelapplication.ymlspring: cloud: sentinel: transport: port: 8719 dashboard: localhost:8080port: 8719端口随意,只要不被占用,用于各微服务与控制台通讯。4、查看监控信息启动微服务,随意访问一个接口,Sentinel控制台即可看到实施监控信息。5、启用相关功能可在Sentinel控制台调整相应功能。默认所有流控设置都是保存在内存中,重启服务就没有了。6、添加监控图标引入审计start,sentinel会自动根据spring-boot-starter-actuator监控。<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> <version>2.6.1</version> </dependency>开启监控图标management.endpoints.web.exposure.include=*7、自定义流控返回提示信息import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.BlockExceptionHandler; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.fastjson.JSON; import com.yanxizhu.common.utils.R; import org.springframework.context.annotation.Configuration; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * @description: Sentinel流控信息提示自定i * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/13 20:14 * @version: 1.0 */ @Configuration public class MySentinelConfig implements BlockExceptionHandler { @Override public void handle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, BlockException e) throws Exception { R r = R.error(10000, "自定义提示"); httpServletResponse.setContentType("application/json;charset=utf-8"); httpServletResponse.getWriter().write(JSON.toJSONString(r)); } }限流规则可参考官网限流文档。每一个微服务都有一个自己的自定义流控返回提示信息,其它配置一样,只是提示信息不同。8、熔断、降级针对Feign远程调用熔断引入Feign远程调用依赖<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency>打开 Sentinel 对 Feign 的支持:feign.sentinel.enabled=true熔断实现@FeignClient(value = "renren-fast",fallback = SeckillFeignServiceFallBack.class) public interface CardFeignService { @GetMapping("/sys/user/list") public R list(@RequestParam Map<String, Object> params); }熔断回调@S1f4j @Component public class SeckillFeignServiceFallBack implements SeckillFeignService{ @Override public R getskuseckillInfo(Long skuld) { Log.info("熔断方法调用...getskuSecki11Info"); return R.error(com.yanxizhu.family.common.exception.BizCodeEnume.TOO_MWANY_REQUEST.getCode(); com.yanxizhu.family.common.exception.BizCodeEnume.TOO_MANY_REQUEST.getAsg()); } }调用方:手动指定远程服务的降级策略。远程服务被降级处理。触发我们的熔断回调方法。提供方:超大浏览的时侯,必须牺牲一些远程服务。在服务的提供方(远程服务)指定降级策略;提供方是在运行。但是不运行自己的业务逻辑,返回的是默认的降级数据(限流的数据)。@SentinelResource 注解用来标识资源是否被限流、降级。上述例子上该注解的属性 sayHello 表示资源名。@SentinelResource 还提供了其它额外的属性如 blockHandler,blockHandlerClass,fallback 用于表示限流或降级的操作(注意有方法签名要求),更多内容可以参考 Sentinel 注解支持文档。若不配置 blockHandler、fallback 等函数,则被流控降级时方法会直接抛出对应的 BlockException;若方法未定义 throws BlockException 则会被 JVM 包装一层 UndeclaredThrowableException。注:一般推荐将 @SentinelResource 注解加到服务实现上,而在 Web 层直接使用 Spring Cloud Alibaba 自带的 Web 埋点适配。Sentinel Web 适配同样支持配置自定义流控处理逻辑,参考 相关文档。9、自定义受保护的资源1)、代码try(Entry entry =SphU.entry("seckillSkus"))(/业务逻辑catch(Execption e)(}2)、基于注解。eSentinelResource(value ="getCurrentSeckiLLSkusResource",blockHandler ="blockHandler")无论是1,2方式一定要配置被限流以后的默认返回注:一般推荐将 @SentinelResource 注解加到服务实现上,而在 Web 层直接使用 Spring Cloud Alibaba 自带的 Web 埋点适配。Sentinel Web 适配同样支持配置自定义流控处理逻辑,参考 相关文档。10、网关限流引入网关sentinel依赖<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-spring-cloud-gateway-adapter</artifactId> <version>x.y.z</version> </dependency>使用时只需注入对应的 SentinelGatewayFilter 实例以及 SentinelGatewayBlockExceptionHandler 实例即可(若使用了 Spring Cloud Alibaba Sentinel,则只需按照文档进行配置即可,无需自己加 Configuration)。更多网关限流可参考官方文档11、定制网关流控返回参考官方文档
2022年03月13日
175 阅读
0 评论
4 点赞
2022-03-13
SpringBoot整合RabbitMQ
SpringBoot整合RabbitMQ1、引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.6.3</version> </dependency>2、配置rabbitmq通过RabbitProperties.properties配置文件配置链接 rabbitmq: host: 192.168.56.10 virtual-host: / port: 5672 publisher-confirm-type: correlated publisher-returns: true template: mandatory: true重点:#开启发送端确认机制,发送到Broke的确认 publisher-confirm-type: correlated #开启时消息发送到队列的消息确认机制 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个Returnconfirm template: mandatory: true3、开启Rabbit启动类添加@EnableRabbit注解@EnableRabbit @SpringBootApplication public class FamilyBookingApplication { public static void main(String[] args) { SpringApplication.run(FamilyBookingApplication.class, args); } }4、Rabbit使用现在RabbitAutoConfiguration就生效了,就有了组件RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate5、使用测试1、创建Exchange、Queue、Binding使用AmqpAdmin创建。通过创建各交换机构造方法,找到对应接口的实现,传入参数即可创建Exchange。public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) @Autowired AmqpAdmin amqpAdmin; /** * 创建Exchange交换机 */ @Test public void craeteChage() { /** * public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) */ DirectExchange directExchange = new DirectExchange("Family-Hello-Exchange",true,false); amqpAdmin.declareExchange(directExchange); } /** * 创建Queue交换机 */ @Test public void craetQueue() { /** * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) */ Queue queue = new Queue("Family-Hello-Queue", true,false, false); amqpAdmin.declareQueue(queue); } /** * 创建Bind交换机 */ @Test public void craetBind() { /** * public Binding(String destination, 绑定目标 * Binding.DestinationType destinationType, 绑定类型 * String exchange, 交换机 * String routingKey, router-key * @Nullable Map<String, Object> arguments 参数) { */ Binding binding = new Binding("Family-Hello-Queue", Binding.DestinationType.QUEUE, "Family-Hello-Exchange", "Java.hello", null); amqpAdmin.declareBinding(binding); }上面为DirectExchange类型交换机的创建,其它类型Exchange类似。更多API可参考AmqpAdmin的各方法。2、收发消息发送消息 /** * rabbitTemplate 发送消息 */ @Test public String sendMessage(){ //发送字符串 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString())); //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setAddress("rabbitMQ测试-会员地址信息"); memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com"); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; }3、接受消息RabbitMQ为我们提供了监听@RabbitListener监听注解,使用时,必须开启@EnableRabbit功能。不用监听消息队列时,可以不用开启@EnableRabbit注解。 /** * 监听队列是各数组,可以同时监听多个队列 * 通过返回的消息类型知道是一个org.springframework.amqp.core.Message; * @param message */ @RabbitListener(queues={"Family-Hello-Queue"}) public String sendMessage(){ //发送字符串 // rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString())); //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setAddress("rabbitMQ测试-会员地址信息"); memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com"); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; } /** * * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T> * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。 * 还可以直接获取到链接中传输数据的通道channel * @param message */ @RabbitListener(queues={"Family-Hello-Queue"}) public void receiveMessageAndObject(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息2:"+message+"消息类型:"+message.getClass()); System.out.println("监听到队列消息2:"+message+"消息内容:"+memberEntity); }Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息注意:1)、同一个消息,只能有一个客户端收到2)、只有一个消息完全处理完,方法运行结束,我们才可以接收到下一个消息3)、@RabbitHandler也可以接受队列消息@RabbitListener可以用在类和方法上@RabbitHandler只能用在方法上。@RabbitHandler应用场景:当发送的数据为不同类型时,@RabbitListener接口消息时,参数类型不知道定义那个。比如:想队列发送的对象有user、People people,那下面接受时,对象不知道写那个public void receiveMessageObject(Message message, 这里该写那个对象, Channel channel)因此@RabbitHandler就可以用上了。通过在类上加上@RabbitHandler注解监听队列消息,在方法上加@RabbitHandler注解,参数就可以指定接受那个数据类型的消息了。简单代码如下:@RabbitListener(queues={"Family-Hello-Queue"}) @Service("memberService") public class MemberServiceImpl extends ServiceImpl<MemberDao, MemberEntity> implements MemberService { //接受队列中指定的People类型数据 @RabbitHandler public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity); } //接受队列中指定的User类型数据 @RabbitHandler public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity); } }6、RabbitMQ Java配置Json序列化通过RabbitAutoConfiguration.java中注入的RabbitTemplate看到configure。@Bean @ConditionalOnMissingBean public RabbitTemplateConfigurer rabbitTemplateConfigurer(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) { RabbitTemplateConfigurer configurer = new RabbitTemplateConfigurer(); configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique()); configurer.setRetryTemplateCustomizers((List)retryTemplateCustomizers.orderedStream().collect(Collectors.toList())); configurer.setRabbitProperties(properties); return configurer; } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean({RabbitOperations.class}) public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(); configurer.configure(template, connectionFactory); return template; }如果配置中有MessageConverter消息转换器就用configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique());如果没有就用RabbitTemplate默认配置的。RabbitTemplateConfigurer.java默认配置源码中可以看到默认使用的转换器如下:public MessageConverter getMessageConverter() { return this.messageConverter; } private MessageConverter messageConverter = new SimpleMessageConverter(); 然后通过SimpleMessageConverter可以看到默认创建消息时,使用的消息转换器类型 protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; if (object instanceof byte[]) { bytes = (byte[])((byte[])object); messageProperties.setContentType("application/octet-stream"); } else if (object instanceof String) { try { bytes = ((String)object).getBytes(this.defaultCharset); } catch (UnsupportedEncodingException var6) { throw new MessageConversionException("failed to convert to Message content", var6); } messageProperties.setContentType("text/plain"); messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); } messageProperties.setContentType("application/x-java-serialized-object"); } if (bytes != null) { messageProperties.setContentLength((long)bytes.length); return new Message(bytes, messageProperties); } else { throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); } }如果是序列化的就是使用的SerializationUtils.serialize(object);else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); } messageProperties.setContentType("application/x-java-serialized-object");因此我们可以自定义MessageConverter。通过MessageConverter接口实现类型,可以看到json各实现类。因此RabbitMQ自定义配置如下:import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description: RabbitMQ配置 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/12 17:07 * @version: 1.0 */ @Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }再次发送对象时,从mq里面获取的对象就是json格式了。7、开启消息回调确认机制#开启发送端确认机制,发送到Broke的确认 publisher-confirm-type: correlated #开启时消息发送到队列的消息确认机制 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个ReturnCallback template: mandatory: true #手动确认ack listener: simple: acknowledge-mode: manual8、重点:发送端:package com.yanxizhu.family.users.config; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @description: RabbitMQ配置 序列化 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/17 16:44 * @version: 1.0 */ @Configuration public class RabbitConfig { private RabbitTemplate rabbitTemplate; /** * rabbitTemplate循环依赖,解决方法:手动初始化模板方法 * @param connectionFactory * @return */ @Primary @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setMessageConverter(messageConverter()); initRabbitTemplate(); return rabbitTemplate; } @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 定制rabbitMaTemplate * 一、发送端消息确认机制,发送给Broke的消息确认回调 * 1、开始消息确认 publisher-confirm-type: correlated * 2、设置确认回调 setConfirmCallback() * 二、发送端消息确认机制,发送给队列的消息确认回调 * 1、开始消息确认 * publisher-returns: true * template: * mandatory: true * 2、设置确认回调 setReturnsCallback * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息) * 1、开启消费端消息确认 * listener: * simple: * acknowledge-mode: manual * 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。 * 默认的消息确认(默认ACK)问题: * 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失; * 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。 * 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。 * 4、手动ack如何签收 * //消息头信息 * MessageProperties messageProperties = message.getMessageProperties(); * long deliveryTag = messageProperties.getDeliveryTag(); * channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收 * 5、消费退回 * * //参数:long var1, boolean var3(是否批量), boolean var4 * channel.basicNack(deliveryTag,true,true); //可以批量回退 * * //参数:long var1, boolean var3 * channel.basicReject(deliveryTag,true); //单个回退 * * 第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。 */ public void initRabbitTemplate(){ //设置确认回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 只要消息抵达服务器 * @param correlationData 当前消息关联的唯一数据(这个消息的唯一id) * @param ack 消息是否成功收到 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //correlationData:注意,这个correlationData就是发送消息时,设置的correlationData。 System.out.println("====确认收到消息了。。。。================"); System.out.println(correlationData+"|"+ack+"|"+cause); } }); //注意:必须要有这一步,不然不生效 rabbitTemplate.setMandatory(true); //设置消息抵达队列回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息没有投递给指定的队列,就触发这个失败回调 * @param message 投递失败的消息详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 当时这个消息发给哪个交换机 * @param routingKey 当时这个消息用的哪个路由键 */ public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("==========投递到队列失败了。。。=========="); System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); } }); } } 发送消息时,可以将new CorrelationData(UUID.randomUUID().toString()),传的uuid参数保存到数据库,到时候直接遍历数据库即可知道那些投递成功到队列,那些投递失败。说明: * 一、发送端消息确认机制,发送给Broke的消息确认回调 * 1、开始消息确认 publisher-confirm-type: correlated * 2、设置确认回调 setConfirmCallback() * 二、发送端消息确认机制,发送给队列的消息确认回调 * 1、开始消息确认 * publisher-returns: true * template: * mandatory: true * 2、设置确认回调 setReturnsCallback * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息) * 1、开启消费端消息确认 * listener: * simple: * acknowledge-mode: manual * 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。 * 默认的消息确认(默认ACK)问题: * 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失; * 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。 * 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。消费端手动ack /** * * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T> * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。 * 还可以直接获取到链接中传输数据的通道channel * @param message */ @RabbitHandler public void receiveMessageObject(Message message, MemberEntity memberEntity, Channel channel) throws IOException { //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); //手动确认签收 channel.basicAck(deliveryTag,true); System.out.println("监听到队列消息3:"+message+"消息类型:"+message.getClass()); System.out.println("监听到队列消息3:"+message+"消息内容:"+memberEntity); //回退 //可以批量回退 // channel.basicNack(deliveryTag,true,true); //单个回退 //true重新排队,false丢弃 // channel.basicReject(deliveryTag,true); }说明:手动ack如何签收 //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收消费退回 //参数:long var1, boolean var3(是否批量), boolean var4 channel.basicNack(deliveryTag,true,true); //可以批量回退 //参数:long var1, boolean var3 channel.basicReject(deliveryTag,true); //单个回退 第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。最后rabbitMQ配置:spring: rabbitmq: host: 192.168.56.10 virtual-host: / port: 5672 #开启发送端确认 publisher-confirm-type: correlated #开启发送端消息抵达队列的确认 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个retureconfirm template: mandatory: true #开启手动确认 listener: simple: acknowledge-mode: manualSpringBoot中使用延时队列1、Queue、Exchange、Binding可以@Bean进去 2、监听消息的方法可以有三种参数(不分数量,顺序) Object content, Message message, Channel channel 3、channel可以用来拒绝消息,否则自动ack;代码实现发送消息到MQ: public String sendDelayMessage(){ //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setId(11111111L); memberEntity.setAddress("死信队列测试"); memberEntity.setEmail("延迟队列,异步处理会员信息"); memberEntity.setCreateTime(new Date()); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("member-event-exchange","users.create.member",memberEntity, new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; }通过延迟队列,将延迟的会员信息,发送到另一个队列,通过监听过期延迟的队列,接收延迟的会员信息。package com.yanxizhu.family.users.config; import com.rabbitmq.client.Channel; import com.yanxizhu.family.users.entity.MemberEntity; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * @description: 延迟队列 * 说明: 1、首先通过创建的路由键(users.create.member),将消息从交换机(member-event-exchange)发送到死信队列(member.delay.queue)。 * 2、根据死信队列(member.delay.queue)的配置,过期后,消息将通过路由键为(users.release.member)转发到交换机(member-event-exchange)。 * 3、最后交换机(member-event-exchange)再通过绑定的最终队列(users.release.member.queue)和路由键(users.release.member)转发到最终队列(member-event-exchange)。 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/17 20:47 * @version: 1.0 */ @Configuration public class DelayQueueConfig { /** * 监听最终的队列 * @param entity * @param channel * @param message * @throws IOException */ @RabbitListener(queues="member.release.queue") public void listener(MemberEntity entity, Channel channel, Message message) throws IOException { System.out.println("收到过期的会员信息:准备关闭会员"+entity.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } //@bean自动创建交换机、路由、绑定 //交换机 @Bean public Exchange orderEventExchange(){ return new TopicExchange("member-event-exchange", true, false); } /** * 容器中的Binding、Queue、Exchange都会自动创建(RabbitMQ没有的情况) * @return */ //死信队列 @Bean public Queue memberDelayQueue(){ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "member-event-exchange");//交换机 arguments.put("x-dead-letter-routing-key", "users.release.member");//路由键 arguments.put("x-message-ttl", 60000);//过期时间 //public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) Queue queue = new Queue("member.delay.queue",true,false,false,arguments); return queue; } //将死信队列和交换机绑定,路由键-users.create.member @Bean public Binding orderCreateOrderBingding(){ //目的地、绑定类型、交换机、路由键、参数 //public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments) return new Binding("member.delay.queue", Binding.DestinationType.QUEUE, "member-event-exchange", "users.create.member", null); } //最后过期的队列 @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("member.release.queue",true,false,false); return queue; } //将最后的队列和交换机绑定,路由键-user.release.member @Bean public Binding orderReleaseOrderBingding(){ return new Binding("member.release.queue", Binding.DestinationType.QUEUE, "member-event-exchange", "users.release.member", null); } }
2022年03月13日
416 阅读
0 评论
9 点赞
2022-03-12
SpringBoot整合seata分布式事务(不适用高并发场景)
Springboot整合seata分布式事务一、创建seata日志表-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;二、安装事务协调器(seata-server)1、下载地址 2、导入依赖<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency>3、启动seata服务器4、seata文件说明registry.conf:注册中心配置。这里执行注册中西为nacos。type = "nacos",config {执行seata配置数据放在那里,这里默认使用文件放配置数据。 type = "file"file.conf:seata默认配置信息存放这里。例如,事务日志存放地方配置## transaction log store, only used in server side store { ## store mode: file、db mode = "file" ## file store property file { ## store location dir dir = "sessionStore" # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions maxBranchSessionSize = 16384 # globe session size , if exceeded throws exceptions maxGlobalSessionSize = 512 # file buffer size , if exceeded allocate new buffer fileWriteBufferCacheSize = 16384 # when recover batch read size sessionReloadReadSize = 100 # async, sync flushDiskMode = async } ## database store property db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "druid" ## mysql/oracle/postgresql/h2/oceanbase etc. dbType = "mysql" driverClassName = "com.mysql.jdbc.Driver" ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param url = "jdbc:mysql://127.0.0.1:3306/seata?rewriteBatchedStatements=true" user = "mysql" password = "mysql" minConn = 5 maxConn = 30 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 } }三、自定义代理数据源注意:所有想要用到分布式事务的微服务使用seata DataSourceProxy代理自己的数据源。springboot默认使用的是Hikari数据源。DataSourceAutoConfiguration.java源代码:// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.jdbc; import javax.sql.DataSource; import javax.sql.XADataSource; import org.springframework.boot.autoconfigure.condition.AnyNestedCondition; import org.springframework.boot.autoconfigure.condition.ConditionMessage; import org.springframework.boot.autoconfigure.condition.ConditionOutcome; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.condition.SpringBootCondition; import org.springframework.boot.autoconfigure.condition.ConditionMessage.Builder; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Dbcp2; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Generic; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Hikari; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.OracleUcp; import org.springframework.boot.autoconfigure.jdbc.DataSourceConfiguration.Tomcat; import org.springframework.boot.autoconfigure.jdbc.DataSourceInitializationConfiguration.InitializationSpecificCredentialsDataSourceInitializationConfiguration; import org.springframework.boot.autoconfigure.jdbc.DataSourceInitializationConfiguration.SharedCredentialsDataSourceInitializationConfiguration; import org.springframework.boot.autoconfigure.jdbc.metadata.DataSourcePoolMetadataProvidersConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.jdbc.DataSourceBuilder; import org.springframework.boot.jdbc.EmbeddedDatabaseConnection; import org.springframework.context.annotation.Condition; import org.springframework.context.annotation.ConditionContext; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.ConfigurationCondition.ConfigurationPhase; import org.springframework.core.env.Environment; import org.springframework.core.type.AnnotatedTypeMetadata; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType; import org.springframework.util.StringUtils; @Configuration( proxyBeanMethods = false ) @ConditionalOnClass({DataSource.class, EmbeddedDatabaseType.class}) @ConditionalOnMissingBean( type = {"io.r2dbc.spi.ConnectionFactory"} ) @EnableConfigurationProperties({DataSourceProperties.class}) @Import({DataSourcePoolMetadataProvidersConfiguration.class, InitializationSpecificCredentialsDataSourceInitializationConfiguration.class, SharedCredentialsDataSourceInitializationConfiguration.class}) public class DataSourceAutoConfiguration { public DataSourceAutoConfiguration() { } static class EmbeddedDatabaseCondition extends SpringBootCondition { private static final String DATASOURCE_URL_PROPERTY = "spring.datasource.url"; private final SpringBootCondition pooledCondition = new DataSourceAutoConfiguration.PooledDataSourceCondition(); EmbeddedDatabaseCondition() { } public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) { Builder message = ConditionMessage.forCondition("EmbeddedDataSource", new Object[0]); if (this.hasDataSourceUrlProperty(context)) { return ConditionOutcome.noMatch(message.because("spring.datasource.url is set")); } else if (this.anyMatches(context, metadata, new Condition[]{this.pooledCondition})) { return ConditionOutcome.noMatch(message.foundExactly("supported pooled data source")); } else { EmbeddedDatabaseType type = EmbeddedDatabaseConnection.get(context.getClassLoader()).getType(); return type == null ? ConditionOutcome.noMatch(message.didNotFind("embedded database").atAll()) : ConditionOutcome.match(message.found("embedded database").items(new Object[]{type})); } } private boolean hasDataSourceUrlProperty(ConditionContext context) { Environment environment = context.getEnvironment(); if (environment.containsProperty("spring.datasource.url")) { try { return StringUtils.hasText(environment.getProperty("spring.datasource.url")); } catch (IllegalArgumentException var4) { } } return false; } } static class PooledDataSourceAvailableCondition extends SpringBootCondition { PooledDataSourceAvailableCondition() { } public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) { Builder message = ConditionMessage.forCondition("PooledDataSource", new Object[0]); return DataSourceBuilder.findType(context.getClassLoader()) != null ? ConditionOutcome.match(message.foundExactly("supported DataSource")) : ConditionOutcome.noMatch(message.didNotFind("supported DataSource").atAll()); } } static class PooledDataSourceCondition extends AnyNestedCondition { PooledDataSourceCondition() { super(ConfigurationPhase.PARSE_CONFIGURATION); } @Conditional({DataSourceAutoConfiguration.PooledDataSourceAvailableCondition.class}) static class PooledDataSourceAvailable { PooledDataSourceAvailable() { } } @ConditionalOnProperty( prefix = "spring.datasource", name = {"type"} ) static class ExplicitType { ExplicitType() { } } } @Configuration( proxyBeanMethods = false ) @Conditional({DataSourceAutoConfiguration.PooledDataSourceCondition.class}) @ConditionalOnMissingBean({DataSource.class, XADataSource.class}) @Import({Hikari.class, Tomcat.class, Dbcp2.class, OracleUcp.class, Generic.class, DataSourceJmxConfiguration.class}) protected static class PooledDataSourceConfiguration { protected PooledDataSourceConfiguration() { } } @Configuration( proxyBeanMethods = false ) @Conditional({DataSourceAutoConfiguration.EmbeddedDatabaseCondition.class}) @ConditionalOnMissingBean({DataSource.class, XADataSource.class}) @Import({EmbeddedDataSourceConfiguration.class}) protected static class EmbeddedDatabaseConfiguration { protected EmbeddedDatabaseConfiguration() { } } } 导入很多数据源 @Import({Hikari.class, Tomcat.class, Dbcp2.class, OracleUcp.class, Generic.class, DataSourceJmxConfiguration.class})DataSourceConfiguration.java源代码@Configuration( proxyBeanMethods = false ) @ConditionalOnClass({HikariDataSource.class}) @ConditionalOnMissingBean({DataSource.class}) @ConditionalOnProperty( name = {"spring.datasource.type"}, havingValue = "com.zaxxer.hikari.HikariDataSource", matchIfMissing = true ) static class Hikari { Hikari() { } @Bean @ConfigurationProperties( prefix = "spring.datasource.hikari" ) HikariDataSource dataSource(DataSourceProperties properties) { HikariDataSource dataSource = (HikariDataSource)DataSourceConfiguration.createDataSource(properties, HikariDataSource.class); if (StringUtils.hasText(properties.getName())) { dataSource.setPoolName(properties.getName()); } return dataSource; } }自定义代理数据源配置:import com.zaxxer.hikari.HikariDataSource; import io.seata.rm.datasource.DataSourceProxy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import javax.sql.DataSource; /** * @description: Seata自定义代理数据源 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/12 14:41 * @version: 1.0 */ @Configuration public class MySeataConfig { @Autowired DataSourceProperties dataSourceProperties; @Bean public DataSource dataSource(DataSourceProperties dataSourceProperties){ HikariDataSource dataSource = dataSourceProperties.initializeDataSourceBuilder().type(HikariDataSource.class).build(); if (StringUtils.hasText(dataSourceProperties.getName())) { dataSource.setPoolName(dataSourceProperties.getName()); } return new DataSourceProxy(dataSource); } }四、复制seata配置文件到项目resources下主要复制seata里面的模板文件file.conf、registry.conf注意:file.conf 的 service.vgroup_mapping 配置必须和spring.application.name一致。vgroup_mapping.{应用名称}-fescar-service-group = "default"service{ vgroup_mapping.family-booking-fescar-service-group = "default" }family-booking:微服务名字也可以通过配置 spring.cloud.alibaba.seata.tx-service-group修改后缀,但是必须和file.conf中的配置保持一致五、使用在分布式事务入口方法加上全局事务注解@GlobalTransactional,远程调用方法上加本地事务注解@Transactional即可。@GlobalTransactional @Transactional public PageUtils queryPage(Map<String, Object> params) { //调用远程方法,另一个微服务的方法加上小事务@Transactional }六、使用场景不适用高并发的场景,适用普通的业务远程调用。seata默认是使用的AT模式。针对高并发场景还是的用柔性事务,消息队列、延迟队列,MQ中间件完成。
2022年03月12日
275 阅读
0 评论
8 点赞
2022-03-10
Springboot整合SpringSession,解决分布式session共享问题
springboot整合SpringSession1、引入依赖<dependencies> <dependency> <groupId>org.springframework.session</groupId> <artifactId>spring-session-data-redis</artifactId> </dependency> </dependencies>2、springboot配置application.properties配置:保存类型spring.session.store-type=redis # Session store type.session过期时间:server.servlet.session.timeout= # Session timeout. If a duration suffix is not specified, seconds is used.其它配置信息可参考官网:spring.session.redis.flush-mode=on_save # Sessions flush mode. spring.session.redis.namespace=spring:session # Namespace for keys used to store sessions.连接信息配置:spring.redis.host=localhost # Redis server host. spring.redis.password= # Login password of the redis server. spring.redis.port=6379 # Redis server port.3、开启springSession主类加上注解@EnableRedisHttpSession@EnableRedisHttpSession4、使用注意:1、需要保存到redis中的数据必须序列化。2、需要实现session共享的微服务都必须整合SpringSession。5、解决子域session共享、cookie进行json序列化MySessionConfig配置package com.yanxizhu.family.booking.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.session.web.http.CookieSerializer; import org.springframework.session.web.http.DefaultCookieSerializer; /** * @description: springSession配置 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/10 20:11 * @version: 1.0 */ @Configuration public class MySessionConfig { @Bean public CookieSerializer cookieSerializer(){ DefaultCookieSerializer defaultCookieSerializer = new DefaultCookieSerializer(); //设置共享域名 defaultCookieSerializer.setDomainName("yanxizhu.com"); //设置session名称 defaultCookieSerializer.setCookieName("YANXIZHUSESSION"); return defaultCookieSerializer; } //session序列化机制 @Bean public RedisSerializer<Object> springSessionDefaultRedisSerializer(){ return new GenericJackson2JsonRedisSerializer(); } }注意:需要session共享的微服务都需要配置,或者抽取到公共模块。6、多系统-session共享问题(单点登录)抽取单独的认证中心微服务处理单点登录单点登录,可参考一个开源单点登录项目,许雪里/xxl-sso
2022年03月10日
255 阅读
0 评论
6 点赞
2022-03-03
springboot整合redis
适合当如缓存场景:即时性、数据一致性要求不高的。访问量大且更新频率不高的数据(读多,写少)承担持久化工作。读模式缓存使用流程:凡是放入缓存中的数据,我们应该指定过期时间,使其可以再系统即使没有主动更新数据也能自动触发数据加载进缓存的流程。避免业务崩溃导致的数据永久不一致问题。解决分布式缓存中本地缓存导致数据不一致问题,可以使用redis中间件解决。springboot整合redis1、引入springboot整合的start:pom文件引入依赖 <!--redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>引入依赖之后就会有RedisAutoConfiguration,里面可以看的到redis的配置文件Redis.Properties.@Configuration( proxyBeanMethods = false ) @ConditionalOnClass({RedisOperations.class}) @EnableConfigurationProperties({RedisProperties.class}) @Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class}) public class RedisAutoConfiguration { public RedisAutoConfiguration() { } @Bean @ConditionalOnMissingBean( name = {"redisTemplate"} ) @ConditionalOnSingleCandidate(RedisConnectionFactory.class) public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> template = new RedisTemplate(); template.setConnectionFactory(redisConnectionFactory); return template; } @Bean @ConditionalOnMissingBean @ConditionalOnSingleCandidate(RedisConnectionFactory.class) public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) { return new StringRedisTemplate(redisConnectionFactory); } }Redis.Properties文件里面包含了redis的配置信息:@ConfigurationProperties( prefix = "spring.redis" ) public class RedisProperties { private int database = 0; private String url; private String host = "localhost"; private String username; private String password; private int port = 6379; private boolean ssl; private Duration timeout; private Duration connectTimeout; private String clientName; private RedisProperties.ClientType clientType; private RedisProperties.Sentinel sentinel; private RedisProperties.Cluster cluster; private final RedisProperties.Jedis jedis = new RedisProperties.Jedis(); private final RedisProperties.Lettuce lettuce = new RedisProperties.Lettuce();2、redis配置,可以根据上面的Redis.Properties类型,再applicatioin.yml中配置相关属性。spring: redis: host: 127.0.0.1 port: 6379上面一个最简单的redis配置就完成了。3、使用redis由上面的RedisAutoConfiguration自动配置类,可以看到自动装配了RedisTemplate<Object, Object>、StringRedisTemplate。RedisTemplate<Object, Object>:一般是字符串的Key,和序列化后的字符串value。由于使用String类型的key、value较多,所以还提供了StringRedisTemplate。public class StringRedisTemplate extends RedisTemplate<String, String> { public StringRedisTemplate() { this.setKeySerializer(RedisSerializer.string()); this.setValueSerializer(RedisSerializer.string()); this.setHashKeySerializer(RedisSerializer.string()); this.setHashValueSerializer(RedisSerializer.string()); } public StringRedisTemplate(RedisConnectionFactory connectionFactory) { this(); this.setConnectionFactory(connectionFactory); this.afterPropertiesSet(); } protected RedisConnection preProcessConnection(RedisConnection connection, boolean existingConnection) { return new DefaultStringRedisConnection(connection); } }StringRedisTemplate也是继承了RedisTemplate<String, String>,都是字符串的key,value.this.setKeySerializer(RedisSerializer.string());this.setValueSerializer(RedisSerializer.string());this.setHashKeySerializer(RedisSerializer.string());this.setHashValueSerializer(RedisSerializer.string());上面的key、value都是用RedisSerializer.string()类型序列化。使用配置好的StringRedisTemplate。StringRedisTemplate的value由多种不同类型的值。保存获取值: @Test public void testStringRedisTemplate(){ ValueOperations<String, String> opsForValue = stringRedisTemplate.opsForValue(); opsForValue.set("hello", "world"+ UUID.randomUUID().toString()); String hello = opsForValue.get("hello"); System.out.println("之前保存值为:"+hello); }输出结果:之前保存值为:world1e9f395e-67b8-4077-9912-c690c7da0f06redis数据库的值:注意保存获取时,key必须是一样的。value一般是序列化后的json字符串,因为json字符串是跨语言跨平台的。vlaue存复杂对象时,序列化可以用alibaba的fastjson。Map<String,List<UserEntity>> data:要存入redis的复杂数据,通过序列化后存在redis。 String s = JSON.toJSONString(data); ValueOperations<String, String> ops = redisTemplate.opsForValue(); ops.set("mydata",s);同样获取的json数据也需要反序列化后才能使用:比如转化成一个复杂数据格式String jsonStr= ops.get("mydata"); Map<String,List<UserEntity>> result = JSON.parseObject(jsonStr, new TypeReference<Map<String,List<UserEntity>>>() {});注意:springboot2.0后默认使用lettuce作为操作redis的客户端,使用netty进行网络通信。但是lettuce的bug导致netty容易堆外内存溢出。netty如果没有指定堆外内存,默认使用-Xmx300m。可以通过-Dio.netty.maxDirectMemory设置堆外内存大小,但是始终会出现堆外内存溢出。 private static void incrementMemoryCounter(int capacity) { if (DIRECT_MEMORY_COUNTER != null) { long newUsedMemory = DIRECT_MEMORY_COUNTER.addAndGet((long)capacity); if (newUsedMemory > DIRECT_MEMORY_LIMIT) { DIRECT_MEMORY_COUNTER.addAndGet((long)(-capacity)); throw new OutOfDirectMemoryError("failed to allocate " + capacity + " byte(s) of direct memory (used: " + (newUsedMemory - (long)capacity) + ", max: " + DIRECT_MEMORY_LIMIT + ')'); } } }解决方案:不能只用-Dio.netty.maxDirectMemory去调大内存。1、升级netty客户端。2、切换使用jedis如何使用jedisredis默认使用的是lettuce,因此需要排除掉lettuce,引入jedis。 <!--引入redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion><!--排除lettuce--> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <!--引入jedis--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>lettuce、jedis都是操作redis的底层客户端。RedisTemplate是对lettuce、jedis对redis操作的再次封装,以后操作都可以用RedisTemplate进行操作redis。通过redis的自动装配可以看到是引入了lettuce,jedis的。@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class}) public class RedisAutoConfiguration {通过jedis可以看到。class JedisConnectionConfiguration extends RedisConnectionConfiguration { JedisConnectionConfiguration(RedisProperties properties, ObjectProvider<RedisSentinelConfiguration> sentinelConfiguration, ObjectProvider<RedisClusterConfiguration> clusterConfiguration) { super(properties, sentinelConfiguration, clusterConfiguration); } @Bean JedisConnectionFactory redisConnectionFactory(ObjectProvider<JedisClientConfigurationBuilderCustomizer> builderCustomizers) { return this.createJedisConnectionFactory(builderCustomizers); }无论是lettuce、jedis最后都会注入JedisConnectionFactory,就是redis要用的。高并发下缓存失效问题-缓存穿透:缓存穿透:指查询一个一定不存的数据,由于缓存是不命中,将去查询数据库,但是数据库也没有此记录,我们将这此查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层查询,失去了缓存的意义。风险:利用不存在的数据进行攻击,,数据库瞬时压力增大,最终导致崩溃。解决方案:null结果也缓存到redis,并加入短暂的过期时间。高并发下缓存失效-缓存雪崩缓存雪崩:指在我们设置缓存时key采用了相同的过期时间,导致缓存在某一时间同时失效,请求全部转发到DB,DB瞬时压力过大雪崩。解决方案:原有的失效时间基础上增加一个随机值,比如11-5分分钟随机,这样每一个缓存的过期时间重复概率就会降低,很难引发集体失效。高并发下缓存失效问题-缓存击穿缓存穿透:对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种费用“热点”的数据。如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到DB上,我们称为缓存击穿。解决方案:加锁大量并发只让一个去查,其他人等待,查到以后释放锁,其它人获取到锁,先差缓存,就会有数据了,而不用去DB查询。总结:1、解决缓存穿透:空结果也缓存。2、解决缓存雪崩:设置过期时间(随机值)。3、解决缓存击穿:加锁。例如:ops.set("RecordData",null, 1, TimeUnit.DAYS); //null值也缓存 ops.set("RecordData",JSON.toJSONString(recordEntity), 1, TimeUnit.DAYS); 解锁:1、通过synchronized/Lock本地锁。2、分布式锁解决方案Redisson。但是本地锁synchronized/Lock不能解决分布式带来的击穿问题。因此分布式还得用Redisson进行加锁解决。redisson分布式锁,可参考另一篇redisson分布式锁。 //本地加锁 public RecordEntity getEntityByIdByRedis(Long id){ synchronized (this){ ValueOperations<String, String> ops = redisTemplate.opsForValue(); String recordData = ops.get("RecordData"); if(recordData!=null){ System.out.println("从数据库查询数据之前,有缓存数据。。。。"); return JSON.parseObject(recordData, new TypeReference<RecordEntity>() {}); } System.out.println("从数据库查询数据...."); RecordEntity recordEntity = baseMapper.selectById(id); ops.set("RecordData",JSON.toJSONString(recordEntity), 1, TimeUnit.DAYS); return recordEntity; } } //redis分布式锁 public RecordEntity getEntityByIdByFbs(Long id){ String uuid = UUID.randomUUID().toString(); ValueOperations<String, String> ops = redisTemplate.opsForValue(); //保证原子性,加锁同时设置过期时间 Boolean lock = ops.setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS); if(lock){ System.out.println("获取分布式锁成功。。。。。"); RecordEntity recordEntity = null; try{ recordEntity = getEntityById(id); }finally { //删除锁保证原子性,使用脚本 String script ="if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" + "then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid); } return recordEntity; }else{ System.out.println("获取分布式锁失败,等待重试。。。。"); //重试,可以等待sleep一下 try{ Thread.sleep(200); }catch (Exception e){ } return getEntityByIdByFbs(id); } } //业务代码 public RecordEntity getEntityById(Long id){ ValueOperations<String, String> ops = redisTemplate.opsForValue(); String recordData = ops.get("RecordData"); if(recordData!=null){ System.out.println("从数据库查询数据之前,有缓存数据。。。。"); return JSON.parseObject(recordData, new TypeReference<RecordEntity>() {}); } System.out.println("从数据库查询数据...."); RecordEntity recordEntity = baseMapper.selectById(id); ops.set("RecordData",JSON.toJSONString(recordEntity), 1, TimeUnit.DAYS); return recordEntity; } //Redisson分布式锁 public RecordEntity getEntityByIdByFbsRedisson(Long id){ //保证原子性,加锁同时设置过期时间 RLock lock = redissonClient.getLock("RecordData-lock"); lock.lock(); RecordEntity recordEntity = null; try { System.out.println("获取分布式锁成功。。。。。"); recordEntity = getEntityById(id); }finally { System.out.println("释放锁成功。。。。。"); lock.unlock(); } return recordEntity; } // 使用缓存注解方式 @Override @Cacheable(value = {"record"},key = "#root.method.name") public RecordEntity getRecordAllInfoById(Long id) { //未使用注解缓存方案 // RecordEntity recordEntity = null; // // ValueOperations<String, String> forValue = redisTemplate.opsForValue(); // String recordData = forValue.get("RecordData"); // if(recordData==null){ // System.out.println("缓存没数据,执行查询数据库方法。。。。"); // recordEntity = getEntityByIdByFbsRedisson(id); // }else{ // System.out.println("从缓存中获取数据。。。。。"); // recordEntity = JSON.parseObject(recordData, new TypeReference<RecordEntity>() { // }); // } //使用注解缓存后 RecordEntity recordEntity = getEntityById(id); if(recordEntity!=null){ Long categoryId = recordEntity.getCategoryId(); Long[] catelogPath = findCatelogPath(categoryId); recordEntity.setCatelogPath(catelogPath); } return recordEntity; }利用redis的set NX实现分布式redisson分布式锁,实现原理就是利用redis的set nx实现的。SET key value [EX seconds] [PX milliseconds] [NX|XX]分布式锁原理可以看redis官方文档set nx命令。更多分布式锁可参考,另一个篇springboot整合分布式锁redisson。
2022年03月03日
185 阅读
0 评论
6 点赞