首页
友链
Search
1
JAVA IO
195 阅读
2
SpringBoot整合SpringCache
164 阅读
3
Docker搭建Typecho博客
162 阅读
4
wlop 4K 壁纸 4k8k 动态 壁纸
145 阅读
5
微服务项目搭建
111 阅读
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
开发工具
百度网盘资源
天翼网盘资源
阿里网盘资源
登录
Search
标签搜索
java
javase
springboot
docker
thread
spring
分布式
锁
redis
linux
typecho
源码
mysql
software
git
算法
synchronized
ArrayList
springboot整合
ThreadPool
少年
累计撰写
130
篇文章
累计收到
5
条评论
首页
栏目
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
开发工具
百度网盘资源
天翼网盘资源
阿里网盘资源
页面
友链
搜索到
8
篇与
框架整合
的结果
2022-03-24
SpringBoot整合SpringCache
SpringBoot整合SpringCache1、引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-cache</artifactId> </dependency>2、引入redis依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>配置redis地址密码等等,可参考springboo整合redis3、默认配置自动配置// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.cache; import java.util.List; import java.util.stream.Collectors; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.data.couchbase.CouchbaseDataAutoConfiguration; import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.boot.autoconfigure.hazelcast.HazelcastAutoConfiguration; import org.springframework.boot.autoconfigure.orm.jpa.EntityManagerFactoryDependsOnPostProcessor; import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cache.CacheManager; import org.springframework.cache.interceptor.CacheAspectSupport; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.context.annotation.ImportSelector; import org.springframework.core.type.AnnotationMetadata; import org.springframework.orm.jpa.AbstractEntityManagerFactoryBean; import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean; import org.springframework.util.Assert; @Configuration( proxyBeanMethods = false ) @ConditionalOnClass({CacheManager.class}) @ConditionalOnBean({CacheAspectSupport.class}) @ConditionalOnMissingBean( value = {CacheManager.class}, name = {"cacheResolver"} ) @EnableConfigurationProperties({CacheProperties.class}) @AutoConfigureAfter({CouchbaseDataAutoConfiguration.class, HazelcastAutoConfiguration.class, HibernateJpaAutoConfiguration.class, RedisAutoConfiguration.class}) @Import({CacheAutoConfiguration.CacheConfigurationImportSelector.class, CacheAutoConfiguration.CacheManagerEntityManagerFactoryDependsOnPostProcessor.class}) public class CacheAutoConfiguration { public CacheAutoConfiguration() { } @Bean @ConditionalOnMissingBean public CacheManagerCustomizers cacheManagerCustomizers(ObjectProvider<CacheManagerCustomizer<?>> customizers) { return new CacheManagerCustomizers((List)customizers.orderedStream().collect(Collectors.toList())); } @Bean public CacheAutoConfiguration.CacheManagerValidator cacheAutoConfigurationValidator(CacheProperties cacheProperties, ObjectProvider<CacheManager> cacheManager) { return new CacheAutoConfiguration.CacheManagerValidator(cacheProperties, cacheManager); } static class CacheConfigurationImportSelector implements ImportSelector { CacheConfigurationImportSelector() { } public String[] selectImports(AnnotationMetadata importingClassMetadata) { CacheType[] types = CacheType.values(); String[] imports = new String[types.length]; for(int i = 0; i < types.length; ++i) { imports[i] = CacheConfigurations.getConfigurationClass(types[i]); } return imports; } } static class CacheManagerValidator implements InitializingBean { private final CacheProperties cacheProperties; private final ObjectProvider<CacheManager> cacheManager; CacheManagerValidator(CacheProperties cacheProperties, ObjectProvider<CacheManager> cacheManager) { this.cacheProperties = cacheProperties; this.cacheManager = cacheManager; } public void afterPropertiesSet() { Assert.notNull(this.cacheManager.getIfAvailable(), () -> { return "No cache manager could be auto-configured, check your configuration (caching type is '" + this.cacheProperties.getType() + "')"; }); } } @ConditionalOnClass({LocalContainerEntityManagerFactoryBean.class}) @ConditionalOnBean({AbstractEntityManagerFactoryBean.class}) static class CacheManagerEntityManagerFactoryDependsOnPostProcessor extends EntityManagerFactoryDependsOnPostProcessor { CacheManagerEntityManagerFactoryDependsOnPostProcessor() { super(new String[]{"cacheManager"}); } } }所有相关配置都在CacheProperties。重点关注自动配置中的: static class CacheConfigurationImportSelector implements ImportSelector { CacheConfigurationImportSelector() { } public String[] selectImports(AnnotationMetadata importingClassMetadata) { CacheType[] types = CacheType.values(); String[] imports = new String[types.length]; for(int i = 0; i < types.length; ++i) { imports[i] = CacheConfigurations.getConfigurationClass(types[i]); } return imports; } }CacheConfigurations.getConfigurationClass(types[i]); static String getConfigurationClass(CacheType cacheType) { String configurationClassName = (String)MAPPINGS.get(cacheType); Assert.state(configurationClassName != null, () -> { return "Unknown cache type " + cacheType; }); return configurationClassName; }MAPPINGS.get(cacheType);static { Map<CacheType, String> mappings = new EnumMap(CacheType.class); mappings.put(CacheType.GENERIC, GenericCacheConfiguration.class.getName()); mappings.put(CacheType.EHCACHE, EhCacheCacheConfiguration.class.getName()); mappings.put(CacheType.HAZELCAST, HazelcastCacheConfiguration.class.getName()); mappings.put(CacheType.INFINISPAN, InfinispanCacheConfiguration.class.getName()); mappings.put(CacheType.JCACHE, JCacheCacheConfiguration.class.getName()); mappings.put(CacheType.COUCHBASE, CouchbaseCacheConfiguration.class.getName()); mappings.put(CacheType.REDIS, RedisCacheConfiguration.class.getName()); mappings.put(CacheType.CAFFEINE, CaffeineCacheConfiguration.class.getName()); mappings.put(CacheType.SIMPLE, SimpleCacheConfiguration.class.getName()); mappings.put(CacheType.NONE, NoOpCacheConfiguration.class.getName()); MAPPINGS = Collections.unmodifiableMap(mappings); }mappings.put(CacheType.REDIS, RedisCacheConfiguration.class.getName());CacheAutoConfiguration会导入RedisCacheConfiguration;// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.cache; import java.util.LinkedHashSet; import java.util.List; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.cache.CacheProperties.Redis; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.cache.CacheManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ResourceLoader; import org.springframework.data.redis.cache.RedisCacheManager; import org.springframework.data.redis.cache.RedisCacheManager.RedisCacheManagerBuilder; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializationContext.SerializationPair; @Configuration( proxyBeanMethods = false ) @ConditionalOnClass({RedisConnectionFactory.class}) @AutoConfigureAfter({RedisAutoConfiguration.class}) @ConditionalOnBean({RedisConnectionFactory.class}) @ConditionalOnMissingBean({CacheManager.class}) @Conditional({CacheCondition.class}) class RedisCacheConfiguration { RedisCacheConfiguration() { } @Bean RedisCacheManager cacheManager(CacheProperties cacheProperties, CacheManagerCustomizers cacheManagerCustomizers, ObjectProvider<org.springframework.data.redis.cache.RedisCacheConfiguration> redisCacheConfiguration, ObjectProvider<RedisCacheManagerBuilderCustomizer> redisCacheManagerBuilderCustomizers, RedisConnectionFactory redisConnectionFactory, ResourceLoader resourceLoader) { RedisCacheManagerBuilder builder = RedisCacheManager.builder(redisConnectionFactory).cacheDefaults(this.determineConfiguration(cacheProperties, redisCacheConfiguration, resourceLoader.getClassLoader())); List<String> cacheNames = cacheProperties.getCacheNames(); if (!cacheNames.isEmpty()) { builder.initialCacheNames(new LinkedHashSet(cacheNames)); } if (cacheProperties.getRedis().isEnableStatistics()) { builder.enableStatistics(); } redisCacheManagerBuilderCustomizers.orderedStream().forEach((customizer) -> { customizer.customize(builder); }); return (RedisCacheManager)cacheManagerCustomizers.customize(builder.build()); } private org.springframework.data.redis.cache.RedisCacheConfiguration determineConfiguration(CacheProperties cacheProperties, ObjectProvider<org.springframework.data.redis.cache.RedisCacheConfiguration> redisCacheConfiguration, ClassLoader classLoader) { return (org.springframework.data.redis.cache.RedisCacheConfiguration)redisCacheConfiguration.getIfAvailable(() -> { return this.createConfiguration(cacheProperties, classLoader); }); } private org.springframework.data.redis.cache.RedisCacheConfiguration createConfiguration(CacheProperties cacheProperties, ClassLoader classLoader) { Redis redisProperties = cacheProperties.getRedis(); org.springframework.data.redis.cache.RedisCacheConfiguration config = org.springframework.data.redis.cache.RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeValuesWith(SerializationPair.fromSerializer(new JdkSerializationRedisSerializer(classLoader))); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; } } 关注里面的方法:@Bean RedisCacheManager cacheManager(CacheProperties cacheProperties, CacheManagerCustomizers cacheManagerCustomizers, ObjectProvider<org.springframework.data.redis.cache.RedisCacheConfiguration> redisCacheConfiguration, ObjectProvider<RedisCacheManagerBuilderCustomizer> redisCacheManagerBuilderCustomizers, RedisConnectionFactory redisConnectionFactory, ResourceLoader resourceLoader) { RedisCacheManagerBuilder builder = RedisCacheManager.builder(redisConnectionFactory).cacheDefaults(this.determineConfiguration(cacheProperties, redisCacheConfiguration, resourceLoader.getClassLoader())); List<String> cacheNames = cacheProperties.getCacheNames(); if (!cacheNames.isEmpty()) { builder.initialCacheNames(new LinkedHashSet(cacheNames)); } if (cacheProperties.getRedis().isEnableStatistics()) { builder.enableStatistics(); } redisCacheManagerBuilderCustomizers.orderedStream().forEach((customizer) -> { customizer.customize(builder); }); return (RedisCacheManager)cacheManagerCustomizers.customize(builder.build()); }相当于自动配置好了缓存管理器RedisCacheManager,cacheProperties.getCacheNames();方法会读取yml里面的配置,接着初始化缓存builder.initialCacheNames(new LinkedHashSet(cacheNames)); public RedisCacheManager.RedisCacheManagerBuilder initialCacheNames(Set<String> cacheNames) { Assert.notNull(cacheNames, "CacheNames must not be null!"); cacheNames.forEach((it) -> { this.withCacheConfiguration(it, this.defaultCacheConfiguration); }); return this; }this.withCacheConfiguration(it, this.defaultCacheConfiguration);public RedisCacheManager.RedisCacheManagerBuilder withCacheConfiguration(String cacheName, RedisCacheConfiguration cacheConfiguration) { Assert.notNull(cacheName, "CacheName must not be null!"); Assert.notNull(cacheConfiguration, "CacheConfiguration must not be null!"); this.initialCaches.put(cacheName, cacheConfiguration); return this; }RedisCacheConfiguration中默认缓存定义规则:private org.springframework.data.redis.cache.RedisCacheConfiguration createConfiguration(CacheProperties cacheProperties, ClassLoader classLoader) { Redis redisProperties = cacheProperties.getRedis(); org.springframework.data.redis.cache.RedisCacheConfiguration config = org.springframework.data.redis.cache.RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeValuesWith(SerializationPair.fromSerializer(new JdkSerializationRedisSerializer(classLoader))); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; }redisProperties也就是从yml中得到的配置。4、配置需要配置的地方:配置使用redis作为缓存spring.cache.type=redis使用缓存,官方文档 https://docs.spring.io/spring-framework/docs/5.3.18-SNAPSHOT/reference/html/integration.html#cache重点关注几个注解:@Cacheable: Triggers cache population. @CacheEvict: Triggers cache eviction. @CachePut: Updates the cache without interfering with the method execution. @Caching: Regroups multiple cache operations to be applied on a method. @CacheConfig: Shares some common cache-related settings at class-level.@Cacheable: Triggers cache population.触发将数据保存到缓存的操作@CacheEvict: Triggers cache eviction.触发将数据从缓存删除的操作@CachePut: Updates the cache without interfering with the method execution.不影响方法执行更新缓存@Caching: Regroups multiple cache operations to be applied on a method.组合以上多个操作@CacheConfig: Shares some common cache-related settings at class-level.在类级别共享缓存的相同配置5、自定义规则第一步、启动类上开启缓存功能:@EnableCaching第二部、只需要注解就能完成缓存操作@Cacheable:1、代表当前方法的结果需要缓存,如果缓存中有,方法不用调用。如果缓存中没有,会调用方法,最后将方法的结果放入缓存。2、每一个需要缓存的数据我们都来指定要翻到那个名字的缓存【缓存的分区,推荐按照业务类型分】。3、key默认自动生成,value默认是使用jdk默认序列化机制。默认时间是-1,永不过期。因此需要我们自定义规则:自定义规则/** * @description: Cache redis数据格式配置 * @date: 2022/2/17 21:59 * @version: 1.0 */ import com.alibaba.fastjson.support.spring.GenericFastJsonRedisSerializer; import org.springframework.boot.autoconfigure.cache.CacheProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.cache.RedisCacheConfiguration; import org.springframework.data.redis.cache.RedisCacheManager; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializationContext; import org.springframework.data.redis.serializer.StringRedisSerializer; @EnableConfigurationProperties(CacheProperties.class) @Configuration @EnableCaching public class MyCacheConfig { @Bean RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties){ RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())); config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer())); CacheProperties.Redis redisProperties = cacheProperties.getRedis(); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } // if (redisProperties.getKeyPrefix() != null) { // config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); // } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; } }6、使用示例// 使用缓存注解方式,key:为方法名 @Override @Cacheable(value = {"record"},key = "#root.method.name") public RecordEntity getRecordAllInfoById(Long id) { //未使用注解缓存方案 // RecordEntity recordEntity = null; // // ValueOperations<String, String> forValue = redisTemplate.opsForValue(); // String recordData = forValue.get("RecordData"); // if(recordData==null){ // System.out.println("缓存没数据,执行查询数据库方法。。。。"); // recordEntity = getEntityByIdByFbsRedisson(id); // }else{ // System.out.println("从缓存中获取数据。。。。。"); // recordEntity = JSON.parseObject(recordData, new TypeReference<RecordEntity>() { // }); // } //使用注解缓存后 RecordEntity recordEntity = getEntityById(id); if(recordEntity!=null){ Long categoryId = recordEntity.getCategoryId(); Long[] catelogPath = findCatelogPath(categoryId); recordEntity.setCatelogPath(catelogPath); } return recordEntity; }key使用SPEL表达式规则可参考官网中 Using Custom Annotations使用规则。https://docs.spring.io/spring-framework/docs/5.3.18-SNAPSHOT/reference/html/integration.html#cache-annotation-stereotype
2022年03月24日
164 阅读
0 评论
7 点赞
2022-03-15
SpringBoot整合定时任务和异步任务
SpringBoot整合定时任务和异步任务一、定时任务SpringBoot整合quartz-scheduler,执行定时任务。1、开启定时任务@EnableScheduling2、开启一个定时任务@Scheduled3、编写cron表达式cron表达式格式请参考官方文档4、实例package com.yanxizhu.ifamily.booking.controller.scheduler; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * SpringBoot整合Scheduling定时任务 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/15 20:26 * @version: 1.0 */ @Slf4j @Component @EnableScheduling public class MyScheduler { /** * cron表达式 */ @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ log.info("Hello"); } }5、定时任务总结1、@EnableScheduling 开启一个定时任务2、@Scheduled 开启一个定时任务3、编写定时规则,也就是cron表达式。6、注意事项spring中6位组成,不允许第7位的年。在周几的位置,1-7代表周一到周日,MON-SUN定时任务不应该阻塞,默认时阻塞的,应该以异步任务执行。7、解决方案解决定时任务阻塞问题:使用异步+定时任务二、异步任务执行几种方式1、异步方式提交线程池可以让业务运行以异步的方式,自己提交到线程池(CompletableFuture异步编排)。@Slf4j @Component @EnableScheduling public class MyScheduler { public static ExecutorService executorService = Executors.newFixedThreadPool(10); /** * cron表达式 */ @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ CompletableFuture.runAsync(()->{ //调用server业务方法 }, executorService); } }2、定时任务线程池支持定时任务线程池(定时任务线程池)。Scheduling自动配置类,默认只有1个线程。// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.task; import java.util.concurrent.ScheduledExecutorService; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.LazyInitializationExcludeFilter; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.task.TaskSchedulingProperties.Shutdown; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.task.TaskSchedulerBuilder; import org.springframework.boot.task.TaskSchedulerCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @ConditionalOnClass({ThreadPoolTaskScheduler.class}) @Configuration( proxyBeanMethods = false ) @EnableConfigurationProperties({TaskSchedulingProperties.class}) @AutoConfigureAfter({TaskExecutionAutoConfiguration.class}) public class TaskSchedulingAutoConfiguration { public TaskSchedulingAutoConfiguration() { } @Bean @ConditionalOnBean( name = {"org.springframework.context.annotation.internalScheduledAnnotationProcessor"} ) @ConditionalOnMissingBean({SchedulingConfigurer.class, TaskScheduler.class, ScheduledExecutorService.class}) public ThreadPoolTaskScheduler taskScheduler(TaskSchedulerBuilder builder) { return builder.build(); } @Bean public static LazyInitializationExcludeFilter scheduledBeanLazyInitializationExcludeFilter() { return new ScheduledBeanLazyInitializationExcludeFilter(); } @Bean @ConditionalOnMissingBean public TaskSchedulerBuilder taskSchedulerBuilder(TaskSchedulingProperties properties, ObjectProvider<TaskSchedulerCustomizer> taskSchedulerCustomizers) { TaskSchedulerBuilder builder = new TaskSchedulerBuilder(); builder = builder.poolSize(properties.getPool().getSize()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); builder = builder.customizers(taskSchedulerCustomizers); return builder; } }默认只有一个线程package org.springframework.boot.autoconfigure.task; import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("spring.task.scheduling") public class TaskSchedulingProperties { private final TaskSchedulingProperties.Pool pool = new TaskSchedulingProperties.Pool(); private final TaskSchedulingProperties.Shutdown shutdown = new TaskSchedulingProperties.Shutdown(); private String threadNamePrefix = "scheduling-"; public TaskSchedulingProperties() { } public TaskSchedulingProperties.Pool getPool() { return this.pool; } public TaskSchedulingProperties.Shutdown getShutdown() { return this.shutdown; } public String getThreadNamePrefix() { return this.threadNamePrefix; } public void setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } public static class Shutdown { private boolean awaitTermination; private Duration awaitTerminationPeriod; public Shutdown() { } public boolean isAwaitTermination() { return this.awaitTermination; } public void setAwaitTermination(boolean awaitTermination) { this.awaitTermination = awaitTermination; } public Duration getAwaitTerminationPeriod() { return this.awaitTerminationPeriod; } public void setAwaitTerminationPeriod(Duration awaitTerminationPeriod) { this.awaitTerminationPeriod = awaitTerminationPeriod; } } public static class Pool { private int size = 1; public Pool() { } public int getSize() { return this.size; } public void setSize(int size) { this.size = size; } } }配置线程池数量spring.task.scheduling.pool.size=5这样定时任务就有5个线程可以执行了,如果1个线程,定时任务就会阻塞。3、让定时任务执行@EnableAsync 开启异步任务功能@Async 在希望异步执行的方法开启异步执行注解,普通方法也可以使用。默认线程数8异步任务自动配置类源码:// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package org.springframework.boot.autoconfigure.task; import java.util.concurrent.Executor; import java.util.stream.Stream; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.task.TaskExecutionProperties.Pool; import org.springframework.boot.autoconfigure.task.TaskExecutionProperties.Shutdown; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.task.TaskExecutorBuilder; import org.springframework.boot.task.TaskExecutorCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; import org.springframework.core.task.TaskDecorator; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @ConditionalOnClass({ThreadPoolTaskExecutor.class}) @Configuration( proxyBeanMethods = false ) @EnableConfigurationProperties({TaskExecutionProperties.class}) public class TaskExecutionAutoConfiguration { public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor"; public TaskExecutionAutoConfiguration() { } @Bean @ConditionalOnMissingBean public TaskExecutorBuilder taskExecutorBuilder(TaskExecutionProperties properties, ObjectProvider<TaskExecutorCustomizer> taskExecutorCustomizers, ObjectProvider<TaskDecorator> taskDecorator) { Pool pool = properties.getPool(); TaskExecutorBuilder builder = new TaskExecutorBuilder(); builder = builder.queueCapacity(pool.getQueueCapacity()); builder = builder.corePoolSize(pool.getCoreSize()); builder = builder.maxPoolSize(pool.getMaxSize()); builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout()); builder = builder.keepAlive(pool.getKeepAlive()); Shutdown shutdown = properties.getShutdown(); builder = builder.awaitTermination(shutdown.isAwaitTermination()); builder = builder.awaitTerminationPeriod(shutdown.getAwaitTerminationPeriod()); builder = builder.threadNamePrefix(properties.getThreadNamePrefix()); Stream var10001 = taskExecutorCustomizers.orderedStream(); var10001.getClass(); builder = builder.customizers(var10001::iterator); builder = builder.taskDecorator((TaskDecorator)taskDecorator.getIfUnique()); return builder; } @Lazy @Bean( name = {"applicationTaskExecutor", "taskExecutor"} ) @ConditionalOnMissingBean({Executor.class}) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); } }默认线程8package org.springframework.boot.autoconfigure.task; import java.time.Duration; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("spring.task.execution") public class TaskExecutionProperties { private final TaskExecutionProperties.Pool pool = new TaskExecutionProperties.Pool(); private final TaskExecutionProperties.Shutdown shutdown = new TaskExecutionProperties.Shutdown(); private String threadNamePrefix = "task-"; public TaskExecutionProperties() { } public TaskExecutionProperties.Pool getPool() { return this.pool; } public TaskExecutionProperties.Shutdown getShutdown() { return this.shutdown; } public String getThreadNamePrefix() { return this.threadNamePrefix; } public void setThreadNamePrefix(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } public static class Shutdown { private boolean awaitTermination; private Duration awaitTerminationPeriod; public Shutdown() { } public boolean isAwaitTermination() { return this.awaitTermination; } public void setAwaitTermination(boolean awaitTermination) { this.awaitTermination = awaitTermination; } public Duration getAwaitTerminationPeriod() { return this.awaitTerminationPeriod; } public void setAwaitTerminationPeriod(Duration awaitTerminationPeriod) { this.awaitTerminationPeriod = awaitTerminationPeriod; } } public static class Pool { private int queueCapacity = 2147483647; private int coreSize = 8; private int maxSize = 2147483647; private boolean allowCoreThreadTimeout = true; private Duration keepAlive = Duration.ofSeconds(60L); public Pool() { } public int getQueueCapacity() { return this.queueCapacity; } public void setQueueCapacity(int queueCapacity) { this.queueCapacity = queueCapacity; } public int getCoreSize() { return this.coreSize; } public void setCoreSize(int coreSize) { this.coreSize = coreSize; } public int getMaxSize() { return this.maxSize; } public void setMaxSize(int maxSize) { this.maxSize = maxSize; } public boolean isAllowCoreThreadTimeout() { return this.allowCoreThreadTimeout; } public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) { this.allowCoreThreadTimeout = allowCoreThreadTimeout; } public Duration getKeepAlive() { return this.keepAlive; } public void setKeepAlive(Duration keepAlive) { this.keepAlive = keepAlive; } } }通过配置线程池线程数spring.task.execution.pool.core-size=5 spring.task.execution.pool.max-size=50实列import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j @Component @EnableAsync @EnableScheduling public class MyScheduler { public static ExecutorService executorService = Executors.newFixedThreadPool(10); /** * cron表达式 */ @Async @Scheduled(cron = "1/3 1/1 * * * ? ") public void sayHell(){ System.out.println("Hello ...."); } }要用线程池的时候,自己注入即可用了。
2022年03月15日
49 阅读
0 评论
11 点赞
2022-03-14
SpringBoot整合Sleuth+Zipkin 服务链路追踪
SpringBoot整合Sleuth+Zipkin 服务链路追踪一、整合 Sleu1、服务提供者与消费者导入依赖<!--整合 Sleut:服务提供者与消费者导入依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-sleuth</artifactId> <version>3.1.1</version> </dependency>2、打开 debug 日志logging: level: org.springframework.cloud.openfeign: debug org.springframework.cloud.sleuth: debug3、发起一次远程调用,观察控制台DEBUG [user-service,541450f08573fff5,541450f08573fff5,false]user-service:服务名 541450f08573fff5:是 TranceId,一条链路中,只有一个 TranceId 541450f08573fff5:是 spanId,链路中的基本工作单元 id false:表示是否将数据输出到其他服务,true 则会把信息输出到其他可视化的服务上观察二、整合 zipkin 可视化观察1、docker 安装 zipkin 服务器docker run -d -p 9411:9411 openzipkin/zipkin设置随docker启动docker update --restart=always 容器名称2、导入依赖<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zipkin</artifactId> <version>2.2.8.RELEASE</version> </dependency>因为spring-cloud-starter-zipkin中引入了spring-cloud-starter-sleuth,因此上面的spring-cloud-starter-sleuth可以不用引入。3、添加 zipkin 相关配置spring: application: name: ifamily-booking zipkin: base-url: http://127.0.0.1:9411/ # zipkin 服务器的地址 # 关闭服务发现,否则 Spring Cloud 会把 zipkin 的 url 当做服务名称 discoveryClientEnabled: false sender: type: web # 设置使用 http 的方式传输数据 sleuth: sampler: probability: 1 # 设置抽样采集率为 100%,默认为 0.1,即 10%4、查看通过http://127.0.0.1:9411即可查看链路追踪信息。三、持久化链路追踪数据到ES启动命令docker run --env STORAGE_TYPE=elasticsearch --env ES_HOSTS=192.168.56.10:9200 openzipkin/zipkin-dependencies注意:这个是openzipkin/zipkin-dependencies,改版后的。
2022年03月14日
63 阅读
0 评论
7 点赞
2022-03-13
SpringBoot整合Sentinel
SpringBoot整合Sentinel1、引入依赖<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency>2、下载安装Sentinel可视化控制台下载自己引入sentinel对应的核心包版本,下载地址:https://github.com/alibaba/Sentinel/releases运行Sentinel可视化控制台java -jar sentinel-dashboard-1.8.1.jar注意自己的版本。打开http://127.0.0.1:8080/,默认账号密码sentinel3、微服务配置sentinelapplication.ymlspring: cloud: sentinel: transport: port: 8719 dashboard: localhost:8080port: 8719端口随意,只要不被占用,用于各微服务与控制台通讯。4、查看监控信息启动微服务,随意访问一个接口,Sentinel控制台即可看到实施监控信息。5、启用相关功能可在Sentinel控制台调整相应功能。默认所有流控设置都是保存在内存中,重启服务就没有了。6、添加监控图标引入审计start,sentinel会自动根据spring-boot-starter-actuator监控。<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> <version>2.6.1</version> </dependency>开启监控图标management.endpoints.web.exposure.include=*7、自定义流控返回提示信息import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.BlockExceptionHandler; import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.fastjson.JSON; import com.yanxizhu.common.utils.R; import org.springframework.context.annotation.Configuration; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * @description: Sentinel流控信息提示自定i * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/13 20:14 * @version: 1.0 */ @Configuration public class MySentinelConfig implements BlockExceptionHandler { @Override public void handle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, BlockException e) throws Exception { R r = R.error(10000, "自定义提示"); httpServletResponse.setContentType("application/json;charset=utf-8"); httpServletResponse.getWriter().write(JSON.toJSONString(r)); } }限流规则可参考官网限流文档。每一个微服务都有一个自己的自定义流控返回提示信息,其它配置一样,只是提示信息不同。8、熔断、降级针对Feign远程调用熔断引入Feign远程调用依赖<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency>打开 Sentinel 对 Feign 的支持:feign.sentinel.enabled=true熔断实现@FeignClient(value = "renren-fast",fallback = SeckillFeignServiceFallBack.class) public interface CardFeignService { @GetMapping("/sys/user/list") public R list(@RequestParam Map<String, Object> params); }熔断回调@S1f4j @Component public class SeckillFeignServiceFallBack implements SeckillFeignService{ @Override public R getskuseckillInfo(Long skuld) { Log.info("熔断方法调用...getskuSecki11Info"); return R.error(com.yanxizhu.family.common.exception.BizCodeEnume.TOO_MWANY_REQUEST.getCode(); com.yanxizhu.family.common.exception.BizCodeEnume.TOO_MANY_REQUEST.getAsg()); } }调用方:手动指定远程服务的降级策略。远程服务被降级处理。触发我们的熔断回调方法。提供方:超大浏览的时侯,必须牺牲一些远程服务。在服务的提供方(远程服务)指定降级策略;提供方是在运行。但是不运行自己的业务逻辑,返回的是默认的降级数据(限流的数据)。@SentinelResource 注解用来标识资源是否被限流、降级。上述例子上该注解的属性 sayHello 表示资源名。@SentinelResource 还提供了其它额外的属性如 blockHandler,blockHandlerClass,fallback 用于表示限流或降级的操作(注意有方法签名要求),更多内容可以参考 Sentinel 注解支持文档。若不配置 blockHandler、fallback 等函数,则被流控降级时方法会直接抛出对应的 BlockException;若方法未定义 throws BlockException 则会被 JVM 包装一层 UndeclaredThrowableException。注:一般推荐将 @SentinelResource 注解加到服务实现上,而在 Web 层直接使用 Spring Cloud Alibaba 自带的 Web 埋点适配。Sentinel Web 适配同样支持配置自定义流控处理逻辑,参考 相关文档。9、自定义受保护的资源1)、代码try(Entry entry =SphU.entry("seckillSkus"))(/业务逻辑catch(Execption e)(}2)、基于注解。eSentinelResource(value ="getCurrentSeckiLLSkusResource",blockHandler ="blockHandler")无论是1,2方式一定要配置被限流以后的默认返回注:一般推荐将 @SentinelResource 注解加到服务实现上,而在 Web 层直接使用 Spring Cloud Alibaba 自带的 Web 埋点适配。Sentinel Web 适配同样支持配置自定义流控处理逻辑,参考 相关文档。10、网关限流引入网关sentinel依赖<dependency> <groupId>com.alibaba.csp</groupId> <artifactId>sentinel-spring-cloud-gateway-adapter</artifactId> <version>x.y.z</version> </dependency>使用时只需注入对应的 SentinelGatewayFilter 实例以及 SentinelGatewayBlockExceptionHandler 实例即可(若使用了 Spring Cloud Alibaba Sentinel,则只需按照文档进行配置即可,无需自己加 Configuration)。更多网关限流可参考官方文档11、定制网关流控返回参考官方文档
2022年03月13日
40 阅读
0 评论
4 点赞
2022-03-13
SpringBoot整合RabbitMQ
SpringBoot整合RabbitMQ1、引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.6.3</version> </dependency>2、配置rabbitmq通过RabbitProperties.properties配置文件配置链接 rabbitmq: host: 192.168.56.10 virtual-host: / port: 5672 publisher-confirm-type: correlated publisher-returns: true template: mandatory: true重点:#开启发送端确认机制,发送到Broke的确认 publisher-confirm-type: correlated #开启时消息发送到队列的消息确认机制 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个Returnconfirm template: mandatory: true3、开启Rabbit启动类添加@EnableRabbit注解@EnableRabbit @SpringBootApplication public class FamilyBookingApplication { public static void main(String[] args) { SpringApplication.run(FamilyBookingApplication.class, args); } }4、Rabbit使用现在RabbitAutoConfiguration就生效了,就有了组件RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate5、使用测试1、创建Exchange、Queue、Binding使用AmqpAdmin创建。通过创建各交换机构造方法,找到对应接口的实现,传入参数即可创建Exchange。public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) @Autowired AmqpAdmin amqpAdmin; /** * 创建Exchange交换机 */ @Test public void craeteChage() { /** * public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) */ DirectExchange directExchange = new DirectExchange("Family-Hello-Exchange",true,false); amqpAdmin.declareExchange(directExchange); } /** * 创建Queue交换机 */ @Test public void craetQueue() { /** * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) */ Queue queue = new Queue("Family-Hello-Queue", true,false, false); amqpAdmin.declareQueue(queue); } /** * 创建Bind交换机 */ @Test public void craetBind() { /** * public Binding(String destination, 绑定目标 * Binding.DestinationType destinationType, 绑定类型 * String exchange, 交换机 * String routingKey, router-key * @Nullable Map<String, Object> arguments 参数) { */ Binding binding = new Binding("Family-Hello-Queue", Binding.DestinationType.QUEUE, "Family-Hello-Exchange", "Java.hello", null); amqpAdmin.declareBinding(binding); }上面为DirectExchange类型交换机的创建,其它类型Exchange类似。更多API可参考AmqpAdmin的各方法。2、收发消息发送消息 /** * rabbitTemplate 发送消息 */ @Test public String sendMessage(){ //发送字符串 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString())); //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setAddress("rabbitMQ测试-会员地址信息"); memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com"); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; }3、接受消息RabbitMQ为我们提供了监听@RabbitListener监听注解,使用时,必须开启@EnableRabbit功能。不用监听消息队列时,可以不用开启@EnableRabbit注解。 /** * 监听队列是各数组,可以同时监听多个队列 * 通过返回的消息类型知道是一个org.springframework.amqp.core.Message; * @param message */ @RabbitListener(queues={"Family-Hello-Queue"}) public String sendMessage(){ //发送字符串 // rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello","Hello RabbitMQ 。。。",new CorrelationData(UUID.randomUUID().toString())); //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setAddress("rabbitMQ测试-会员地址信息"); memberEntity.setEmail("rabbitMQ测试-会员邮箱,batis@foxmial.com"); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("Family-Hello-Exchange","Java.hello",memberEntity,new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; } /** * * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T> * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。 * 还可以直接获取到链接中传输数据的通道channel * @param message */ @RabbitListener(queues={"Family-Hello-Queue"}) public void receiveMessageAndObject(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息2:"+message+"消息类型:"+message.getClass()); System.out.println("监听到队列消息2:"+message+"消息内容:"+memberEntity); }Queue:可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息注意:1)、同一个消息,只能有一个客户端收到2)、只有一个消息完全处理完,方法运行结束,我们才可以接收到下一个消息3)、@RabbitHandler也可以接受队列消息@RabbitListener可以用在类和方法上@RabbitHandler只能用在方法上。@RabbitHandler应用场景:当发送的数据为不同类型时,@RabbitListener接口消息时,参数类型不知道定义那个。比如:想队列发送的对象有user、People people,那下面接受时,对象不知道写那个public void receiveMessageObject(Message message, 这里该写那个对象, Channel channel)因此@RabbitHandler就可以用上了。通过在类上加上@RabbitHandler注解监听队列消息,在方法上加@RabbitHandler注解,参数就可以指定接受那个数据类型的消息了。简单代码如下:@RabbitListener(queues={"Family-Hello-Queue"}) @Service("memberService") public class MemberServiceImpl extends ServiceImpl<MemberDao, MemberEntity> implements MemberService { //接受队列中指定的People类型数据 @RabbitHandler public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity); } //接受队列中指定的User类型数据 @RabbitHandler public void receiveMessageObj(Message message, MemberEntity memberEntity, Channel channel){ //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); System.out.println("监听到队列消息:"+message+"消息内容:"+memberEntity); } }6、RabbitMQ Java配置Json序列化通过RabbitAutoConfiguration.java中注入的RabbitTemplate看到configure。@Bean @ConditionalOnMissingBean public RabbitTemplateConfigurer rabbitTemplateConfigurer(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) { RabbitTemplateConfigurer configurer = new RabbitTemplateConfigurer(); configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique()); configurer.setRetryTemplateCustomizers((List)retryTemplateCustomizers.orderedStream().collect(Collectors.toList())); configurer.setRabbitProperties(properties); return configurer; } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean({RabbitOperations.class}) public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(); configurer.configure(template, connectionFactory); return template; }如果配置中有MessageConverter消息转换器就用configurer.setMessageConverter((MessageConverter)messageConverter.getIfUnique());如果没有就用RabbitTemplate默认配置的。RabbitTemplateConfigurer.java默认配置源码中可以看到默认使用的转换器如下:public MessageConverter getMessageConverter() { return this.messageConverter; } private MessageConverter messageConverter = new SimpleMessageConverter(); 然后通过SimpleMessageConverter可以看到默认创建消息时,使用的消息转换器类型 protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; if (object instanceof byte[]) { bytes = (byte[])((byte[])object); messageProperties.setContentType("application/octet-stream"); } else if (object instanceof String) { try { bytes = ((String)object).getBytes(this.defaultCharset); } catch (UnsupportedEncodingException var6) { throw new MessageConversionException("failed to convert to Message content", var6); } messageProperties.setContentType("text/plain"); messageProperties.setContentEncoding(this.defaultCharset); } else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); } messageProperties.setContentType("application/x-java-serialized-object"); } if (bytes != null) { messageProperties.setContentLength((long)bytes.length); return new Message(bytes, messageProperties); } else { throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); } }如果是序列化的就是使用的SerializationUtils.serialize(object);else if (object instanceof Serializable) { try { bytes = SerializationUtils.serialize(object); } catch (IllegalArgumentException var5) { throw new MessageConversionException("failed to convert to serialized Message content", var5); } messageProperties.setContentType("application/x-java-serialized-object");因此我们可以自定义MessageConverter。通过MessageConverter接口实现类型,可以看到json各实现类。因此RabbitMQ自定义配置如下:import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description: RabbitMQ配置 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/3/12 17:07 * @version: 1.0 */ @Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }再次发送对象时,从mq里面获取的对象就是json格式了。7、开启消息回调确认机制#开启发送端确认机制,发送到Broke的确认 publisher-confirm-type: correlated #开启时消息发送到队列的消息确认机制 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个ReturnCallback template: mandatory: true #手动确认ack listener: simple: acknowledge-mode: manual8、重点:发送端:package com.yanxizhu.family.users.config; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * @description: RabbitMQ配置 序列化 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/17 16:44 * @version: 1.0 */ @Configuration public class RabbitConfig { private RabbitTemplate rabbitTemplate; /** * rabbitTemplate循环依赖,解决方法:手动初始化模板方法 * @param connectionFactory * @return */ @Primary @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setMessageConverter(messageConverter()); initRabbitTemplate(); return rabbitTemplate; } @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } /** * 定制rabbitMaTemplate * 一、发送端消息确认机制,发送给Broke的消息确认回调 * 1、开始消息确认 publisher-confirm-type: correlated * 2、设置确认回调 setConfirmCallback() * 二、发送端消息确认机制,发送给队列的消息确认回调 * 1、开始消息确认 * publisher-returns: true * template: * mandatory: true * 2、设置确认回调 setReturnsCallback * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息) * 1、开启消费端消息确认 * listener: * simple: * acknowledge-mode: manual * 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。 * 默认的消息确认(默认ACK)问题: * 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失; * 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。 * 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。 * 4、手动ack如何签收 * //消息头信息 * MessageProperties messageProperties = message.getMessageProperties(); * long deliveryTag = messageProperties.getDeliveryTag(); * channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收 * 5、消费退回 * * //参数:long var1, boolean var3(是否批量), boolean var4 * channel.basicNack(deliveryTag,true,true); //可以批量回退 * * //参数:long var1, boolean var3 * channel.basicReject(deliveryTag,true); //单个回退 * * 第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。 */ public void initRabbitTemplate(){ //设置确认回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 只要消息抵达服务器 * @param correlationData 当前消息关联的唯一数据(这个消息的唯一id) * @param ack 消息是否成功收到 * @param cause 失败原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //correlationData:注意,这个correlationData就是发送消息时,设置的correlationData。 System.out.println("====确认收到消息了。。。。================"); System.out.println(correlationData+"|"+ack+"|"+cause); } }); //注意:必须要有这一步,不然不生效 rabbitTemplate.setMandatory(true); //设置消息抵达队列回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息没有投递给指定的队列,就触发这个失败回调 * @param message 投递失败的消息详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 当时这个消息发给哪个交换机 * @param routingKey 当时这个消息用的哪个路由键 */ public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("==========投递到队列失败了。。。=========="); System.out.println(message); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); } }); } } 发送消息时,可以将new CorrelationData(UUID.randomUUID().toString()),传的uuid参数保存到数据库,到时候直接遍历数据库即可知道那些投递成功到队列,那些投递失败。说明: * 一、发送端消息确认机制,发送给Broke的消息确认回调 * 1、开始消息确认 publisher-confirm-type: correlated * 2、设置确认回调 setConfirmCallback() * 二、发送端消息确认机制,发送给队列的消息确认回调 * 1、开始消息确认 * publisher-returns: true * template: * mandatory: true * 2、设置确认回调 setReturnsCallback * 三、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息) * 1、开启消费端消息确认 * listener: * simple: * acknowledge-mode: manual * 2、消费端默认是自动消息确认的,只要消息手法哦,客户端会自动确认,服务器就会移除这个消息。 * 默认的消息确认(默认ACK)问题: * 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,服务器宕机了。会发生消息丢失; * 3、因此,需要使用消费者手动确认默认(手动ack),只要我们没有明确高数MQ,消息被消费,就没有ack,消息就一致是unacked状态。 * 即使服务器宕机,消息也不会丢失,会重新变为ready,下一次有新的消费者链接进来就发给它处理。消费端手动ack /** * * 通过传入第二个参数spring可以帮我们把之间转的对象获取后转成对象,第二个参数不固定的泛型<T> * 比如上面发送的是一个user对象,那我们第二个参数就是User user,接受到的数据就直接是user了。 * 还可以直接获取到链接中传输数据的通道channel * @param message */ @RabbitHandler public void receiveMessageObject(Message message, MemberEntity memberEntity, Channel channel) throws IOException { //消息体,数据内容 //通过原生Message message获取的消息还得进行转化才好使用,需要转成对象。 byte[] body = message.getBody(); //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); //手动确认签收 channel.basicAck(deliveryTag,true); System.out.println("监听到队列消息3:"+message+"消息类型:"+message.getClass()); System.out.println("监听到队列消息3:"+message+"消息内容:"+memberEntity); //回退 //可以批量回退 // channel.basicNack(deliveryTag,true,true); //单个回退 //true重新排队,false丢弃 // channel.basicReject(deliveryTag,true); }说明:手动ack如何签收 //消息头信息 MessageProperties messageProperties = message.getMessageProperties(); long deliveryTag = messageProperties.getDeliveryTag(); channel.basicAck(deliveryTag,true); //手动确认签收,deliveryTag默认自增的,true是否批量确认签收消费退回 //参数:long var1, boolean var3(是否批量), boolean var4 channel.basicNack(deliveryTag,true,true); //可以批量回退 //参数:long var1, boolean var3 channel.basicReject(deliveryTag,true); //单个回退 第三个参数或第二个参数:false 丢弃 true:发回服务器,服务器重新入对。最后rabbitMQ配置:spring: rabbitmq: host: 192.168.56.10 virtual-host: / port: 5672 #开启发送端确认 publisher-confirm-type: correlated #开启发送端消息抵达队列的确认 publisher-returns: true #只要抵达队列,以异步发送优先回调我们这个retureconfirm template: mandatory: true #开启手动确认 listener: simple: acknowledge-mode: manualSpringBoot中使用延时队列1、Queue、Exchange、Binding可以@Bean进去 2、监听消息的方法可以有三种参数(不分数量,顺序) Object content, Message message, Channel channel 3、channel可以用来拒绝消息,否则自动ack;代码实现发送消息到MQ: public String sendDelayMessage(){ //发送对象 MemberEntity memberEntity = new MemberEntity(); memberEntity.setId(11111111L); memberEntity.setAddress("死信队列测试"); memberEntity.setEmail("延迟队列,异步处理会员信息"); memberEntity.setCreateTime(new Date()); //发送对象,注意:发送的对象必须序列化。默认发送的对象是序列化的,如果想以json格式方式发送,需要进行配置。 rabbitTemplate.convertAndSend("member-event-exchange","users.create.member",memberEntity, new CorrelationData(UUID.randomUUID().toString())); return "向RabbitMQ发送消息成功"; }通过延迟队列,将延迟的会员信息,发送到另一个队列,通过监听过期延迟的队列,接收延迟的会员信息。package com.yanxizhu.family.users.config; import com.rabbitmq.client.Channel; import com.yanxizhu.family.users.entity.MemberEntity; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * @description: 延迟队列 * 说明: 1、首先通过创建的路由键(users.create.member),将消息从交换机(member-event-exchange)发送到死信队列(member.delay.queue)。 * 2、根据死信队列(member.delay.queue)的配置,过期后,消息将通过路由键为(users.release.member)转发到交换机(member-event-exchange)。 * 3、最后交换机(member-event-exchange)再通过绑定的最终队列(users.release.member.queue)和路由键(users.release.member)转发到最终队列(member-event-exchange)。 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/17 20:47 * @version: 1.0 */ @Configuration public class DelayQueueConfig { /** * 监听最终的队列 * @param entity * @param channel * @param message * @throws IOException */ @RabbitListener(queues="member.release.queue") public void listener(MemberEntity entity, Channel channel, Message message) throws IOException { System.out.println("收到过期的会员信息:准备关闭会员"+entity.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } //@bean自动创建交换机、路由、绑定 //交换机 @Bean public Exchange orderEventExchange(){ return new TopicExchange("member-event-exchange", true, false); } /** * 容器中的Binding、Queue、Exchange都会自动创建(RabbitMQ没有的情况) * @return */ //死信队列 @Bean public Queue memberDelayQueue(){ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", "member-event-exchange");//交换机 arguments.put("x-dead-letter-routing-key", "users.release.member");//路由键 arguments.put("x-message-ttl", 60000);//过期时间 //public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments) Queue queue = new Queue("member.delay.queue",true,false,false,arguments); return queue; } //将死信队列和交换机绑定,路由键-users.create.member @Bean public Binding orderCreateOrderBingding(){ //目的地、绑定类型、交换机、路由键、参数 //public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments) return new Binding("member.delay.queue", Binding.DestinationType.QUEUE, "member-event-exchange", "users.create.member", null); } //最后过期的队列 @Bean public Queue orderReleaseQueue(){ Queue queue = new Queue("member.release.queue",true,false,false); return queue; } //将最后的队列和交换机绑定,路由键-user.release.member @Bean public Binding orderReleaseOrderBingding(){ return new Binding("member.release.queue", Binding.DestinationType.QUEUE, "member-event-exchange", "users.release.member", null); } }
2022年03月13日
70 阅读
0 评论
9 点赞
1
2