J.U.C: AQS

admin
2022-04-25

AQS

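AQS is short for AbstractQueuedSynchronizer, the core of J.U.C (java.util.concurrent). Internally it is built on a queue of waiting threads.

It uses Node objects to implement a FIFO queue and serves as a base framework for building locks and other synchronizers.
It represents the synchronization state with a single int.
It is used through inheritance.
A subclass overrides the hook methods (such as tryAcquire and tryRelease) that acquire() and release() call, and manipulates the state through them.
It supports both exclusive and shared modes, so it can back exclusive locks as well as shared locks.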
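
The points above can be illustrated with a small sketch: a non-reentrant mutex that extends AbstractQueuedSynchronizer, in the spirit of the example in the AQS Javadoc. The class names SimpleMutex and Sync, and the convention that state 0 means unlocked and 1 means locked, are assumptions made for illustration and do not come from the original post.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class: a non-reentrant mutex built on AQS
class SimpleMutex {

    // Assumed convention: state == 0 is unlocked, state == 1 is locked
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        public boolean tryAcquire(int ignored) {
            // Atomically flip the state from 0 to 1; fail if the mutex is already held
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        public boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException("mutex is not held");
            }
            setExclusiveOwnerThread(null);
            setState(0); // volatile write that publishes the release
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void lock()        { sync.acquire(1); }           // queues the caller if tryAcquire fails
    public boolean tryLock()  { return sync.tryAcquire(1); } // never blocks
    public void unlock()      { sync.release(1); }           // wakes the next queued thread, if any
    public boolean isLocked() { return sync.isLocked(); }

    public static void main(String[] args) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    mutex.lock();
                    try {
                        counter[0]++; // protected by the mutex
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println("counter=" + counter[0]); // prints counter=4000
    }
}
```

Only tryAcquire and tryRelease are written by hand here; queuing, parking and waking of threads all come from the inherited acquire() and release() methods.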

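Main synchronization components related to AQS

  1. CountDownLatch
  2. Semaphore
  3. CyclicBarrier (built on ReentrantLock and Condition rather than extending AQS directly)
  4. ReentrantLock
  5. Condition
  6. FutureTask (used an AQS-based Sync in older JDKs; since JDK 8 it no longer does)

The first three are introduced in turn below.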

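1. CountDownLatch (latch)

It provides a blocking-style wait: one or more threads wait until a set of operations in other threads completes. Typical use: part of the program may proceed only after some precondition work has finished.

Example 1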

package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @description: CountDownLatch: blocks the main thread until all tasks finish, then prints a message
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:24
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class CountDownLatchDemo {

    private static final int threadNum = 500;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // The latch starts at threadNum; each task counts it down once
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    pringNum(num);
                } catch (Exception e) {
                    log.error("Execution failed: {}", e.getMessage());
                } finally {
                    // Count down in finally so a failed task cannot leave the latch stuck
                    countDownLatch.countDown();
                }
            });
        }

        // Block until the count reaches zero
        countDownLatch.await();
        executorService.shutdown();
        log.info("finash ..");
    }

    public static void pringNum(int num) throws InterruptedException {
        Thread.sleep(100);
        log.info("num=【{}】", num);
        Thread.sleep(100);
    }
}

Output:

...
20:36:34.598 [pool-1-thread-249] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【248】
20:36:34.608 [pool-1-thread-259] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【258】
20:36:34.597 [pool-1-thread-237] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【236】
20:36:34.597 [pool-1-thread-206] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【205】
20:36:34.608 [pool-1-thread-256] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【255】
20:36:34.608 [pool-1-thread-258] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【257】
20:36:34.597 [pool-1-thread-240] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【239】
20:36:34.597 [pool-1-thread-67] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【66】
20:36:34.608 [pool-1-thread-251] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【250】
20:36:34.608 [pool-1-thread-257] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【256】
20:36:34.608 [pool-1-thread-44] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【43】
20:36:34.598 [pool-1-thread-61] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【60】
20:36:34.598 [pool-1-thread-33] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【32】
20:36:34.597 [pool-1-thread-172] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【171】
20:36:34.608 [pool-1-thread-230] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【229】
20:36:34.608 [pool-1-thread-253] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【252】
20:36:34.598 [pool-1-thread-59] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【58】
20:36:34.598 [pool-1-thread-39] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【38】
20:36:34.608 [pool-1-thread-298] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【297】
20:36:34.596 [pool-1-thread-192] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【191】
20:36:34.597 [pool-1-thread-75] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【74】
20:36:34.608 [pool-1-thread-299] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【298】
20:36:34.597 [pool-1-thread-146] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【145】
20:36:34.608 [pool-1-thread-304] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【303】
20:36:34.598 [pool-1-thread-229] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【228】
20:36:34.597 [pool-1-thread-87] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【86】
20:36:34.597 [pool-1-thread-46] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - num=【45】
20:36:34.764 [main] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo - finash ..

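Another usage is await with a timeout. The waiting thread stops waiting once the given time elapses, even if the count has not reached zero; tasks that were already submitted keep running to completion.

Example 2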

package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @description: CountDownLatch with a wait timeout
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:24
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class CountDownLatchDemo2 {

    private static final int threadNum = 500;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(threadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    pringNum(num);
                } catch (Exception e) {
                    log.error("Execution failed: {}", e.getMessage());
                } finally {
                    countDownLatch.countDown();
                }
            });
        }

        // Wait at most 100 ms; await returns false if the count has not reached zero by then
        countDownLatch.await(100, TimeUnit.MILLISECONDS);
        // shutdown() stops new submissions but lets already submitted tasks finish
        executorService.shutdown();
        log.info("finash ..");
    }

    public static void pringNum(int num) throws InterruptedException {
        Thread.sleep(100);
        log.info("num=【{}】", num);
    }
}

Output:

20:39:27.285 [pool-1-thread-258] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【257】
20:39:27.284 [pool-1-thread-180] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【179】
20:39:27.287 [pool-1-thread-54] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【53】
20:39:27.283 [pool-1-thread-368] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【367】
20:39:27.285 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【7】
20:39:27.301 [main] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - finash ..
20:39:27.286 [pool-1-thread-204] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【203】
20:39:27.301 [pool-1-thread-42] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【41】
20:39:27.305 [pool-1-thread-432] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【431】
20:39:27.285 [pool-1-thread-270] INFO com.yanxizhu.demo.concurrency.aqs.CountDownLatchDemo2 - num=【269】

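After about 100 ms the main thread stops waiting and prints finash. The remaining tasks are not cancelled: shutdown() only stops new submissions, so tasks that were already submitted still run and print afterwards, as the lines after finash show.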
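
The timed await also reports whether the count actually reached zero, which the demo above ignores. The following is a minimal sketch of checking that return value; the class name TimedAwaitSketch, the helper finishedWithin and the 50 ms / 300 ms / 5 s durations are assumptions chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, not part of the original demos
class TimedAwaitSketch {

    // Waits up to timeoutMillis. true means the count reached zero in time;
    // false means the wait timed out (or was interrupted) with work still pending.
    static boolean finishedWithin(CountDownLatch latch, long timeoutMillis) {
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(300); // simulated work, longer than the first timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                latch.countDown();
            }
        });
        worker.start();

        // The worker is still running, so this wait is expected to time out
        System.out.println("done within 50 ms: " + finishedWithin(latch, 50));
        // A generous second wait gives the worker time to count down
        System.out.println("done within 5 s: " + finishedWithin(latch, 5000));
    }
}
```

Branching on the returned boolean lets the caller tell "all tasks finished" apart from "gave up waiting", which matters when the code after await depends on the results.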

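2. Semaphore

It controls how many threads may run a section concurrently. Typical use: guarding a resource of limited capacity. For example, under heavy request load a semaphore can cap the number of database connections in use so the database is not overwhelmed.

Example 1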

package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * @description: Semaphore: limits the number of concurrently running threads
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:41
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class SemaphoreDemo {
    private static final int threadNum = 20;

    private static final int currentThreadNum = 3;

    public static void main(String[] args) throws InterruptedException {
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Semaphore with 3 permits: at most 3 tasks run pringNum at once
        Semaphore semaphore = new Semaphore(currentThreadNum);

        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    // Acquire one permit, blocking until one is available
                    semaphore.acquire();
                    try {
                        pringNum(num);
                    } finally {
                        // Release the permit even if pringNum throws
                        semaphore.release();
                    }
                } catch (Exception e) {
                    log.error("Execution failed: {}", e.getMessage());
                }
            });
        }

        executorService.shutdown();
    }

    public static void pringNum(int num) throws InterruptedException {
        log.info("num=【{}】", num);
        Thread.sleep(1000);
    }
}

Output:

20:53:22.696 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【1】
20:53:22.696 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【2】
20:53:22.696 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【0】
20:53:23.707 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【4】
20:53:23.707 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【5】
20:53:23.707 [pool-1-thread-7] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【6】
20:53:24.711 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【12】
20:53:24.711 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【7】
20:53:24.711 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【3】
20:53:25.726 [pool-1-thread-9] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【8】
20:53:25.726 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【11】
20:53:25.726 [pool-1-thread-14] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【13】
20:53:26.738 [pool-1-thread-16] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【15】
20:53:26.738 [pool-1-thread-11] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【10】
20:53:26.738 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【9】
20:53:27.741 [pool-1-thread-18] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【17】
20:53:27.741 [pool-1-thread-17] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【16】
20:53:27.741 [pool-1-thread-15] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【14】
20:53:28.741 [pool-1-thread-19] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【18】
20:53:28.741 [pool-1-thread-20] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo - num=【19】


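The output shows that about three numbers are printed per second, matching the three permits of the semaphore.

Example 2

A thread can acquire and release several permits at once, and permits can also be released one at a time. In the code below the semaphore has only 3 permits and each task acquires all 3, so no permits are left for anyone else and the tasks effectively run one at a time.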

package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @description: Semaphore: limits the number of concurrently running threads; several permits can be acquired or released at once
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:41
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class SemaphoreDemo2 {
    private static final int threadNum = 20;

    private static final int currentThreadNum = 3;

    public static void main(String[] args) throws InterruptedException {
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Semaphore with 3 permits
        Semaphore semaphore = new Semaphore(currentThreadNum);

        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    // Acquire all 3 permits; other tasks block until they are released
                    semaphore.acquire(3);
                    try {
                        pringNum(num);
                    } finally {
                        // Release the 3 permits even if pringNum throws
                        semaphore.release(3);
                    }
                } catch (Exception e) {
                    log.error("Execution failed: {}", e.getMessage());
                }
            });
        }

        executorService.shutdown();
    }

    public static void pringNum(int num) throws InterruptedException {
        log.info("num=【{}】", num);
        Thread.sleep(1000);
    }
}

Output:

20:56:17.733 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【0】
20:56:18.740 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【1】
20:56:19.754 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【5】
20:56:20.755 [pool-1-thread-7] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【6】
20:56:21.767 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【9】
20:56:22.780 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【7】
20:56:23.791 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【4】
20:56:24.800 [pool-1-thread-9] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【8】
20:56:25.801 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【3】
20:56:26.812 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【2】
20:56:27.824 [pool-1-thread-11] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【10】
20:56:28.834 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【11】
20:56:29.847 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【12】
20:56:30.850 [pool-1-thread-14] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【13】
20:56:31.853 [pool-1-thread-17] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【16】
20:56:32.863 [pool-1-thread-15] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【14】
20:56:33.873 [pool-1-thread-16] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【15】
20:56:34.885 [pool-1-thread-18] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【17】
20:56:35.885 [pool-1-thread-19] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【18】
20:56:36.897 [pool-1-thread-20] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo2 - num=【19】

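Example 3

When more tasks arrive than there are permits, the excess tasks are dropped: tryAcquire() does not block, and a task that gets no permit simply skips its work.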

package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * @description: Semaphore: limits the number of concurrently running threads; tasks beyond the permit count are dropped
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:41
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class SemaphoreDemo3 {
    private static final int threadNum = 20;

    private static final int currentThreadNum = 3;

    public static void main(String[] args) throws InterruptedException {
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Semaphore with 3 permits
        Semaphore semaphore = new Semaphore(currentThreadNum);

        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    // Try to get a permit without blocking; run only if one was obtained
                    if (semaphore.tryAcquire()) {
                        try {
                            pringNum(num);
                        } finally {
                            // Release the permit even if pringNum throws
                            semaphore.release();
                        }
                    }
                } catch (Exception e) {
                    log.error("Execution failed: {}", e.getMessage());
                }
            });
        }

        executorService.shutdown();
    }

    public static void pringNum(int num) throws InterruptedException {
        log.info("num=【{}】", num);
        Thread.sleep(1000);
    }
}

Output:

21:04:48.376 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo3 - num=【0】
21:04:49.387 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo3 - num=【1】

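In this run only 2 tasks printed; the other 18 found no permit available and were dropped. (With 3 permits, up to 3 tasks can get through at a time, so the exact number depends on timing.)

Overloads of tryAcquire: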

// No arguments: try to acquire one permit without blocking
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}

// One argument: the number of permits to acquire
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

// Two arguments: the timeout and its unit
public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

// Three arguments: the number of permits, the timeout and its unit
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
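
As a quick check of how these four overloads behave, here is a single-threaded sketch on a semaphore with 3 permits. The class name TryAcquireSketch, the helper probe and the 100 ms timeouts are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical example class, not part of the original demos
class TryAcquireSketch {

    // Calls each overload once and records whether it obtained its permits
    static boolean[] probe() {
        Semaphore semaphore = new Semaphore(3);
        boolean[] results = new boolean[5];
        try {
            results[0] = semaphore.tryAcquire();   // takes 1 permit, 2 left
            results[1] = semaphore.tryAcquire(2);  // takes the other 2, 0 left
            results[2] = semaphore.tryAcquire();   // none left: returns false at once
            // Nobody releases a permit, so this gives up after about 100 ms
            results[3] = semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
            semaphore.release(3);                  // hand all permits back
            // All 3 permits are free again, so this succeeds immediately
            results[4] = semaphore.tryAcquire(3, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [true, true, false, false, true]
        System.out.println(Arrays.toString(probe()));
    }
}
```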

Example 4

Setting a timeout

package com.yanxizhu.demo.concurrency.aqs;

import com.yanxizhu.demo.concurrency.annotation.ThreadSafety;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @description: Semaphore: limits the number of concurrently running threads, with a timeout on acquiring a permit
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 20:41
 * @version: 1.0
 */
@Slf4j
@ThreadSafety
public class SemaphoreDemo4 {
    private static final int threadNum = 50;

    private static final int currentThreadNum = 3;

    public static void main(String[] args) throws InterruptedException {
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        // Semaphore with 3 permits
        Semaphore semaphore = new Semaphore(currentThreadNum);

        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                try {
                    // Try to get a permit, waiting up to 3 seconds; tasks that time out are skipped
                    if (semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS)) {
                        try {
                            pringNum(num);
                        } finally {
                            // Release the permit even if pringNum throws
                            semaphore.release();
                        }
                    }
                } catch (Exception e) {
                    log.error("Execution failed: {}", e.getMessage());
                }
            });
        }

        executorService.shutdown();
    }

    public static void pringNum(int num) throws InterruptedException {
        log.info("num=【{}】", num);
        Thread.sleep(1000);
    }
}

Output:

22:01:36.804 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【0】
22:01:36.804 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【2】
22:01:36.804 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【1】
22:01:37.828 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【3】
22:01:37.828 [pool-1-thread-6] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【5】
22:01:37.828 [pool-1-thread-10] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【9】
22:01:38.843 [pool-1-thread-12] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【11】
22:01:38.843 [pool-1-thread-13] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【12】
22:01:38.843 [pool-1-thread-8] INFO com.yanxizhu.demo.concurrency.aqs.SemaphoreDemo4 - num=【7】

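3. CyclicBarrier

It lets a group of threads wait for one another until they all reach a common barrier point; only when every thread is ready do they continue. Like CountDownLatch, it is implemented with a counter. After the waiting threads are released the barrier can be used again, which is why it is called a cyclic barrier. Like CountDownLatch, it suits jobs where a computation is split across threads and the partial results are combined at the end.

Example 1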

package com.yanxizhu.demo.concurrency.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @description: CyclicBarrier: makes threads wait for one another
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 22:15
 * @version: 1.0
 */
@Slf4j
public class CyclicBarrierDemo {

    // Four threads must wait for one another; the code after await() runs only once all are ready
    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

    public static void main(String[] args) throws Exception{
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            // Sleep 1 second to make the effect easier to observe
            Thread.sleep(1000);
            executorService.execute(() -> {
                try {
                    pringWait(threadNum);
                } catch (Exception e) {
                    log.error("Execution failed: {}", e.getMessage());
                }
            });
        }
        executorService.shutdown();
    }

    public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException {
        // Sleep 1 second to make the effect easier to observe
        Thread.sleep(1000);
        // Logs "waiting... N"
        log.info("等待中。。。{}", threadNum);

        // Signal that this thread is ready and wait for the others.
        // await also has an overload taking a timeout: await(long timeout, TimeUnit unit)
        cyclicBarrier.await();

        // Runs only after all parties are ready; logs "started... N"
        log.info("开始执行了。。{}",threadNum);
    }
}

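Output (tasks 8 and 9 stay blocked at the barrier because only two of the four required parties ever arrive, so the program does not exit on its own):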

22:26:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。0
22:26:05.853 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。1
22:26:06.866 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。2
22:26:07.881 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。3
22:26:07.881 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。3
22:26:07.882 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。1
22:26:07.882 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。2
22:26:07.881 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。0
22:26:08.896 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。4
22:26:09.899 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。5
22:26:10.901 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。6
22:26:11.902 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。7
22:26:11.902 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。7
22:26:11.902 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。4
22:26:11.902 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。6
22:26:11.902 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 开始执行了。。5
22:26:12.903 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。8
22:26:13.916 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo - 等待中。。。9

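Example 2

With a timeout. Note that the await call has to be wrapped in try/catch so the rest of the method still runs. Once one thread times out the barrier is broken: the other waiting threads and every later await() call fail immediately with BrokenBarrierException, which is why every task in the output below logs the warning.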

package com.yanxizhu.demo.concurrency.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

/**
 * @description: CyclicBarrier: makes threads wait for one another, with a timeout; catch the exception so the following code can still run
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 22:15
 * @version: 1.0
 */
@Slf4j
public class CyclicBarrierDemo2 {

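    // Four threads must wait for one another; the code after await() runs only once all are ready
    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4);

    public static void main(String[] args) throws Exception{
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            // Sleep 1 second to make the effect easier to observe
            Thread.sleep(1000);
            executorService.execute(() -> {
                try {
                    pringWait(threadNum);
                } catch (Exception e) {
                    log.error("Execution failed: {}", e.getMessage());
                }
            });
        }
        executorService.shutdown();
    }

    public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException {
        // Sleep 1 second to make the effect easier to observe
        Thread.sleep(1000);
        // Logs "waiting... N"
        log.info("等待中。。。{}", threadNum);

        // Signal that this thread is ready, waiting at most 2 seconds for the others
        try {
            cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e){
            // Logs "timed out; note: catch the exception so execution can continue".
            // The message has no {} placeholder, so e.getMessage() is not printed.
            log.warn("超时了,注意:捕获异常,才能继续执行", e.getMessage());
        }

        // Logs "started... N"; because the exception is caught, this also runs after a timeout or a broken barrier
        log.info("开始执行了。。{}",threadNum);
    }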
}

Output:

22:37:57.786 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。0
22:37:58.797 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。1
22:37:59.797 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行
22:37:59.797 [pool-1-thread-2] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行
22:37:59.797 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。0
22:37:59.798 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。1
22:37:59.812 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。2
22:37:59.812 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行
22:37:59.812 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。2
22:38:00.813 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。3
22:38:00.813 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行
22:38:00.813 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。3
22:38:01.828 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。4
22:38:01.828 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行
22:38:01.828 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。4
22:38:02.840 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。5
22:38:02.840 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行
22:38:02.840 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。5
22:38:03.842 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。6
22:38:03.842 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行
22:38:03.842 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。6
22:38:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。7
22:38:04.855 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行
22:38:04.855 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。7
22:38:05.870 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。8
22:38:05.870 [pool-1-thread-3] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行
22:38:05.870 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。8
22:38:06.870 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 等待中。。。9
22:38:06.870 [pool-1-thread-1] WARN com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 超时了,注意:捕获异常,才能继续执行
22:38:06.870 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo2 - 开始执行了。。9
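
The log above shows every task hitting the catch block once the first wait has timed out. A minimal sketch of that "broken barrier" state, and of restoring the barrier with reset(), could look like this; the class name BrokenBarrierSketch, the helper awaitOutcome and the 100 ms timeout are assumptions for illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical example class, not part of the original demos
class BrokenBarrierSketch {

    // Calls the timed await and reports which outcome occurred
    static String awaitOutcome(CyclicBarrier barrier, long timeoutMillis) {
        try {
            barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return "passed";
        } catch (TimeoutException e) {
            return "timeout";      // this thread's own wait ran out of time
        } catch (BrokenBarrierException e) {
            return "broken";       // the barrier was already broken by someone else
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Two parties are required, but only the main thread ever arrives
        CyclicBarrier barrier = new CyclicBarrier(2);

        System.out.println(awaitOutcome(barrier, 100)); // prints timeout
        System.out.println(barrier.isBroken());         // prints true
        System.out.println(awaitOutcome(barrier, 100)); // prints broken (fails immediately)

        barrier.reset();                                // start a fresh generation
        System.out.println(barrier.isBroken());         // prints false
    }
}
```

Only the thread whose wait expires sees TimeoutException; everyone else sees BrokenBarrierException until reset() is called.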

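Example 3

The constructor also accepts a Runnable barrier action. When all parties have arrived, it runs first (in the last thread to arrive), before the waiting threads are released.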

package com.yanxizhu.demo.concurrency.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @description: CyclicBarrier: makes threads wait for one another; the constructor accepts a Runnable barrier action that runs first
 * @author: <a href="mailto:batis@foxmail.com">清风</a>
 * @date: 2022/4/25 22:15
 * @version: 1.0
 */
@Slf4j
public class CyclicBarrierDemo3 {

    // Four threads must wait for one another; the barrier action runs once all four have arrived
    private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> {
        // Logs "all waiting threads are ready; this part runs first"
        log.info("相互等待线程都就绪后,这部分代码先执行");
    });

    public static void main(String[] args) throws Exception{
        // Thread pool
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            // Sleep 1 second to make the effect easier to observe
            Thread.sleep(1000);
            executorService.execute(() -> {
                try {
                    pringWait(threadNum);
                } catch (Exception e) {
                    log.error("Execution failed: {}", e.getMessage());
                }
            });
        }
        executorService.shutdown();
    }

    public static void pringWait(int threadNum) throws InterruptedException, BrokenBarrierException {
        // Sleep 1 second to make the effect easier to observe
        Thread.sleep(1000);
        // Logs "waiting... N"
        log.info("等待中。。。{}", threadNum);

        // Signal that this thread is ready and wait for the others.
        // await also has an overload taking a timeout: await(long timeout, TimeUnit unit)
        cyclicBarrier.await();

        // Runs only after all parties are ready; logs "started... N"
        log.info("开始执行了。。{}",threadNum);
    }
}

Output:

22:42:09.313 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。0
22:42:10.313 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。1
22:42:11.318 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。2
22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。3
22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 相互等待线程都就绪后,这部分代码先执行
22:42:12.320 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。3
22:42:12.320 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。2
22:42:12.320 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。1
22:42:12.320 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。0
22:42:13.325 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。4
22:42:14.339 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。5
22:42:15.353 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。6
22:42:16.355 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。7
22:42:16.355 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 相互等待线程都就绪后,这部分代码先执行
22:42:16.355 [pool-1-thread-1] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。7
22:42:16.355 [pool-1-thread-5] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。4
22:42:16.355 [pool-1-thread-3] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。5
22:42:16.355 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 开始执行了。。6
22:42:17.358 [pool-1-thread-4] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。8
22:42:18.372 [pool-1-thread-2] INFO com.yanxizhu.demo.concurrency.aqs.CyclicBarrierDemo3 - 等待中。。。9

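Summary: CountDownLatch and CyclicBarrier differ, so compare them before choosing one. A CountDownLatch is one-shot and lets threads wait for other threads to finish, while a CyclicBarrier is reusable and makes a group of threads wait for each other.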
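
The reuse difference can be seen in a small sketch: the same CyclicBarrier trips once per round, while a CountDownLatch stays at zero once it gets there. The class name LatchVsBarrierSketch and its two helper methods are assumptions for illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class, not part of the original demos
class LatchVsBarrierSketch {

    // Two threads meet at the same barrier `rounds` times; returns how often it tripped
    static int barrierTrips(int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, so it counts completed rounds
        CyclicBarrier barrier = new CyclicBarrier(2, trips::incrementAndGet);
        Runnable party = () -> {
            try {
                for (int i = 0; i < rounds; i++) {
                    barrier.await(); // the same barrier is reused every round
                }
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread a = new Thread(party);
        Thread b = new Thread(party);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return trips.get();
    }

    // A latch only counts down: extra countDown() calls are ignored and it cannot be reset
    static long latchCountAfterExtraCountDown() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown(); // no effect, the count stays at zero
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println("barrier trips: " + barrierTrips(3));                // prints barrier trips: 3
        System.out.println("latch count: " + latchCountAfterExtraCountDown()); // prints latch count: 0
    }
}
```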
