首页
关于
友链
Search
1
wlop 4K 壁纸 4k8k 动态 壁纸
1,319 阅读
2
Nacos持久化MySQL问题-解决方案
864 阅读
3
Docker搭建Typecho博客
721 阅读
4
滑动时间窗口算法
674 阅读
5
Nginx反向代理微服务配置
646 阅读
生活
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
安全
开发工具
百度网盘
天翼网盘
阿里网盘
登录
Search
标签搜索
java
javase
docker
java8
springboot
thread
spring
分布式
mysql
锁
linux
redis
源码
typecho
centos
git
map
RabbitMQ
lambda
stream
少年
累计撰写
189
篇文章
累计收到
20
条评论
首页
栏目
生活
解决方案
JAVA基础
JVM
多线程
开源框架
数据库
前端
分布式
框架整合
中间件
容器部署
设计模式
数据结构与算法
安全
开发工具
百度网盘
天翼网盘
阿里网盘
页面
关于
友链
搜索到
3
篇与
的结果
2022-04-25
J.U.C之AQS
AQSJ.U.C之AQS,底层由线程队列实现。AbstractQueuedSynchronizer简称AQS,JUC的核心。使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架。利用了一个int类型表示状态。使用方法是继承。子类通过继承并通过实现它的方法管理其状态{acquire和release}的方法操纵状态。可以同时实现排它锁和共享锁模式(独占、共享)。AQS主要同步组件CountDownLatchSemaphoreCyclicBarrierReentrantLockConditionFutureTask依次介绍1、CountDownLatch-闭锁可以完成类似阻塞的功能。一个线程或多个线程等待,直到其它线程完成。使用场景,程序执行需要某个条件执行完后,才执行。示例一package com.yanxizhu.demo.concurrency.aqs; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @description: CountDownLatch,实现线程阻塞,等待执行完再执行程序打印 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:24 * @version: 1.0 */ @Slf4j @ThreadSafety public class CountDownLatchDemo { private static final int threadNum = 500; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { try{ pringNum(num); }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); }finally { countDownLatch.countDown(); } }); } countDownLatch.await(); executorService.shutdown(); log.info("finash .."); } public static void pringNum(int num) throws InterruptedException { Thread.sleep(100); log.info("num=【{}】", num); Thread.sleep(100); } }输出结果:。。。。。 20:36:34.598 [pool-1-thread-249] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【248】 20:36:34.608 [pool-1-thread-259] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【258】 20:36:34.597 [pool-1-thread-237] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【236】 20:36:34.597 [pool-1-thread-206] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【205】 20:36:34.608 [pool-1-thread-256] INFO 
com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【255】 20:36:34.608 [pool-1-thread-258] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【257】 20:36:34.597 [pool-1-thread-240] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【239】 20:36:34.597 [pool-1-thread-67] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【66】 20:36:34.608 [pool-1-thread-251] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【250】 20:36:34.608 [pool-1-thread-257] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【256】 20:36:34.608 [pool-1-thread-44] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【43】 20:36:34.598 [pool-1-thread-61] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【60】 20:36:34.598 [pool-1-thread-33] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【32】 20:36:34.597 [pool-1-thread-172] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【171】 20:36:34.608 [pool-1-thread-230] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【229】 20:36:34.608 [pool-1-thread-253] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【252】 20:36:34.598 [pool-1-thread-59] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【58】 20:36:34.598 [pool-1-thread-39] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【38】 20:36:34.608 [pool-1-thread-298] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【297】 20:36:34.596 [pool-1-thread-192] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【191】 20:36:34.597 [pool-1-thread-75] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【74】 20:36:34.608 [pool-1-thread-299] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【298】 20:36:34.597 [pool-1-thread-146] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【145】 20:36:34.608 [pool-1-thread-304] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - 
num=【303】 20:36:34.598 [pool-1-thread-229] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【228】 20:36:34.597 [pool-1-thread-87] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【86】 20:36:34.597 [pool-1-thread-46] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【45】 20:36:34.764 [main] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - finash ..另一种用法,给定时间内未执行执行await方法。到时间等当前线程执行完成后就不执行了。示例一package com.yanxizhu.demo.concurrency.aqs; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @description: CountDownLatch,设置等待时间 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:24 * @version: 1.0 */ @Slf4j @ThreadSafety public class CountDownLatchDemo2 { private static final int threadNum = 500; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(threadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { try{ pringNum(num); }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); }finally { countDownLatch.countDown(); } }); } //设置了等待时间 countDownLatch.await(100, TimeUnit.MILLISECONDS); executorService.shutdown(); log.info("finash .."); } public static void pringNum(int num) throws InterruptedException { Thread.sleep(100); log.info("num=【{}】", num); } }输出结果20:39:27.285 [pool-1-thread-258] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【257】 20:39:27.284 [pool-1-thread-180] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【179】 20:39:27.287 [pool-1-thread-54] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【53】 20:39:27.283 [pool-1-thread-368] INFO 
com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【367】 20:39:27.285 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【7】 20:39:27.301 [main] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - finash .. 20:39:27.286 [pool-1-thread-204] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【203】 20:39:27.301 [pool-1-thread-42] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【41】 20:39:27.305 [pool-1-thread-432] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【431】 20:39:27.285 [pool-1-thread-270] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【269】100毫秒后执行打印finash,之后的线程执行完后不再执行。2、Semaphore-信号量用于控制线程并发的个数。使用场景,提供仅限使用的资源。比如:有大量请求时,可用信号量控制数据库连接数,防止大量请求。示例一package com.yanxizhu.demo.concurrency.aqs; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /** * @description: Semaphore-信号量,控制线程并发数 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:41 * @version: 1.0 */ @Slf4j @ThreadSafety public class SemaphoreDemo { private static final int threadNum = 20; private static final int currentThreadNum = 3; public static void main(String[] args) throws InterruptedException { //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 Semaphore semaphore = new Semaphore(currentThreadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { try{ //获取一个许可 semaphore.acquire(); pringNum(num); //释放一个许可 semaphore.release(); }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); } }); } executorService.shutdown(); } public static void pringNum(int num) throws InterruptedException { log.info("num=【{}】", num); Thread.sleep(1000); } }输出结果:"C:\Program Files\jdk-11.0.10_windows-x64_bin\jdk-11.0.10\bin\java.exe" -Dvisualvm.id=83604261825100 "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 
2020.3.2\lib\idea_rt.jar=11416:C:\Program Files\JetBrains\IntelliJ IDEA 2020.3.2\bin" -Dfile.encoding=UTF-8 -classpath D:\Workspaces\ThreadConcurrency\demo-concurrency\target\classes;D:\Tools\repo\org\springframework\boot\spring-boot-starter-web\2.6.6\spring-boot-starter-web-2.6.6.jar;D:\Tools\repo\org\springframework\boot\spring-boot-starter\2.6.6\spring-boot-starter-2.6.6.jar;D:\Tools\repo\org\springframework\boot\spring-boot-starter-logging\2.6.6\spring-boot-starter-logging-2.6.6.jar;D:\Tools\repo\ch\qos\logback\logback-classic\1.2.11\logback-classic-1.2.11.jar;D:\Tools\repo\ch\qos\logback\logback-core\1.2.11\logback-core-1.2.11.jar;D:\Tools\repo\org\apache\logging\log4j\log4j-to-slf4j\2.17.2\log4j-to-slf4j-2.17.2.jar;D:\Tools\repo\org\apache\logging\log4j\log4j-api\2.17.2\log4j-api-2.17.2.jar;D:\Tools\repo\org\slf4j\jul-to-slf4j\1.7.36\jul-to-slf4j-1.7.36.jar;D:\Tools\repo\jakarta\annotation\jakarta.annotation-api\1.3.5\jakarta.annotation-api-1.3.5.jar;D:\Tools\repo\org\yaml\snakeyaml\1.29\snakeyaml-1.29.jar;D:\Tools\repo\org\springframework\boot\spring-boot-starter-json\2.6.6\spring-boot-starter-json-2.6.6.jar;D:\Tools\repo\com\fasterxml\jackson\core\jackson-databind\2.13.2.2\jackson-databind-2.13.2.2.jar;D:\Tools\repo\com\fasterxml\jackson\core\jackson-annotations\2.13.2\jackson-annotations-2.13.2.jar;D:\Tools\repo\com\fasterxml\jackson\core\jackson-core\2.13.2\jackson-core-2.13.2.jar;D:\Tools\repo\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.13.2\jackson-datatype-jdk8-2.13.2.jar;D:\Tools\repo\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.13.2\jackson-datatype-jsr310-2.13.2.jar;D:\Tools\repo\com\fasterxml\jackson\module\jackson-module-parameter-names\2.13.2\jackson-module-parameter-names-2.13.2.jar;D:\Tools\repo\org\springframework\boot\spring-boot-starter-tomcat\2.6.6\spring-boot-starter-tomcat-2.6.6.jar;D:\Tools\repo\org\apache\tomcat\embed\tomcat-embed-core\9.0.60\tomcat-embed-core-9.0.60.jar;D:\Tools\repo\org\apache\tomcat\embed\tomcat
-embed-el\9.0.60\tomcat-embed-el-9.0.60.jar;D:\Tools\repo\org\apache\tomcat\embed\tomcat-embed-websocket\9.0.60\tomcat-embed-websocket-9.0.60.jar;D:\Tools\repo\org\springframework\spring-web\5.3.18\spring-web-5.3.18.jar;D:\Tools\repo\org\springframework\spring-beans\5.3.18\spring-beans-5.3.18.jar;D:\Tools\repo\org\springframework\spring-webmvc\5.3.18\spring-webmvc-5.3.18.jar;D:\Tools\repo\org\springframework\spring-aop\5.3.18\spring-aop-5.3.18.jar;D:\Tools\repo\org\springframework\spring-context\5.3.18\spring-context-5.3.18.jar;D:\Tools\repo\org\springframework\spring-expression\5.3.18\spring-expression-5.3.18.jar;D:\Tools\repo\org\springframework\boot\spring-boot-devtools\2.6.6\spring-boot-devtools-2.6.6.jar;D:\Tools\repo\org\springframework\boot\spring-boot\2.6.6\spring-boot-2.6.6.jar;D:\Tools\repo\org\springframework\boot\spring-boot-autoconfigure\2.6.6\spring-boot-autoconfigure-2.6.6.jar;D:\Tools\repo\org\projectlombok\lombok\1.18.22\lombok-1.18.22.jar;D:\Tools\repo\org\slf4j\slf4j-api\1.7.36\slf4j-api-1.7.36.jar;D:\Tools\repo\org\springframework\spring-core\5.3.18\spring-core-5.3.18.jar;D:\Tools\repo\org\springframework\spring-jcl\5.3.18\spring-jcl-5.3.18.jar;D:\Tools\repo\com\google\guava\guava\31.1-jre\guava-31.1-jre.jar;D:\Tools\repo\com\google\guava\failureaccess\1.0.1\failureaccess-1.0.1.jar;D:\Tools\repo\com\google\guava\listenablefuture\9999.0-empty-to-avoid-conflict-with-guava\listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar;D:\Tools\repo\com\google\code\findbugs\jsr305\3.0.2\jsr305-3.0.2.jar;D:\Tools\repo\org\checkerframework\checker-qual\3.12.0\checker-qual-3.12.0.jar;D:\Tools\repo\com\google\errorprone\error_prone_annotations\2.11.0\error_prone_annotations-2.11.0.jar;D:\Tools\repo\com\google\j2objc\j2objc-annotations\1.3\j2objc-annotations-1.3.jar;D:\Tools\repo\joda-time\joda-time\2.9.9\joda-time-2.9.9.jar com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo 20:53:22.696 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo 
- num=【1】 20:53:22.696 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【2】 20:53:22.696 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【0】 20:53:23.707 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【4】 20:53:23.707 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【5】 20:53:23.707 [pool-1-thread-7] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【6】 20:53:24.711 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【12】 20:53:24.711 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【7】 20:53:24.711 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【3】 20:53:25.726 [pool-1-thread-9] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【8】 20:53:25.726 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【11】 20:53:25.726 [pool-1-thread-14] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【13】 20:53:26.738 [pool-1-thread-16] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【15】 20:53:26.738 [pool-1-thread-11] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【10】 20:53:26.738 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【9】 20:53:27.741 [pool-1-thread-18] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【17】 20:53:27.741 [pool-1-thread-17] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【16】 20:53:27.741 [pool-1-thread-15] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【14】 20:53:28.741 [pool-1-thread-19] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【18】 20:53:28.741 [pool-1-thread-20] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【19】 Process finished with exit code 0通过结果可以看到,每秒最多只打印3个数字,这就是信号量(共3个许可)控制并发数的效果。示例二同时获取多个许可,释放多个许可,也可以一个一个释放许可。下面代码示例中,信号量一共只有3个许可,每个线程一次获取全部3个许可,其它线程就获取不到许可了,效果类似单线程。package com.yanxizhu.demo.concurrency.aqs; import 
com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: Semaphore-信号量,控制线程并发数,可以获取或释放多个许可 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:41 * @version: 1.0 */ @Slf4j @ThreadSafety public class SemaphoreDemo2 { private static final int threadNum = 20; private static final int currentThreadNum = 3; public static void main(String[] args) throws InterruptedException { //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 Semaphore semaphore = new Semaphore(currentThreadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { try{ //获取3个许可,需要等3个都执行完了,才释放3个许可 semaphore.acquire(3); pringNum(num); //释放3个许可 semaphore.release(3); }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); } }); } executorService.shutdown(); } public static void pringNum(int num) throws InterruptedException { log.info("num=【{}】", num); Thread.sleep(1000); } }输出结果:20:56:17.733 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【0】 20:56:18.740 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【1】 20:56:19.754 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【5】 20:56:20.755 [pool-1-thread-7] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【6】 20:56:21.767 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【9】 20:56:22.780 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【7】 20:56:23.791 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【4】 20:56:24.800 [pool-1-thread-9] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【8】 20:56:25.801 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【3】 20:56:26.812 
[pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【2】 20:56:27.824 [pool-1-thread-11] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【10】 20:56:28.834 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【11】 20:56:29.847 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【12】 20:56:30.850 [pool-1-thread-14] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【13】 20:56:31.853 [pool-1-thread-17] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【16】 20:56:32.863 [pool-1-thread-15] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【14】 20:56:33.873 [pool-1-thread-16] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【15】 20:56:34.885 [pool-1-thread-18] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【17】 20:56:35.885 [pool-1-thread-19] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【18】 20:56:36.897 [pool-1-thread-20] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【19】 Disconnected from the target VM, address: '127.0.0.1:13050', transport: 'socket'示例三当线程数超过并发数,就丢失多余的。package com.yanxizhu.demo.concurrency.aqs; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * @description: Semaphore-信号量,控制线程并发数,超过并发量时掉漆多余线程 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:41 * @version: 1.0 */ @Slf4j @ThreadSafety public class SemaphoreDemo3 { private static final int threadNum = 20; private static final int currentThreadNum = 3; public static void main(String[] args) throws InterruptedException { //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 Semaphore semaphore = new Semaphore(currentThreadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { 
try{ //尝试获取许可,获取到才执行 if (semaphore.tryAcquire()) { pringNum(num); //释放一个许可 semaphore.release(); } }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); } }); } executorService.shutdown(); } public static void pringNum(int num) throws InterruptedException { log.info("num=【{}】", num); Thread.sleep(1000); } }输出结果:21:04:48.376 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo3 - num=【0】 21:04:49.387 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo3 - num=【1】 Disconnected from the target VM, address: '127.0.0.1:2839', transport: 'socket'只输出2个线程,其它18个线程被丢弃。tryAcquire方法重载说明://无参 public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } //1个参数,获取许可个数 public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } //2个参数,超时时间,单位 public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //3个参数,获取许可个数,超时时间,单位 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }示例四设置超时时间package com.yanxizhu.demo.concurrency.aqs; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * @description: Semaphore-信号量,控制线程并发数,带超时时间限制的 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 20:41 * @version: 1.0 */ @Slf4j @ThreadSafety public class SemaphoreDemo4 { private static final int threadNum = 50; private static final int currentThreadNum = 3; public static void main(String[] args) throws InterruptedException { //线程池 ExecutorService executorService = 
Executors.newCachedThreadPool(); //信号量 Semaphore semaphore = new Semaphore(currentThreadNum); for (int i = 0; i <threadNum; i++) { final int num = i; executorService.execute(() -> { try{ //尝试获取许可,获取到才执行,获取超过3秒,就不执行了。 if (semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS)) { pringNum(num); //释放一个许可 semaphore.release(); } }catch (Exception e){ log.error("执行错误【{}】", e.getMessage()); } }); } executorService.shutdown(); } public static void pringNum(int num) throws InterruptedException { log.info("num=【{}】", num); Thread.sleep(1000); } }输出结果:22:01:36.804 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【0】 22:01:36.804 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【2】 22:01:36.804 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【1】 22:01:37.828 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【3】 22:01:37.828 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【5】 22:01:37.828 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【9】 22:01:38.843 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【11】 22:01:38.843 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【12】 22:01:38.843 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【7】 Disconnected from the target VM, address: '127.0.0.1:11385', transport: 'socket'3、CyclicBarrier允许一组线程相互等待,直到达到某个共同屏障点,直到每个线程都就绪后面的才继续往下执行。和CoutDownLatch相似都是通过计数器来实现的,图从下往上看的。线程释放后可以循环使用,因此也叫循环屏障。CoutDownLatch用户分计算,汇总最后的结果。示例一package com.yanxizhu.demo.concurrency.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @description: CyclicBarrier 实现线程相互等待 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 
22:15 * @version: 1.0 */ @Slf4j public class CyclicBarrierDemo { //定义需要相互等待5个线程,就绪后才执行,后面的代码 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4); public static void main(String[] args) throws Exception{ //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; //模式看效果,等待1秒 Thread.sleep(1000); executorService.execute(() -> { try { pringWait(threadNum); } catch (Exception e) { log.error("执行出错了。。。", e.getMessage()); } }); } executorService.shutdown(); } public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException { //模式看效果,等待1秒 Thread.sleep(1000); log.info("等待中。。。{}", threadNum); //告诉线程每个就绪了,await方法支持传入超时时间 //可选参数long timeout, TimeUnit unit cyclicBarrier.await(); //就绪后才执行 log.info("开始执行了。。{}",threadNum); } }输出结果:Connected to the target VM, address: '127.0.0.1:14410', transport: 'socket' 22:26:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。0 22:26:05.853 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。1 22:26:06.866 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。2 22:26:07.881 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。3 22:26:07.881 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。3 22:26:07.882 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。1 22:26:07.882 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。2 22:26:07.881 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。0 22:26:08.896 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。4 22:26:09.899 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。5 22:26:10.901 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 
等待中。。。6 22:26:11.902 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。7 22:26:11.902 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。7 22:26:11.902 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。4 22:26:11.902 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。6 22:26:11.902 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。5 22:26:12.903 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。8 22:26:13.916 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。9 Disconnected from the target VM, address: '127.0.0.1:14410', transport: 'socket'示例二带超时时间的,注意需要trycatch才能不影响后续流程执行。package com.yanxizhu.demo.concurrency.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; /** * @description: CyclicBarrier 实现线程相互等待,带超时时间,注意捕获异常,才能继续执行后面的操作 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 22:15 * @version: 1.0 */ @Slf4j public class CyclicBarrierDemo2 { //定义需要相互等待5个线程,就绪后才执行,后面的代码 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4); public static void main(String[] args) throws Exception{ //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; //模式看效果,等待1秒 Thread.sleep(1000); executorService.execute(() -> { try { pringWait(threadNum); } catch (Exception e) { log.error("执行出错了。。。", e.getMessage()); } }); } executorService.shutdown(); } public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException { //模式看效果,等待1秒 Thread.sleep(1000); log.info("等待中。。。{}", threadNum); //告诉线程每个就绪了,带超时时间 try { cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e){ log.warn("超时了,注意:捕获异常,才能继续执行", e.getMessage()); } //就绪后才执行 log.info("开始执行了。。{}",threadNum); } }输出结果:22:37:57.786 [pool-1-thread-1] 
INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。0 22:37:58.797 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。1 22:37:59.797 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:37:59.797 [pool-1-thread-2] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:37:59.797 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。0 22:37:59.798 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。1 22:37:59.812 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。2 22:37:59.812 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:37:59.812 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。2 22:38:00.813 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。3 22:38:00.813 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:00.813 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。3 22:38:01.828 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。4 22:38:01.828 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:01.828 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。4 22:38:02.840 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。5 22:38:02.840 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:02.840 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。5 22:38:03.842 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。6 22:38:03.842 [pool-1-thread-3] WARN 
com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:03.842 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。6 22:38:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。7 22:38:04.855 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。7 22:38:05.870 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。8 22:38:05.870 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:05.870 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。8 22:38:06.870 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。9 22:38:06.870 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行 22:38:06.870 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。9 Disconnected from the target VM, address: '127.0.0.1:2195', transport: 'socket'示例三构造可以传入runable线程,优先执行。package com.yanxizhu.demo.concurrency.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @description: CyclicBarrier 实现线程相互等待,CyclicBarrier可以传入runable线程,优先执行该部分代码 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/25 22:15 * @version: 1.0 */ @Slf4j public class CyclicBarrierDemo3 { //定义需要相互等待5个线程,就绪后才执行,后面的代码 private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> { log.info("相互等待线程都就绪后,这部分代码先执行"); }); public static void main(String[] args) throws Exception{ //线程池 ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int 
threadNum = i; //模式看效果,等待1秒 Thread.sleep(1000); executorService.execute(() -> { try { pringWait(threadNum); } catch (Exception e) { log.error("执行出错了。。。", e.getMessage()); } }); } executorService.shutdown(); } public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException { //模式看效果,等待1秒 Thread.sleep(1000); log.info("等待中。。。{}", threadNum); //告诉线程每个就绪了,await方法支持传入超时时间 //可选参数long timeout, TimeUnit unit cyclicBarrier.await(); //就绪后才执行 log.info("开始执行了。。{}",threadNum); } }输出结果:22:42:09.313 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。0 22:42:10.313 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。1 22:42:11.318 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。2 22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。3 22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 相互等待线程都就绪后,这部分代码先执行 22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。3 22:42:12.320 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。2 22:42:12.320 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。1 22:42:12.320 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。0 22:42:13.325 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。4 22:42:14.339 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。5 22:42:15.353 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。6 22:42:16.355 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。7 22:42:16.355 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 相互等待线程都就绪后,这部分代码先执行 22:42:16.355 [pool-1-thread-1] INFO 
com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。7 22:42:16.355 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。4 22:42:16.355 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。5 22:42:16.355 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。6 22:42:17.358 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。8 22:42:18.372 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。9总结:注意CountDownLatch与CyclicBarrier的不同,区别使用。CountDownLatch的计数不能重置,用于一个或多个线程等待其它线程完成;CyclicBarrier可以循环使用,用于多个线程相互等待。
2022年04月25日
156 阅读
0 评论
1 点赞
2022-04-20
原子性-Atomic类常见的使用
1、(信号量)Semaphore+(闭锁)CountDownLatch+(常用原子类)AtomicIntegerpackage com.yanxizhu.demo.concurrency.atomic; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; /** * @description: 线程安全 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/19 20:05 * @version: 1.0 */ @Slf4j @ThreadSafety public class DemoSemaphoreAndCountDownLatchAndAtomic { //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; //累加总和 private static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(clientsTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(concurrencyTotal); for (int i = 0; i < clientsTotal; i++) { executorService.execute(()->{ try { semaphore.acquire(); add(); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("累加结果为count=【{}】",count.get()); } /** * 累加 */ private static void add(){ //说明:incrementAndGet、getAndIncrement类似,i++与++i count.incrementAndGet(); count.getAndIncrement(); } }运行结果:累加结果为count=【10000】2、AtomicReferencepackage com.yanxizhu.demo.concurrency.atomic; import com.yanxizhu.demo.concurrency.annotation.ThreadSafety; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicReference; /** * @description: AtomicReference * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/20 21:09 * @version: 1.0 */ @Slf4j @ThreadSafety public class DemoAtomicReference { public static 
AtomicReference<Integer> count = new AtomicReference<>(0); public static void main(String[] args) { count.compareAndSet(0, 2); count.compareAndSet(0, 1); count.compareAndSet(1, 3); count.compareAndSet(2, 4); count.compareAndSet(3, 5); log.info("count结果为【{}】",count.get()); } }运行结果:count结果为【4】3、AtomicIntegerFieldUpdaterpackage com.yanxizhu.demo.concurrency.atomic; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; /** * @description: AtomicIntegerFieldUpdater,原子性修改 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/20 21:17 * @version: 1.0 */ @Slf4j public class DemoAtomicIntegerFieldUpdater { private static AtomicIntegerFieldUpdater<DemoAtomicIntegerFieldUpdater> updater = AtomicIntegerFieldUpdater.newUpdater(DemoAtomicIntegerFieldUpdater.class, "count"); @Getter public volatile int count = 100; private static DemoAtomicIntegerFieldUpdater demoAtomicIntegerFieldUpdater = new DemoAtomicIntegerFieldUpdater(); public static void main(String[] args) { if(updater.compareAndSet(demoAtomicIntegerFieldUpdater, 100, 200)) { log.info("updater succcess1,{}", demoAtomicIntegerFieldUpdater.getCount()); } if(updater.compareAndSet(demoAtomicIntegerFieldUpdater, 100, 200)) { log.info("updater succcess2,{}", demoAtomicIntegerFieldUpdater.getCount()); }else { log.info("updater fail,{}", demoAtomicIntegerFieldUpdater.getCount()); } } }运行结果:updater succcess1,200 updater fail,2004、AtomicStampedReference主要用于解决CAS中ABA问题,主要是AtomicStampedReference类中compareAndSet方法多了一个stamp的比较。stamp是每次更新来维护的。 public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); } 5、LongAdderJDK1.8后新增了LongAdder。AtomicLong:是基于 CAS 方式自旋更新的;LongAdder: 是把 value 
分成若干cell,并发量低的时候,直接 CAS 更新值,成功即结束。并发量高的情况,CAS更新某个cell值和需要时对cell数据扩容,成功结束;更新失败自旋 CAS 更新 cell值。取值的时候,调用 sum() 方法进行每个cell累加。AtomicLong: 包含有原子性的读、写结合的api;LongAdder :没有原子性的读、写结合的api,能保证结果最终一致性。低并发:低并发场景AtomicLong 和 LongAdder 性能相似。高并发:高并发场景 LongAdder 性能优于 AtomicLong。6、AtomicLongArray主要用于原子修改数组 // 下标, 期望值 更新值 public final boolean compareAndSet(int i, long expectedValue, long newValue) { return AA.compareAndSet(array, i, expectedValue, newValue); }通过传入下标、期望值、要更新的值进行修改。7、AtomicBoolean public final boolean compareAndSet(boolean expectedValue, boolean newValue) { return VALUE.compareAndSet(this, (expectedValue ? 1 : 0), (newValue ? 1 : 0)); }package com.yanxizhu.demo.concurrency.atomic; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; /** * @description: 结果,最后只会执行一次 * @author: <a href="mailto:batis@foxmail.com">清风</a> * @date: 2022/4/20 22:05 * @version: 1.0 */ @Slf4j public class DemoAtomicBoolean { private static AtomicBoolean isHappen = new AtomicBoolean(false); //用户数量 private static final int clientsTotal = 5000; //并发数量 private static final int concurrencyTotal = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //信号量 final Semaphore semaphore = new Semaphore(clientsTotal); //闭锁 final CountDownLatch countDownLatch = new CountDownLatch(concurrencyTotal); for (int i = 0; i < clientsTotal; i++) { executorService.execute(()->{ try { semaphore.acquire(); test(); semaphore.release(); } catch (InterruptedException e) { log.error("出现错误:【{}】", e.getMessage()); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("isHappen结果为count=【{}】",isHappen.get()); } public static void test() { //从false变成true是一次原子操作,只会执行一次 if(isHappen.compareAndSet(false, true)) 
{ log.info("execute .. {}", isHappen); } } }运行结果:execute .. true isHappen结果为count=【true】
2022年04月20日
162 阅读
0 评论
2 点赞
2022-03-19
CountDownLatch闭锁使用
import java.util.concurrent.CountDownLatch;

/**
 * Small demo of {@link CountDownLatch}: the main thread blocks until every
 * worker thread has counted the latch down, then reports the elapsed time.
 *
 * @date: 2022/3/19 11:09
 * @version: 1.0
 */
public class CountDownLatchTest {

    /**
     * Starts five threads that all run the same task instance, waits on the
     * latch until each of them has finished, and prints the elapsed
     * milliseconds.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting on the latch
     */
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        // A single task object is handed to every thread on purpose; see run().
        Runnable sharedTask = new myCountDownlatch(latch);

        long begin = System.currentTimeMillis();
        int launched = 0;
        while (launched < 5) {
            new Thread(sharedTask).start();
            launched++;
        }

        // Returns only after five countDown() calls, one per worker.
        latch.await();
        long elapsed = System.currentTimeMillis() - begin;
        System.out.println("耗时:" + elapsed);
    }

    /**
     * Worker task: prints the even numbers below 500 and then counts the
     * latch down exactly once.
     */
    public static class myCountDownlatch implements Runnable {

        private CountDownLatch countDownLatch;

        /**
         * @param countDownLatch latch to decrement when this task finishes a run
         */
        public myCountDownlatch(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {
            // main() shares this one instance across all threads, so locking
            // on it lets only one thread at a time print its sequence.
            synchronized (this) {
                try {
                    for (int even = 0; even < 500; even += 2) {
                        System.out.println(even);
                    }
                } finally {
                    // In finally so the waiting thread is released even if printing throws.
                    countDownLatch.countDown();
                }
            }
        }
    }
}
2022年03月19日
176 阅读
0 评论
5 点赞