Full-Stack-Notes/notes/《实战Java高并发程序设计》读书笔记.md
2019-06-10 16:28:43 +08:00

80 KiB
Raw Blame History

《实战Java高并发程序设计》读书笔记

目录

第二章 Java并行程序基础
    2.1 线程的基本操作
        2.1.1 线程中断
        2.1.2 等待wait和通知notify
        2.1.3 等待线程结束join和谦让yield
    2.2 volatile与Java内存模型JMM
    2.3 线程组
    2.4 守护线程Daemon
    2.5 线程安全的概念和synchronized
第三章 JDK 并发包
    3.1 同步控制
        3.1.1 可重入锁(ReentrantLock)
            1.锁申请等待超时
            2.公平锁
        3.1.2 条件condition
        3.1.3 信号量Semaphore
        3.1.4 读写锁ReadWriteLock
        3.1.5 倒计时CountDown
        3.1.6 循环栅栏CyclicBarrier
        3.1.7 线程阻塞工具类LockSupport
    3.2 线程池
        3.2.1 JDK对线程池的支持
            1.计划任务
        3.2.2 线程池内部实现
            1.JDK内置的拒绝策略
            2.线程池的扩展
            3.合理优化线程池的数量
        3.2.3 Fork/Join 框架
    3.3 并发容器
        3.3.1 并发集合简介java.util.concurrent
第四章 锁的优化及注意事项
    4.2 Java虚拟机对锁优化锁做的努力
        1. 锁偏向
        2. 轻量级锁
        3. 自旋锁
        4. 锁消除
    4.3 人手一只笔ThreadLocal
    4.4 无锁
第五章 并行模式与算法
    5.5 JDK中的Future
第六章 JAVA 8 并发
    6.5 增强的Future(CompletableFuture)
    6.6 StampedLock
    6.7 LongAdder 和 LongAccumulator

正文

第二章 Java并行程序基础

2.1 线程的基本操作

2.1.1 线程中断

  • Thread.interrupt() 是一个实例方法,它通知目标线程被中断,也就是设置中断标志位;
  • Thread.isInterrupted()也是实例方法,判断当前线程是否有被中断(通过检查中断标志位);
  • Thread.interrupted()是静态方法:用来判断当前线程的中断状态,但同时会清除当前线程的中断标志位状态。
// Thread 类
public void interrupt()         	// 设置中断标志位
public boolean isInterrupted()    	// 判断是否被中断
public static boolean interrupted() // 判断是否被中断 并清除标志位  
//只是设置标志位,并不能停止线程
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
                while (true) {
                    System.out.println("子线程打印");
              }
        });
        thread.start();
        Thread.sleep(10);
        thread.interrupt();
    }
}

// 能停止线程
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    System.out.println("子线程打印");
              }
        });
        thread.start();
        Thread.sleep(10);
        thread.interrupt();
    }
}

// 不能停止线程
public class Test {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (!Thread.interrupted()||!Thread.currentThread().isInterrupted()) {
                    System.out.println("子线程打印");
            }
        });
        thread.start();
        Thread.sleep(10);
        thread.interrupt();
    }
}

2.1.2 等待wait和通知notify

这两个方法在Object 下,意味着任何对象都可以调用这两个方法。

如果是一个线程调用了wait(),那么它就会进入object对象的等待队列。notify() 随机唤醒(并非先等待先唤醒,不是公平的唤醒) 等待队列中的一个队列,notifyAll() 唤醒全部等待。

// 正常情况
public class Test {
    private static final Object object = new Object();
    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (object) {
                System.out.println("线程1");
            }
        }).start();
        new Thread(() -> {
            synchronized (object) {
                System.out.println("线程2");
            }
        }).start();
    }
}
//输出:
	 线程1
	 线程2
	
	
// 锁住对象不释放
public class Test {
    private static final Object object = new Object();
    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (object) {
                while (true) {
                    try {
                        Thread.sleep(1000);
                        System.out.println("线程1");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        new Thread(() -> {
            synchronized (object) {
                System.out.println("线程2");
            }
        }).start();
    }
}

//输出:
    线程1
    线程1
    线程1
    线程1
    线程1
    线程1
    线程1
    线程1
    线程1
    线程1
    线程1
    线程1
    线程1
    线程1

// 等待与唤醒
public class Test {
    private static final Object object = new Object();
    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (object) {
                try {
                    System.out.println("对象object等待");
                    object.wait();
                    System.out.println("线程1后续操作");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(() -> {
            synchronized (object) {
                System.out.println("对象object唤醒");
                object.notify();
                System.out.println("线程2后续操作");
            }
        }).start();
    }
}

// 输出
对象object等待
对象object唤醒
线程2后续操作
线程1后续操作


// 全部唤醒
public class Test {
    private static final Object object = new Object();
    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (object) {
                try {
                    System.out.println("对象object在线程1等待");
                    object.wait();
                    System.out.println("线程1后续操作");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(() -> {
            synchronized (object) {
                try {
                    System.out.println("对象object在线程2等待");
                    object.wait();
                    System.out.println("线程2后续操作");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(() -> {
            synchronized (object) {
                System.out.println("对象object唤醒");
                object.notifyAll();    
                //注: 如果是object.notify则是随机唤醒任意一个等待
                System.out.println("线程3后续操作");
            }
        }).start();
    }
}

// 输出
对象object在线程1等待
对象object在线程2等待
对象object唤醒
线程3后续操作
线程2后续操作
线程1后续操作

1wait() 与 notify/notifyAll 方法必须在同步代码块中使用

wait() 与 notify/notifyAll() 是Object类的方法在执行两个方法时要先获得锁。因此wait() 与 notify/notifyAll() 经常与synchronized搭配使用即在synchronized修饰的同步代码块或方法里面调用wait() 与 notify/notifyAll()方法。

2wait() 与 notify/notifyAll() 的执行过程

由于 wait() 与 notify/notifyAll() 是放在同步代码块中的,因此线程在执行它们时,肯定是进入了临界区中的,即该线程肯定是获得了锁的。

当线程执行wait()时会把当前的锁释放然后让出CPU进入等待状态。

当执行notify/notifyAll方法时会唤醒一个处于等待该 对象锁 的线程然后继续往下执行直到执行完退出对象锁锁住的区域synchronized修饰的代码块后再释放锁。

从这里可以看出notify/notifyAll()执行后并不立即释放锁而是要等到执行完临界区中代码后再释放。故在实际编程中我们应该尽量在线程调用notify/notifyAll()后立即退出临界区。即不要在notify/notifyAll()后面再写一些耗时的代码。

public class Test {

    //通过wait() 和 notify() 实现一个阻塞的队列
    static class BlockArray<E> {

        private final Object object;

        BlockArray(){
            this.object=new Object();
        }

        private ArrayList<E> arrayList = new ArrayList<>();

        E take() throws InterruptedException {
            if (arrayList.size()>0 && arrayList.get(0)!= null){
                return pop();
            }else {
                // 如果不使用同步代码块 则会报 illegalmonitorstateexception 异常
                synchronized (object){
                    object.wait();
                    }
                return pop();
                }
        }

        private E pop() {
            E e = arrayList.get(0);
            arrayList.remove(0);
            return e;
        }

        void put(E e) {
            arrayList.add(arrayList.size(), e);
            synchronized (object){
                object.notify();
            }
        }

    }

    public static void main(String[] args) {
        BlockArray<Double> blockArray = new BlockArray<>();
        new Thread(() -> {
            while (true) {
                Double take;
                try {
                    take = blockArray.take();
                    System.out.println("读取线程:获得数据:"+take);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
        pool.scheduleAtFixedRate(() -> {
            double random = Math.random();
            System.out.println("写入线程:写入数据:"+random);
            blockArray.put(random);
        },0,3,TimeUnit.SECONDS);
    }
}

结果
写入线程写入数据0.19934473742252068
读取线程获得数据0.19934473742252068
写入线程写入数据0.06838214167831369
读取线程获得数据0.06838214167831369
写入线程写入数据0.5146915352784022
读取线程获得数据0.5146915352784022

2.1.3 等待线程结束join和谦让yield

public static native void yield();

yield() 会使得当前线程让出CPU,但是在让出后还会进行CPU资源的争夺这意味这有可能会再次获得CPU的执行权。

join()方法的主要作用就是同步,它可以使得线程之间的并行执行变为串行执行。

public class Test {
    private static int j=0;
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                j++;
            }
        });
        thread.start();
        System.out.println(j);
    }
}
//不等待线程结束,结果 0


public class Test {
    private static int j=0;
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                j++;
            }
        });
        thread.start();
        thread.join();
        System.out.println(j);
    }
}
//等待线程结束,结果 100000

2.2 volatile与Java内存模型JMM

一旦一个 共享变量(类的成员变量、类的静态成员变量)被 volatile 修饰之后,那么就具备了两层语义:

1. 保证了不同线程对这个变量进行读取时的可见性,即一个线程修改了某个变量的值 ,这新值对其他线程来说是立即可见的 。(volatile 解决了线程间共享变量 的可见性问题)

  • 使用 volatile 关键字会强制将修改的值立即写入主存;
  • 使用 volatile 关键字的话,当线程 2 进行修改时,会导致线程 1 的工作内存中缓存变量 (假设变量名是 x ) 的缓存行无效(反映到硬件层的话,就是 CPU 的 L1或者 L2 缓存中对应的缓存行无效)
  • 由于线程 1 的工作内存中缓存变量 x 的缓存行无效,所以线程 1再次读取变量 x 的值时 会去主存读取。那么,在线程 2 修改 x 值时(当然这里包括 2 个操作,修改线程 2 工作内存中的值,然后将修改后的值写入内存),会使得线程 1 的工作内存中缓存变量 x 的缓存行无效,然后线程 1 读取时,发现自己的缓存行无效,它会等待缓存行对应的主存地址被更新之后,然后去对应的主存读取最新的值。那么线程 1 读取到的就是最新的正确的值。

2.禁止进行指令重排序 ,阻止编译器对代码的优化 。

volatile 关键字禁止指令重排序有两层意思:

  • 当程序执行到 volatile 变量的读操作或者写操作时,在其前面的操作的更改肯定全部已经进行,且 结果已经对后面的操作可见;在其后面的操作肯定还没有进行。

  • 在进行指令优化时,不能把 volatile 变量前面的语句放在其后面执行,也不能把 volatile 变量后面的语句放到其前面执行。

    为了实现 volatile 的内存语义,加入 volatile 关键字时,编译器在生成字节码时,会在指令序列中插入内存屏障,会多出一个 lock 前缀指令。内存屏障是一组处理器指令,解决禁止指令重排序和内存可见性的问题。编译器和 CPU 可以在保证输出结果一样的情况下对指令重排序,使性能得到优化。处理器在进行重排序时是会考虑指令之间的数据依赖性。

    内存屏障有两个作用:

  • 先于这个内存屏障的指令必须先执行,后于这个内存屏障的指令必须后执行

  • 使得内存可见性。所以, 如果你的字段是 volatile ,在读指令前插入读屏障,可以让高速缓存中的数据失效,重新从主内存加载数据。在写指令之后插入写屏障,能让写入缓存的最新数据写回到主内存。

使用场景synchronized关键字是防止多个线程同时执行一段代码那么就会很影响程序执行效率而volatile关键字在某些情况下性能要优于synchronized但是要注意volatile关键字是无法替代synchronized关键字的因为volatile关键字无法保证操作的原子性。通常来说使用volatile必须具备以下2个条件

  •  对变量的写操作不依赖于当前值
  •  该变量没有包含在具有其他变量的不变式中

有两种典型的使用场景,一是用于修饰多个线程之间的共享标志位变量,二是用于单例模式中饿汉模式:

public class LazySingleton {

    // 必须要声明为 volatile 防止指令重排序
    private static volatile LazySingleton lazySingleton = null;

    private LazySingleton() {
        if (lazySingleton != null) {
            throw new RuntimeException("单例模式禁止反射调用!");
        }
    }

    public static LazySingleton getInstance() {
        if (lazySingleton == null) {
            synchronized (LazySingleton.class) {
                if (lazySingleton != null) {
                    /*
                     new对象过程
                      1.分配内存给这个对象
                      2.初始化对象
                      3.设置lazyDoubleCheckSingleton 指向刚分配的内存地址
                      上述三步会发生指令重排序
                     */
                    lazySingleton = new LazySingleton();
                }
            }
        }
        return lazySingleton;
    }
}

2.3 线程组

public class Test {

    static class Task implements Runnable{
        @Override
        public void run() {
            Thread current = Thread.currentThread();
            System.out.println("当前线程id: "+ current.getId()+" 当前所属线程组: "+ current.getThreadGroup().getName());
        }
    }

    public static void main(String[] args) {
        ThreadGroup group=new ThreadGroup("java线程组");
        Thread thread1 = new Thread(group, new Task());
        Thread thread2 = new Thread(group, new Task());
        thread1.start();
        thread2.start();
    }
}

//输出
当前线程id: 13 当前所属线程组: java线程组
当前线程id: 14 当前所属线程组: java线程组

2.4 守护线程Daemon

当一个Java应用中只有守护线程的时候Java虚拟机就会自动退出。

public class Test {

    static class Task implements Runnable{
        @Override
        public void run() {
            Thread current = Thread.currentThread();
            System.out.println("当前线程id: "+ current.getId()+"当前所属线程组: "+ current.getThreadGroup().getName());
        }
    }

    public static void main(String[] args) {
       ThreadGroup group=new ThreadGroup("java线程组");
        Thread thread1 = new Thread(group, new Task());
        Thread thread2 = new Thread(group, new Task());
        Thread thread3 = new Thread(group, new Task());
        thread1.setDaemon(true);
        thread2.setDaemon(true);
        thread3.setDaemon(true);
        thread1.start();  // 不会执行
        thread2.start();  // 不会执行
        thread3.start();  // 不会执行
    }
}

2.5 线程安全的概念和synchronized

1. volatile 和 synchronized 的区别

  • volatile 是变量修饰符,而 synchronized 则作用于代码块或方法。
  • volatile 不会对变量加锁不会造成线程的阻塞synchronized 会对变量加锁,可能会造成线程的阻塞。
  • volatile 仅能实现变量的修改可见性并不能保证原子性而synchronized 则 可 以 保 证 变 量 的 修 改 可 见 性 和 原 子 性 。synchronized 有两个重要含义:它确保了一次只有一个线程可以执行代码的受保护部分(互斥),而且它确保了一个线程更改的数据对于其它线程是可见的(更改的可见性),在释放锁之前会将对变量的修改刷新到主存中)。
  • volatile 标记的变量不会被编译器优化禁止指令重排序synchronized 标记的变量可以被编译器优化。
// 线程不安全
public class Test {

    private static int i=0;
    
    public static void main(String[] args) throws InterruptedException {
        Thread thread1= new Thread(new IncreaseTask());
        Thread thread2= new Thread(new IncreaseTask());
        thread1.start();
        thread2.start();
        //等待结束后 才打印返回值
        thread1.join();
        thread2.join();
        //并打印返回值
        System.out.println(i);
    }

    static class IncreaseTask implements Runnable{
        @Override
        public void run() {
            for (int j = 0; j < 100000; j++) {
                inc();
            }
        }
        private void inc() {
            i++;
        }
    }
}

// 虽然用了synchronized线程还是不安全
public class Test {

    private static int i=0;

    public static void main(String[] args) throws InterruptedException {
        Thread thread1= new Thread(new IncreaseTask());
        Thread thread2= new Thread(new IncreaseTask());
        thread1.start();
        thread2.start();
        //等待结束后 才打印返回值
        thread1.join();
        thread2.join();
        //并打印返回值
        System.out.println(i);
    }

    static class IncreaseTask implements Runnable{
        @Override
        public void run() {
            for (int j = 0; j < 100000; j++) {
                inc();
            }
        }
        private synchronized void inc() {   //synchronized 两个线程启动的不是同一个IncreaseTask实例
            i++;						    // inc() 是实例方法 所以两个线程锁住的是两个方法对象											
        }
    }
}
// 线程安全解决办法一 :两个线程启动同一个实例
public class Test {

    private static int i=0;

    public static void main(String[] args) throws InterruptedException {
        IncreaseTask task = new IncreaseTask();   
        Thread thread1= new Thread(task);     // 两个线程启动的是同一个实例
        Thread thread2= new Thread(task);
        thread1.start();
        thread2.start();
        //等待结束后 才打印返回值
        thread1.join();
        thread2.join();
        //并打印返回值
        System.out.println(i);
    }
    static class IncreaseTask implements Runnable{
        @Override
        public void run() {
            for (int j = 0; j < 100000; j++) {
                inc();
            }
        }
        private synchronized void inc() {     // 由同一个实例对象产生的实例方法
            i++;
        }
    }
}


// 线程安全解决办法二: 将方法声明为static 类方法
public class Test {

    private static int i=0;

    public static void main(String[] args) throws InterruptedException {
        Thread thread1= new Thread(new IncreaseTask());
        Thread thread2= new Thread(new IncreaseTask());
        thread1.start();
        thread2.start();
        //等待结束后 才打印返回值
        thread1.join();
        thread2.join();
        //并打印返回值
        System.out.println(i);
    }

    static class IncreaseTask implements Runnable{
        @Override
        public void run() {
            for (int j = 0; j < 100000; j++) {
                inc();
            }
        }
        private static synchronized void inc() {  //这是一个类方法
            i++;
        }
    }
}

// 线程安全实现方法三   利用 synchronized 锁住 同一个对象
public class Test {

    private static final String s="";

    private static int i=0;

    static class IncreaseTask implements Runnable{
        @Override
        public void run() {
            for (int j = 0; j < 100000; j++) {
                synchronized (s){
                    i++;
                }
            }
        }
        
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread1= new Thread(new IncreaseTask());
        Thread thread2= new Thread(new IncreaseTask());
        thread1.start();
        thread2.start();
        //等待结束后 才打印返回值
        thread1.join();
        thread2.join();
        //并打印返回值
        System.out.println(i);
    }
}

第三章 JDK 并发包

3.1 同步控制

3.1.1 可重入锁(ReentrantLock)

  • lock():获得锁,如果锁已经被占用,则等待;
  • lockInterruptibly(): 获得锁,但优先响应中断;
  • tryLock():尝试获得锁如果成功返回true,失败返回false。该方法不等待立即返回
  • tryLock(long time,TimeUnit unit):在给定的时间内尝试获得锁;
  • unlock():释放锁
// 线程安全的
public class Test {

    public static ReentrantLock reentrantLock=new ReentrantLock();
    private static Integer i=0;
    static class IncreaseTask implements Runnable{
        @Override
        public void run() {
            for (int j = 0; j < 100000; j++) {
                   try {
                       reentrantLock.lock();
                       i++;
                   }catch (Exception e){
                       e.printStackTrace();
                   }finally {
                       reentrantLock.unlock();
                   }

            }
        }
        
    public static void main(String[] args) throws InterruptedException {
        Thread thread1= new Thread(new IncreaseTask());
        Thread thread2= new Thread(new IncreaseTask());
        thread1.start();
        thread2.start();
        //等待结束后 才打印返回值
        thread1.join();
        thread2.join();
        //并打印返回值
        System.out.println(i);
    }
}
    
 // 可重入性
  static class IncreaseTask implements Runnable{
        @Override
        public void run() {
            for (int j = 0; j < 100000; j++) {
                   try {
                       reentrantLock.lock();
                       reentrantLock.lock();
                       reentrantLock.lock();
                       i++;
                   }catch (Exception e){
                       e.printStackTrace();
                   }finally {
                       reentrantLock.unlock();
                       reentrantLock.unlock();
                       reentrantLock.unlock();
                   }

            }
        }

    }  
1.锁申请等待超时

使用 reentrantLock.tryLock 方法,超过指定时间则不等待。

public class Test {

    public static ReentrantLock reentrantLock=new ReentrantLock();

    private static Integer i=0;

    static class IncreaseTask implements Runnable {
        @Override
        public void run() {

            try {
               if ( reentrantLock.tryLock(5, TimeUnit.SECONDS)){
                   Thread.sleep(6000);
               }else {
                   System.out.println("获得锁失败");
               }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                reentrantLock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread1= new Thread(new IncreaseTask());
        Thread thread2= new Thread(new IncreaseTask());
        thread1.start();
        thread2.start();
        //等待结束后 才打印返回值
        thread1.join();
        thread2.join();
        //并打印返回值
        System.out.println(i);
    }
}
2.公平锁
new ReentrantLock(true); //创建锁的时候指定参数

public class Test {

    public static ReentrantLock fairLock = new ReentrantLock(true);


    static class IncreaseTask implements Runnable {
        @Override
        public void run() {
            while (true) {
                fairLock.lock();
                System.out.println(Thread.currentThread().getName() + "获得锁");
                fairLock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        Thread thread1 = new Thread(new IncreaseTask());
        Thread thread2 = new Thread(new IncreaseTask());
        thread1.start();
        thread2.start();
    }
}

公平锁需要维护一个有序队列需要额外的开销所以需要考虑场景使用。使用synchronized 实现的锁默认是不公平的

3.1.2 条件condition

  • await() 方法会使当前线程等待,同时释放当前锁当其他线程中使用singnal()或者singnalAll()方法时,线程会重新获得锁并继续执行。或者当线程被中断时候也能跳出等待。这和Object.wait() 方法很相似。
  • awaitUninterruptibly() 方法与await()方法基本相同,但是它并不会在等待过程中响应中断。
  • singal() 方法用于唤醒一个在等待中的线程。相对的singalAll()方法会唤醒所有在等待中的线程。
public class Test {

    public static ReentrantLock lock = new ReentrantLock();
    public static Condition condition = lock.newCondition();

    static class IncreaseTask implements Runnable {
        @Override
        public void run() {
            try {
                lock.lock();
                condition.await();
                System.out.println(Thread.currentThread().getName() + "获得锁");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(new IncreaseTask());
        thread1.start();
        Thread.sleep(2000);
        lock.lock();
        condition.signal();
        lock.unlock();
    }
}

流程在调用await()方法前线程必须获得重入锁调用await()方法后线程会释放当前占用的锁。同理在调用signal()方法时当前线程也必须获得相应重入锁调用signal()方法后系统会从condition.await()等待队列中唤醒一个线程。当线程被唤醒后,它就会尝试重新获得与之绑定的重入锁,一旦获取成功将继续执行。所以调用signal()方法后一定要释放当前占用的锁,这样被唤醒的线程才能有获得锁的机会,才能继续执行。

3.1.3 信号量Semaphore

广义上说信号量是对锁的扩展无论是内部锁synchronized 还是 重入锁 ReentrantLock,一次都只允许一个线程访问一个资源,而信号量却允许指定多个线程,同时访问某一个资源。

  • acquire()方法尝试获得一个准入的许可,若无法获得,则线程会等待,直到有线程释放一个许可或者当前线程被中断;
  • acquireUninterruptibly()方法和acquire()类似,但是不响应中断;
  • tryAcquire() 尝试获得一个许可如果成功返回true,失败则返回false,它不会进行等待,立即返回;
  • release() 用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。
public class Test {

    public static Semaphore semaphore=new Semaphore(5);

    static class IncreaseTask implements Runnable {
        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getId()+"获得锁!");
                Thread.sleep(2000);
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        IncreaseTask task = new IncreaseTask();
        for (int i = 0; i < 20; i++) {
            new Thread(task).start();
        }
    }
}

// 五个一组,五个一组的输出
13获得锁!
14获得锁!
15获得锁!
16获得锁!
17获得锁!
    
18获得锁!
19获得锁!
20获得锁!
21获得锁!
22获得锁!
    
23获得锁!
24获得锁!
25获得锁!
26获得锁!
27获得锁!
    
28获得锁!
29获得锁!
30获得锁!
31获得锁!
32获得锁!

3.1.4 读写锁ReadWriteLock

读写锁访问约束情况

非阻塞 阻塞
阻塞 阻塞
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
public class Test {

    // 可重入锁
    public static ReentrantLock reentrantLock = new ReentrantLock();
    // 读写锁
    public static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    // 读锁
    public static ReentrantReadWriteLock.ReadLock readLock = readWriteLock.readLock();
    // 写锁
    public static ReentrantReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();

    //待写入或者带赋值的变量
    private static String i = "";

    //写方法
    static class Write implements Runnable {

        private Lock lock;
        private String value;

        Write(Lock lock, String value) {
            this.lock = lock;
            this.value = value;
        }

        @Override
        public void run() {
            try {
                lock.lock();
                Thread.sleep(1000);
                i = value;
                System.out.println(Thread.currentThread().getName() + "写入值" + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }

    //读方法
    static class Read implements Runnable {

        private Lock lock;

        Read(Lock lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            try {
                lock.lock();
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "读取到值" + i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }


    public static void main(String[] args) throws InterruptedException {

        // 耗时 2 秒 读锁是并行的
        for (int j = 0; j < 2; j++) {
            Thread thread = new Thread(new Write(writeLock, String.valueOf(j)));
            thread.start();
            thread.join();
        }
        for (int j = 0; j < 18; j++) {
            Thread thread = new Thread(new Read(readLock));
            thread.start();
        }


        // 耗时20秒
        for (int j = 0; j < 2; j++) {
            Thread thread = new Thread(new Write(reentrantLock, String.valueOf(j)));
            thread.start();
            thread.join();
        }
        for (int j = 0; j < 18; j++) {
            Thread thread = new Thread(new Read(reentrantLock));
            thread.start();
        }
    }
}

3.1.5 倒计时CountDown

public class Test {

    private static int number=1000000;
    private static CountDownLatch latch=new CountDownLatch(number);
    private static AtomicInteger integer=new AtomicInteger(0);

    static class IncreaseTask implements Runnable {
        @Override
        public void run() {
            integer.incrementAndGet();
            latch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        IncreaseTask task = new IncreaseTask();
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        for (int i = 0; i <number ; i++) {
            executorService.submit(task);
        }
        latch.await();
        System.out.println("integer"+integer);
        executorService.shutdown();
    }
}

3.1.6 循环栅栏CyclicBarrier

/* 
 * @param   需要等待的线程数
 * @param   指定数量的线程到达后执行的操作
 */
public CyclicBarrier(int parties, Runnable barrierAction)
// 每次有五个人完成 则算一个小组任务完成
public class Test {

    public static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
        @Override
        public void run() {
            System.out.println("五人小组任务执行完成");
        }
    });

    static class Task implements Runnable {

        @Override
        public void run() {
            try {
                long l = new Double(Math.random() * 5000).longValue();
                Thread.sleep(l);
                System.out.println("任务"+Thread.currentThread().getId()+"执行完成");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args){
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        for (int j = 0; j < 10; j++) {
            executorService.submit(new Task());
        }
        executorService.shutdown();
    }
}

// 输出
任务21执行完成
任务15执行完成
任务22执行完成
任务14执行完成
任务20执行完成
五人小组任务执行完成

任务16执行完成
任务19执行完成
任务17执行完成
任务18执行完成
任务13执行完成
五人小组任务执行完成

3.1.7 线程阻塞工具类LockSupport

LockSupport的静态方法park() 可以阻塞当前线程,类似的还有parkNanos() parkUntil()等方法。他们实现了一个限时的等待。

LockSupport类使用类似信号量的机制它为每一个线程准备一个许可如果许可可用那么park()函数就会立即返回并且消费这个许可也就是将许可变为不可用如果许可不可用就会阻塞。而unpark()则使得一个许可变为可用(但是和信号量不同的是,许可不能累加,你不可能拥有超过一个许可,它永远只有一个)。

public class Test {

    static class Task implements Runnable {

        @Override
        public void run() {
            long id = Thread.currentThread().getId();
            System.out.println("线程"+ id +"开始阻塞");
            LockSupport.park();
            System.out.println("线程"+ id +"解除阻塞");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread01 = new Thread(new Task());
        Thread thread02 = new Thread(new Task());
        thread01.start();
        thread02.start();
        Thread.sleep(3000);
        System.out.println("主线程干预");
        LockSupport.unpark(thread01);
        LockSupport.unpark(thread02);
    }
}

// 输出
线程13开始阻塞
线程14开始阻塞
主线程干预
线程13解除阻塞
线程14解除阻塞

3.2 线程池


3.2.1 JDK对线程池的支持

newFixedThreadPool()方法:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲的线程,则立即执行。若没有,则新的任务会被暂时存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。

newSingleThreadExecutor()方法 该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,带线程空闲,按照先入先出的顺序执行队列中的任务。

newCachedThreadPool()方法:根据实际情况动态调整线程数量。

newSingleThreadScheduledExecutor()方法该方法返回一个ScheduledExecutorService对象线程池大小为1。SeheduledExectorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能如在某个固定的延时之后执行或者周期性执行某个任务。

newScheduledThreadPool()方法该方法也返回一个ScheduledExecutorService对象但该线程池可以指定线程数量。

// 线程池举例
public class Test {

    static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "正在执行");
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.submit(new Task());
        }
    }

}
1.计划任务
// 在给定的时间,对任务进行一次调度
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);

// 以上一个任务开始执行时间为起点之后的period时间调度下一次任务如果任务耗时大于period则上一次任务结束后立即执行下一次任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,
                                                  TimeUnit unit);
// 以上一个任务开始执行时间为起点再经过delay时间调度下一次任务不论任务耗时如何上一次任务结束后都需要等待delay时间之后才可以执行下一次任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,long delay,
                                                  TimeUnit unit);



public class Test {

    public static long cacheTime = System.currentTimeMillis();

    static class Task implements Runnable {

        private String type;

        Task(String type) {
            this.type = type;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(5000);
                long nowTime = System.currentTimeMillis();
                long period = (nowTime - cacheTime);
                System.out.println(type + Thread.currentThread().getId() + "执行耗时" + period + "毫秒");
                cacheTime = nowTime;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
        // 只执行一次
        ScheduledFuture<?> schedule = pool.schedule(new Task("schedule"), 2, TimeUnit.SECONDS);
        // 指定2秒为固定周期执行如果项目执行耗时5秒则项目结束后立马执行下一次任务所以输出的时间间隔为5秒
        pool.scheduleAtFixedRate(new Task("FixedRate"), 0, 2, TimeUnit.SECONDS);
        // 总是在上一次项目结束后间隔指定周期执行所以项目耗时5秒还需要间隔2秒执行所以输出的时间间隔为7秒
        pool.scheduleWithFixedDelay(new Task("WithFixedDelay"),0,2,TimeUnit.SECONDS);
    }

}

3.2.2 线程池内部实现

public ThreadPoolExecutor(int corePoolSize,                          //核心线程数量
                              int maximumPoolSize,                   //最大线程数量   
                              long keepAliveTime,                    //超过核心线程数量的线程的存活时间
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,     //任务队列
                              ThreadFactory threadFactory,           //线程工厂
                              RejectedExecutionHandler handler)      //拒绝策略
1.JDK内置的拒绝策略

AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作;

CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用线程中运行当前被丢弃的任务。这种情况下,提交任务的线程性能可能会急剧下降。

DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

DiscardPolicy策略:默认丢弃无法处理的任务,不予任何处理。

public class Test {

    private  static int i=0;
    private static CountDownLatch latch=new CountDownLatch(1000);
    static class Task implements Runnable {

        @Override
        public void run() {
            increase();
        }

        private void increase(){
            synchronized (this){
                i++;
            }
            System.out.println(Thread.currentThread().getName()+"输出:"+i);
            latch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        // 自定义线程
        ExecutorService executorService = new ThreadPoolExecutor(10, 20,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                r -> {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    System.out.println("create" + thread.getName());
                    return thread;
                });

        Task task = new Task();
        for (int i = 0; i < 1000; i++) {
            executorService.submit(task);
        }
        latch.await();
        System.out.println("最后的结果是"+i);
        executorService.shutdown();
    }
}
2.线程池的扩展

ThreadPoolExecutor是一个可以拓展的线程池它提供了beforeExecute()、afterExecute()和terminated() 三个接口对线程池进行扩展。

public class Test {

    private static int i = 0;

    private static CountDownLatch latch = new CountDownLatch(1000);

    static class Task implements Runnable {

        @Override
        public void run() {
            increase();
        }

        private void increase() {
            synchronized (this) {
                i++;
            }
            System.out.println(Thread.currentThread().getName() + "输出:" + i);
            latch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        // 自定义线程
        ExecutorService executorService = new ThreadPoolExecutor(10, 20,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>()) {

            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("线程" + t.getName() + "准备执行");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("线程" + r + "执行结束");
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

        Task task = new Task();
        for (int i = 0; i < 1000; i++) {
            executorService.submit(task);
        }
        latch.await();
        System.out.println("最后的结果是" + i);
        executorService.shutdown();
    }
}
3.合理优化线程池的数量

3.2.3 Fork/Join 框架

ForkJoin主要提供了两个主要的执行任务的接口。RecurisiveAction与RecurisiveTask

  • RecurisiveAction :没有返回值的接口;
  • RecurisiveTask :带有返回值的接口。

ForkJoinPool提供如下两个常用的构造器

  • ForkJoinPool(int parallelism) 创建一个包含parallelism个并行线程的ForkJoinPool;

  • ForkJoinPool() 以Runtime.availableProcessors()方法的返回值作为parallelism参数来创建ForkJoinPool。

public class CountTask extends RecursiveTask<Long> {

    private Long start;

    private Long end;

    private static long hold=50L;

    CountTask(Long start,Long end){
        this.start=start;
        this.end=end;
    }

    @Override
    protected Long compute() {
        long sum=0L;
        // 一定要保证能够进进入if中的终止条件如果无限制的拆分,可能会导致栈溢出
        if (end-start<=hold){
            // 假设一个最小计算单元都是耗时的
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for (long i = start; i < end; i++) {
                sum+=i;
            }
        }else {
            List<CountTask> countTasks=new ArrayList<>();
            long l = (end - start) / hold;
            for (int i = 0; i <l; i++) {
                CountTask task = new CountTask(start + i * hold, start + (i + 1) * hold);
                countTasks.add(task);
                task.fork();
                if (i==l-1){
                    CountTask countTask = new CountTask(start + (i + 1) * hold, end);
                    countTasks.add(countTask);
                    countTask.fork();
                }
            }
            for (CountTask countTask:countTasks){
                sum+=countTask.join();
            }
        }
        return sum;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool=new ForkJoinPool(100);
        CountTask task = new CountTask(0L, 10000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        Long aLong = result.get();
        System.out.println("结果为"+aLong);
    }
}

3.3 并发容器

3.3.1 并发集合简介java.util.concurrent

  • ConcurrentHashMap:线程安全的HashMap;
  • CopyOnWriteArrayList:在读多写少的场合这个List的性能非常好远远好于Vector。CopyOnWriteArrayList读取是完全不用加锁的写入也不会阻塞读取操作只有写入与写入之间需要加锁。
  • ConcurrentLinkedQueue:高效的并发队列使用链表实现。可以看作一个线程安全的LinkList算是在高并发环境下性能最好的队列;
  • BlockingQuene:这是一个接口JDK内部通过链表LinkedBlockingQueue数组ArrayBlockingQueue等方式实现了这个接口。表示阻塞队列非常适合用于作为数据共享的通道
  • ConcurrentSkipListMap:跳表的实现。这是一个Map,使用调表的数据结构进行快速查找。

第四章 锁的优化及注意事项

4.2 Java虚拟机对锁优化锁做的努力

作为一款公用平台JDK 本身也为并发程序的性能绞尽脑汁,在 JDK 内部也想尽一切办法提供并发时的系统吞吐量。

1. 锁偏向

锁偏向是一种针对加锁操作的优化手段。

如果一个线程获得了锁,那么锁就进入偏向模式。当这个线程再次请求锁时,无须再做任何同步操作。这样就节省了大量有关锁申请的操作,从而提高了程序性能。

因此对于几乎没有锁竞争的场合偏向锁有比较好的优化效果因为连续多次极有可能是同一个线程请求相同的锁。而对于锁竞争比较激烈的场合其效果不佳。因为在竞争激烈的场合最有可能的情况是每次都是不同的线程来请求相同的锁。使用Java虚拟机参数-XX:UseBiasedLocking 可以开启偏向锁。

2. 轻量级锁

如果偏向锁失败,即上一个请求的锁的线程和这个线程不是同一个。偏向锁失败意味者不能避免做同步操作。此时,虚拟机并不会立即挂起线程。他会使用一种成为轻量级锁的优化手段。

轻量级锁的操作也很方便,它只是简单地将对象头部作为指针,指向持有锁的线程堆栈的内部,来判断一个线程是否持有对象锁。 如果线程获得轻量级锁成功,则可以顺利进入临界区。如果轻量级锁失败,则表示其他线程抢先争夺了锁,那么当前线程的锁请求就会膨胀为重量级锁。

3. 自旋锁

锁膨胀后,虚拟机为了避免线程真实地在操作系统层面挂起,虚拟机还会在做最后的努力–自选锁。由于当前线程暂时无法获得锁但是什么时候可以获得锁是一个未知数。也许在CPU几个时钟周期后就可以得到锁。如果这样简单粗暴的挂起线程可能是一种得不偿失的操作因此系统会进行一次赌注它会假设在不久的将来线程可以得到这把锁。

因此虚拟机让当前线程做个空循环,在经过若干次循环后,如果可以得到锁,那么就顺利进入临界区。如果还不能得到锁,才会真实地将线程在操作系统层面挂起。

4. 锁消除

锁消除是一种更彻底的锁优化。Java虚拟机在JIT编译时通过对运行上下文的扫描去除不可能存在共享资源竞争的锁。通过锁消除可以节省毫无意义的请求锁时间

下面这种这种情况我们使用vector 而vector内部使用了synchronize请求锁。

public String []  createStrings(){
    Vector<String>  v= new Vector<String>();
    for(int i=0;i<100;i++){
        v.add(Integer.toString(i));
    }
    return v.toArray(new String[]{});
}

由于V只在函数 createStrnigs 中使用因此它只是一个单纯的局部变量。局部变量是在线程栈上分配的属于线程私有额数据因此不可能被其他线程访问。所以在这种情况下Vector内部所有加锁同步都是没有必要的。如果虚拟机检测到这种情况就会将这些无用的锁操作去除。

锁消除涉及的一项关键技术为逃逸分析。所谓逃逸分析就是观察某一个变量是否会逃出某一个作用域。在本例中变量v显然没有逃出createString 函数之外。以此为基础虚拟机才可以大胆的将v内部的加锁操作去除。如果createStrings 返回的不是String数组而是v本身那么就认为变量v逃逸出了当前函数也就是说v有可能被其他线程访问。如是这样虚拟机就不能消除v中的锁操作。

逃逸分析必须在 -server 模式下进行,可以使用 -XX:DoEscapeAnalysis 参数打开逃逸分析,使用 -XX:+EliminateLocks 参数可以打开锁消除。

4.3 人手一只笔ThreadLocal

// SimpleDateFormat.parse() 不是线程安全的方法
public class Test {
    public static final SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    static class Task implements Runnable{
        private int i;
        Task(int i){
            this.i=i;
        }

        @Override
        public void run() {
            try {
                Date parse = sdf.parse("2018-08-08 08:08:" + i / 60);
                System.out.println(i+":"+parse);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task(i));
        }
    }
}

使用threadLocal优化

public class Test {

    private static ThreadLocal<SimpleDateFormat> threadLocal=new ThreadLocal<>();

    static class Task implements Runnable{
        private int i;
        Task(int i){
            this.i=i;
        }

        @Override
        public void run() {
            try {
                if (threadLocal.get()==null){
                    threadLocal.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
                }
                Date parse = threadLocal.get().parse("2018-08-08 08:08:" + i / 60);
                System.out.println(i+":"+parse);
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task(i));
        }
        executorService.shutdown();
    }
}

4.4 无锁

CAS的全称是Compare And Swap 即比较交换,其算法核心思想如下

执行函数CAS(V,E,N)

其包含3个参数

  • V表示要更新的变量

  • E表示预期值

  • N表示新值

如果V值等于E值则将V的值设为N。若V值和E值不同则说明已经有其他线程做了更新则当前线程什么都不做。通俗的理解就是CAS操作需要我们提供一个期望值当期望值与当前线程的变量值相同时说明还没线程修改该值当前线程可以进行修改也就是执行CAS操作但如果期望值与当前线程不符则说明该值已被其他线程修改此时不执行更新操作但可以选择重新读取该变量再尝试再次修改该变量也可以放弃操作。


// 原子类
public class Test {

    private static int i=0;
    private static AtomicInteger j=new AtomicInteger(0);
    // AtomicReference 对普通对象的封装
    private static AtomicReference<Integer> k=new AtomicReference<>(0);

    static class Task implements Runnable{

        private CountDownLatch latch;

        Task(CountDownLatch latch){
            this.latch=latch;
        }

        @Override
        public void run() {
            i++;
            j.incrementAndGet();
            k.getAndUpdate(x->x+1);
            latch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int number = 10000;
        CountDownLatch latch=new CountDownLatch(number);
        Semaphore semaphore=new Semaphore(10);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Task task = new Task(latch);
        for (int i = 0; i < number; i++) {
            semaphore.acquire();
            executorService.execute(task);
            semaphore.release();
        }
        latch.await();
        System.out.println("输出i的值"+i);
        System.out.println("输出j的值"+j.get());
        System.out.println("输出K的值"+k.get());
        executorService.shutdown();
    }
}

// 输出
输出i的值9983
输出j的值10000
输出K的值10000

1. 无锁数组

public class Test {

    private static int number = 100000;

    private static int capacity = 10;

    // 保证对集合内元素的操作具有原子性
    private static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(capacity);

    // 对集合本生的操作线程安全 对集合内元素的操作线程不安全
    private static LinkedBlockingQueue<Integer> LinkedBlockingQueue = new LinkedBlockingQueue<>();

    //对集合本生的操作线程安全  对集合内元素的操作线程不安全
    private static Vector<Integer> vector = new Vector<>(10);

    //普通集合
    private static ArrayList<Integer> arrayList = new ArrayList<>(capacity);
    private static ArrayList<Integer> list = new ArrayList<>();

    static {
        for (int i = 0; i < capacity; i++) {
            arrayList.add(i, 0);
            vector.add(i, 0);
        }
    }

    static class Task implements Runnable {

        private CountDownLatch latch;
        private int number;

        Task(CountDownLatch latch, int number) {
            this.latch = latch;
            this.number = number;
        }

        @Override
        public void run() {
            LinkedBlockingQueue.add(1);
            list.add(1);
            int num = number % capacity;
            arrayList.set(num, arrayList.get(num) + 1);
            vector.set(num, arrayList.get(num) + 1);
            atomicIntegerArray.getAndIncrement(num);
            latch.countDown();

        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(number);
        Semaphore semaphore = new Semaphore(10);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < number; i++) {
            semaphore.acquire();
            executorService.execute(new Task(latch, i));
            semaphore.release();
        }
        latch.await();
        System.out.println("arrayList:" + arrayList);
        System.out.println("atomicIntegerArray:" + atomicIntegerArray);
        System.out.println("vector:" + vector);
        System.out.println("list:" + list.size());
        System.out.println("LinkedBlockingQueue:" + LinkedBlockingQueue.size());
        executorService.shutdown();
    }
}

// arrayList:[9999, 9998, 9985, 8542, 9998, 10000, 9996, 9999, 9995, 9998]
// atomicIntegerArray:[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
// vector:[10000, 9999, 9986, 8543, 9999, 10001, 9997, 10000, 9996, 9999]
// list:99847
// LinkedBlockingQueue:100000

2. AtomicIntegerFieldUpdater

AtomicIntegerFieldUpdater让普通变量也能具备原子性的操作。

  • AtomicIntegerFieldUpdater只能修改它可见范围内的变量其是通过反射得到这个变量的如果不可见就会出错
  • 为保证变量被正确的读取它必须是volatile类型的
  • 由于CSA操作会通过对象实例中的偏移量直接进行赋值因此它不支持static字段。
public class Test {

    static class Task implements Runnable {

        private Candidate candidate;
        private CountDownLatch latch;
        private AtomicIntegerFieldUpdater fieldUpdater;

        Task(CountDownLatch latch, Candidate candidate,AtomicIntegerFieldUpdater fieldUpdater) {
            this.candidate = candidate;
            this.latch=latch;
            this.fieldUpdater=fieldUpdater;
        }

        @Override
        public void run() {
            fieldUpdater.incrementAndGet(candidate);
            latch.countDown();
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        int number = 10000;
        CountDownLatch latch = new CountDownLatch(number);
        Semaphore semaphore = new Semaphore(10);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        Candidate candidate = new Candidate("候选人", 0);
        AtomicIntegerFieldUpdater<Candidate> fieldUpdater=AtomicIntegerFieldUpdater.newUpdater(Candidate.class,"score");
        for (int i = 0; i < number; i++) {
            semaphore.acquire();
            executorService.execute(new Task(latch, candidate,fieldUpdater));
            semaphore.release();
        }
        latch.await();
        System.out.println(candidate.getName() + "获得票数:" + candidate.getScore());
        executorService.shutdown();
    }


    private static class Candidate {

        private String name;

        // 1. 不能声明为 private  2. 必须用 volatile 关键字修饰
        public volatile int score;

        Candidate(String name, int score) {
            this.name = name;
            this.score = score;
        }
        public int getScore() {
            return score;
        }
        public void setScore(int score) {
            this.score = score;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    }
}

// 输出
候选人获得票数:10000

3. SynchronousQueue队列 实现线程之间通讯

SynchronousQueue内部没有容量但是由于一个插入操作总是对应一个移除操作反过来同样需要满足。那么一个元素就不会再SynchronousQueue 里面长时间停留,一旦有了插入线程和移除线程,元素很快就从插入线程移交给移除线程。也就是说这更像是一种信道(管道),资源从一个方向快速传递到另一方向。显然这是一种快速传递元素的方式,也就是说在这种情况下元素总是以最快的方式从插入着(生产者)传递给移除着(消费者),这在多任务队列中是最快处理任务的方式。

public class Test {


    public static SynchronousQueue<Double> queue = new SynchronousQueue<>();

    static class ReadThread implements Runnable {

        @Override
        public void run() {
            System.out.println("读线程启动");
            while (true){
                try {
                    Double peek = queue.take();
                    System.out.println("读线程获取值:" + peek);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    static class WriteThread implements Runnable {

        @Override
        public void run() {
            System.out.println("写线程写入值");
            try {
                queue.put(Math.random());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) throws InterruptedException {
        new Thread(new ReadThread()).start();
        Thread.sleep(3000);
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
        pool.scheduleAtFixedRate(new WriteThread(),0,2,TimeUnit.SECONDS);
    }
}

结果
读线程启动
写线程写入值
读线程获取值:0.4200400971747895
写线程写入值
读线程获取值:0.5515246887760102
写线程写入值
读线程获取值:0.37153976270075484
写线程写入值
读线程获取值:0.5244714268587413

第五章 并行模式与算法

5.5 JDK中的Future

使用Future模式,获取数据的时候可能无法立即得到需要的数据。而是先拿到一个包装,可以在需要的时候再去get获取需要的数据。

public class Test {

    static class Task implements Callable<Double> {

        @Override
        public Double call() throws Exception {
            Thread.sleep(3000);
            return Math.random();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executors = Executors.newFixedThreadPool(100);
        List<Future<Double>> futureList = new ArrayList<>();
        Task task = new Task();
        for (int i = 0; i < 100; i++) {
            Future<Double> submit = executors.submit(task);
            futureList.add(submit);
        }
        // 模拟主线程执行另外一个耗时的计算
        Thread.sleep(3000);
        for (int i = 0; i < futureList.size(); i++) {
            try {
                Double aDouble = futureList.get(i).get();
                System.out.println("获得任务" + i + "结果" + aDouble);
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        executors.shutdown();
    }
}

第六章 JAVA 8 并发

public static void main(String[] args) {
        String[] strings={"1","2","3","4","5","6","7","8","9","10"};
        Consumer consumer = System.out::println;
        Consumer consumerErr = System.err::println;
        Arrays.stream(strings).forEach(consumer.andThen(consumerErr));
    }

6.5 增强的Future(CompletableFuture)

  1. 当一个Future可能需要显示地完成时使用CompletionStage接口去支持完成时触发的函数和操作。
public class Test {

    static class Compute implements Runnable {

        private CompletableFuture<Integer> future;

        Compute(CompletableFuture<Integer> future){
            this.future=future;
        }

        @Override
        public void run() {
            try {
                System.out.println("子线程等待主线程运算完成····");
                Integer integer = future.get();
                System.out.println("子线程完成后续运算:"+integer*integer);
            } catch (InterruptedException|ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Integer> future=new CompletableFuture<>();
        System.out.println("主线程开始计算");
        new Thread(new Compute(future)).start();
        int i=0;
        for (int j = 0; j <100; j++) {
            i=i+j;
        }
        Thread.sleep(2000);
        System.out.println("主线程计算完成");
        future.complete(i);
    }
}


主线程开始计算
子线程等待主线程运算完成····
主线程计算完成
子线程完成后续运算:24502500
  1. CompletableFuture中4个异步执行任务静态方法
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}  

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

其中supplyAsync用于有返回值的任务runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池否则默认ForkJoinPool.commonPool()系统级公共线程池,这些线程都是Daemon线程这意味着如果主线程退出这些线程无论是否完成都会退出

public class Test {

    private static Integer compute() {
        int i=0;
        for (int j = 0; j <100; j++) {
            i=i+j;
        }
        System.out.println("运算方法所在的线程:"+Thread.currentThread().getId());
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread().getId()+"线程开始计算");
        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(Test::compute);
        Integer integer = supplyAsync.get();
        System.out.println(Thread.currentThread().getId()+"线程计算完成:"+integer*integer);

    }
    
1线程开始计算
运算方法所在的线程11
1线程计算完成:24502500
  1. 流式调用
public class Test {

    private static Integer compute() {
        System.out.println("compute所在线程"+Thread.currentThread().getId());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 100;
    }

    private static Integer epr(Integer integer) {
        try {
            System.out.println("epr所在线程"+Thread.currentThread().getId());
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return integer*integer;
    }

    private static void accept(Integer integer){
        System.out.println("accept所在线程"+Thread.currentThread().getId());
        System.out.println("accept方法消费掉计算结果:"+integer);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(Test::compute)
                .thenApply(Test::epr)
                .thenAccept(Test::accept)   //值在这一步被消费掉了
                .thenAccept(x-> System.out.println("运算结果:"+x));
        future.get(); //如果缺少这一步不会有任何输出。原因supplyAsync 默认使用的线程池中的线程全部是守护线程
    }
}

compute所在线程11
epr所在线程11
accept所在线程11
accept方法消费掉计算结果:10000
运算结果:null
  1. 异常捕获 exceptionally
public class Test {

    private static Integer compute() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int i = 100 / 0;
        return 100;
    }

    private static Integer dealException(Throwable e){
        e.printStackTrace();
        return 0;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(Test::compute)
                .exceptionally(Test::dealException)
                .thenAccept(System.out::println);
        future.get();
    }
}

5.组合多个CompletableFuture ( thenCompose方法 和 thenCombineAsync方法)

public class Test {

    private static Integer compute() {
        System.out.println("compute 所在线程:"+Thread.currentThread().getId());
        return 100;
    }

    private static Integer epr(Integer integer) {
        System.out.println("epr 所在线程:"+Thread.currentThread().getId());
        return integer*integer;
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 组合实现方式1 thenCompose 一个计算的输入依赖另外一个计算的结果
        CompletableFuture<Void> future01 = CompletableFuture.supplyAsync(Test::compute)
                .thenCompose(x->CompletableFuture.supplyAsync(()->epr(x)))
                .thenAccept(x-> System.out.println("运算结果:"+x));
        future01.get();

        System.out.println();

        // 组合实现方式2 thenCombineAsync 两个计算之间不依赖
        CompletableFuture<Integer> future02 = CompletableFuture.supplyAsync(Test::compute);
        CompletableFuture<Integer> future03 = CompletableFuture.supplyAsync(()->Test.epr(100));
        CompletableFuture<Integer> futureAll = future02.thenCombineAsync(future03, (x, y) -> x + y);
        System.out.println("运算结果:"+futureAll.get());

    }
}

compute 所在线程11
epr 所在线程11
运算结果:10000

compute 所在线程11
epr 所在线程11
运算结果:10100

6.6 StampedLock

StampedLock是并发包里面jdk8版本新增的一个锁该锁提供了三种模式的读写控制三种模式分别如下

  • 写锁writeLock是个排它锁或者叫独占锁同时只有一个线程可以获取该锁当一个线程获取该锁后其它请求的线程必须等待当目前没有线程持有读锁或者写锁的时候才可以获取到该锁请求该锁成功后会返回一个stamp票据变量用来表示该锁的版本当释放该锁时候需要unlockWrite并传递参数stamp。

  • 悲观读锁readLock是个共享锁在没有线程获取独占写锁的情况下同时多个线程可以获取该锁如果已经有线程持有写锁其他线程请求获取该读锁会被阻塞。这里讲的悲观其实是参考数据库中的乐观悲观锁的这里说的悲观是说在具体操作数据前悲观的认为其他线程可能要对自己操作的数据进行修改所以需要先对数据加锁这是在读少写多的情况下的一种考虑,请求该锁成功后会返回一个stamp票据变量用来表示该锁的版本当释放该锁时候需要unlockRead并传递参数stamp。

  • 乐观读锁tryOptimisticRead是相对于悲观锁来说的在操作数据前并没有通过CAS设置锁的状态如果当前没有线程持有写锁则简单的返回一个非0的stamp版本信息获取该stamp后在具体操作数据前还需要调用validate验证下该stamp是否已经不可用也就是看当调用tryOptimisticRead返回stamp后到到当前时间间是否有其他线程持有了写锁如果是那么validate会返回0否者就可以使用该stamp版本的锁对数据进行操作。由于tryOptimisticRead并没有使用CAS设置锁状态所以不需要显示的释放该锁。该锁的一个特点是适用于读多写少的场景因为获取读锁只是使用与或操作进行检验不涉及CAS操作所以效率会高很多但是同时由于没有使用真正的锁在保证数据一致性上需要拷贝一份要操作的变量到方法栈并且在操作数据时候可能其他写线程已经修改了数据而我们操作的是方法栈里面的数据也就是一个快照所以最多返回的不是最新的数据但是一致性还是得到保障的。

public class Point {
    private double x, y;//内部定义表示坐标点
    private final StampedLock s1 = new StampedLock();//定义了StampedLock锁

    void move(double deltaX, double deltaY) {
        long stamp = s1.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            s1.unlockWrite(stamp);//退出临界区,释放写锁
        }
    }

    double distanceFormOrigin() {//只读方法
        long stamp = s1.tryOptimisticRead();  
        double currentX = x, currentY = y;
        if (!s1.validate(stamp)) {
            stamp = s1.readLock()
            try {
                currentX = x;
                currentY = y;
            } finally {
                s1.unlockRead(stamp);//退出临界区,释放读锁
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

6.7 LongAdder 和 LongAccumulator

public class Test {

    private static int number=1000000;
    private static CountDownLatch latch=new CountDownLatch(number);
    private static int anInt= 0 ;
    private static AtomicLong atomicLong=new AtomicLong(0L);
    // 比 AtomicLong 性能更好  适用于求和的场景
    private static LongAdder longAdder=new LongAdder();
    // 对 LongAccumulator 增加 构造函数支持传入对应的运算函数
    private static LongAccumulator longAccumulator=new LongAccumulator((x,y)->x+y,0);

    static class Task implements Runnable{

        @Override
        public void run() {
            anInt++;
            atomicLong.incrementAndGet();
            longAdder.add(1);
            longAccumulator.accumulate(1);
            latch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(1000);
        Task task = new Task();
        for (int i = 0; i < number; i++) {
            semaphore.acquire();
            executor.submit(task);
            semaphore.release();
        }
        latch.await();
        System.out.println("int:"+anInt);
        System.out.println("atomicLong:"+atomicLong.intValue());
        System.out.println("longAdder:"+longAdder.intValue());
        System.out.println("longAccumulator:"+longAccumulator.intValue());
        executor.shutdown();
    }
}


int:998719
atomicLong:1000000
longAdder:1000000
longAccumulator:1000000

参考内容: